本文主要研究一下dubbo-go的HystrixFilter
dubbo-go-v1.4.2/filter/filter_impl/hystrix_filter.go
// HystrixFilter ...
type HystrixFilter struct {
COrP bool //true for consumer
res map[string][]*regexp.Regexp
ifNewMap sync.Map
}
dubbo-go-v1.4.2/filter/filter_impl/hystrix_filter.go
// Invoke ...
func (hf *HystrixFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
cmdName := fmt.Sprintf("%s&method=%s", invoker.GetUrl().Key(), invocation.MethodName())
// Do the configuration if the circuit breaker is created for the first time
if _, load := hf.ifNewMap.LoadOrStore(cmdName, true); !load {
configLoadMutex.Lock()
filterConf := getConfig(invoker.GetUrl().Service(), invocation.MethodName(), hf.COrP)
for _, ptn := range filterConf.Error {
reg, err := regexp.Compile(ptn)
if err != nil {
logger.Warnf("[Hystrix Filter]Errors occurred parsing error omit regexp: %s, %v", ptn, err)
} else {
if hf.res == nil {
hf.res = make(map[string][]*regexp.Regexp)
}
hf.res[invocation.MethodName()] = append(hf.res[invocation.MethodName()], reg)
}
}
hystrix.ConfigureCommand(cmdName, hystrix.CommandConfig{
Timeout: filterConf.Timeout,
MaxConcurrentRequests: filterConf.MaxConcurrentRequests,
SleepWindow: filterConf.SleepWindow,
ErrorPercentThreshold: filterConf.ErrorPercentThreshold,
RequestVolumeThreshold: filterConf.RequestVolumeThreshold,
})
configLoadMutex.Unlock()
}
configLoadMutex.RLock()
_, _, err := hystrix.GetCircuit(cmdName)
configLoadMutex.RUnlock()
if err != nil {
logger.Errorf("[Hystrix Filter]Errors occurred getting circuit for %s , will invoke without hystrix, error is: ", cmdName, err)
return invoker.Invoke(ctx, invocation)
}
logger.Infof("[Hystrix Filter]Using hystrix filter: %s", cmdName)
var result protocol.Result
_ = hystrix.Do(cmdName, func() error {
result = invoker.Invoke(ctx, invocation)
err := result.Error()
if err != nil {
result.SetError(NewHystrixFilterError(err, false))
for _, reg := range hf.res[invocation.MethodName()] {
if reg.MatchString(err.Error()) {
logger.Debugf("[Hystrix Filter]Error in invocation but omitted in circuit breaker: %v; %s", err, cmdName)
return nil
}
}
}
return err
}, func(err error) error {
//Return error and if it is caused by hystrix logic, so that it can be handled by previous filters.
_, ok := err.(hystrix.CircuitError)
logger.Debugf("[Hystrix Filter]Hystrix health check counted, error is: %v, failed by hystrix: %v; %s", err, ok, cmdName)
result = &protocol.RPCResult{}
result.SetResult(nil)
result.SetError(NewHystrixFilterError(err, ok))
return err
})
return result
}
dubbo-go-v1.4.2/filter/filter_impl/hystrix_filter.go
// OnResponse ...
func (hf *HystrixFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}
dubbo-go-v1.4.2/filter/filter_impl/hystrix_filter.go
// GetHystrixFilterConsumer ...
func GetHystrixFilterConsumer() filter.Filter {
//When first called, load the config in
consumerConfigOnce.Do(func() {
if err := initHystrixConfigConsumer(); err != nil {
logger.Warnf("[Hystrix Filter]Config load failed for consumer, error is: %v , will use default", err)
}
})
return &HystrixFilter{COrP: true}
}
dubbo-go-v1.4.2/filter/filter_impl/hystrix_filter.go
// GetHystrixFilterProvider ...
func GetHystrixFilterProvider() filter.Filter {
providerConfigOnce.Do(func() {
if err := initHystrixConfigProvider(); err != nil {
logger.Warnf("[Hystrix Filter]Config load failed for provider, error is: %v , will use default", err)
}
})
return &HystrixFilter{COrP: false}
}
dubbo-go-v1.4.2/filter/filter_impl/hystrix_filter.go
func initHystrixConfigConsumer() error {
if config.GetConsumerConfig().FilterConf == nil {
return perrors.Errorf("no config for hystrix")
}
filterConfig := config.GetConsumerConfig().FilterConf.(map[interface{}]interface{})[HYSTRIX]
if filterConfig == nil {
return perrors.Errorf("no config for hystrix")
}
hystrixConfByte, err := yaml.Marshal(filterConfig)
if err != nil {
return err
}
err = yaml.Unmarshal(hystrixConfByte, confConsumer)
if err != nil {
return err
}
return nil
}
dubbo-go-v1.4.2/filter/filter_impl/hystrix_filter.go
func initHystrixConfigProvider() error {
if config.GetProviderConfig().FilterConf == nil {
return perrors.Errorf("no config for hystrix")
}
filterConfig := config.GetConsumerConfig().FilterConf.(map[interface{}]interface{})[HYSTRIX]
if filterConfig == nil {
return perrors.Errorf("no config for hystrix")
}
hystrixConfByte, err := yaml.Marshal(filterConfig)
if err != nil {
return err
}
err = yaml.Unmarshal(hystrixConfByte, confProvider)
if err != nil {
return err
}
return nil
}
HystrixFilter的Invoke方法首先通过hf.ifNewMap.LoadOrStore判断该cmdName的circuit breaker是否已经创建,还没有创建的话,会通过getConfig获取filterConf,给指定的invocation.MethodName()创建reg,之后通过hystrix.ConfigureCommand进行配置;之后通过hystrix.GetCircuit(cmdName)获取circuit,然后执行hystrix.Do,在该func里头执行invoker.Invoke(ctx, invocation)
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有