前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kratos源码分析系列(6)

kratos源码分析系列(6)

作者头像
golangLeetcode
发布2023-09-06 19:14:09
4650
发布2023-09-06 19:14:09
举报
文章被收录于专栏:golang算法架构leetcode技术php

12,selector选择负载均衡算法

filter,可以指定过滤规则,过滤掉部分节点

代码语言:javascript
复制
func Version(version string) selector.NodeFilter {

node

直接获取当前节点:selector/node/direct/direct.go

代码语言:javascript
复制
type Node struct {
  selector.Node


  // last lastPick timestamp
  lastPick int64
}
代码语言:javascript
复制
func (n *Node) Pick() selector.DoneFunc {

selector/node/ewma/node.go

代码语言:javascript
复制
func (n *Node) Pick() selector.DoneFunc {
        atomic.StoreInt64(&n.lastPick, now)

p2c随机选择两个节点,然后选择负载较低的节点

代码语言:javascript
复制
type Balancer struct {
  mu     sync.Mutex
  r      *rand.Rand
  picked int64
}
代码语言:javascript
复制
func (s *Balancer) Pick(ctx context.Context, nodes []selector.WeightedNode) (selector.WeightedNode, selector.DoneFunc, error) {
        done := pc.Pick()
代码语言:javascript
复制
    a := p.r.Intn(len(p.conns))
    b := p.r.Intn(len(p.conns) - 1)

比较两个节点的负载情况,选择负载低的

random随机选择一个节点

代码语言:javascript
复制
func (p *Balancer) Pick(_ context.Context, nodes []selector.WeightedNode) (selector.WeightedNode, selector.DoneFunc, error) {
      d := selected.Pick()

wrr 带权轮询算法:Kratos Client内置默认算法

代码语言:javascript
复制
type Balancer struct {
  mu            sync.Mutex
  currentWeight map[string]float64
}
代码语言:javascript
复制
func (p *Balancer) Pick(_ context.Context, nodes []selector.WeightedNode) (selector.WeightedNode, selector.DoneFunc, error) {

它定义了3个变量:(1) weight配置文件中指定的该后端的权重,这个值是固定不变的。(2) effective_weight 后端的有效权重,初始值为weight。在释放后端时,如果发现和后端的通信过程中发生了错误,就减小effective_weight。此后有新的请求过来时,在选取后端的过程中,再逐步增加effective_weight,最终又恢复到weight。之所以增加这个字段,是为了当后端发生错误时,降低其权重。(3) current_weight后端目前的权重,一开始为0,之后会动态调整。每次选取后端时,会遍历集群中所有后端,对于每个后端,让它的current_weight增加它的effective_weight,同时累加所有后端的effective_weight,保存为total。如果该后端的current_weight是最大的,就选定这个后端,然后把它的current_weight减去total。如果该后端没有被选定,那么current_weight不用减小。

banlancer的接口定义位于selector/balancer.go

代码语言:javascript
复制
type Balancer interface {
  Pick(ctx context.Context, nodes []WeightedNode) (selected WeightedNode, done DoneFunc, err error)
}
代码语言:javascript
复制
func NewNode(scheme, addr string, ins *registry.ServiceInstance) Node {
  n := &DefaultNode{
    scheme: scheme,
    addr:   addr,
  }
代码语言:javascript
复制
func (d *Default) Select(ctx context.Context, opts ...SelectOption) (selected Node, done DoneFunc, err error) {
    type NodeFilter func(context.Context, []Node) []Node
func GlobalSelector() Builder
func WithNodeFilter(fn ...NodeFilter) SelectOption {
type Peer struct {
  // node is the peer node.
  Node Node
}
代码语言:javascript
复制
type Selector interface {
  Rebalancer


  // Select nodes
  // if err == nil, selected and done must not be empty.
  Select(ctx context.Context, opts ...SelectOption) (selected Node, done DoneFunc, err error)
}

如何使用呢:

代码语言:javascript
复制
// 创建路由 Filter:筛选版本号为"2.0.0"的实例
filter :=  filter.Version("2.0.0")
// 创建 P2C 负载均衡算法 Selector,并将路由 Filter 注入
selector := p2c.New(p2c.WithFilter(filter))
    hConn, err := http.NewClient(
  context.Background(),
  http.WithEndpoint("discovery:///helloworld"),
  http.WithDiscovery(r),
  // 通过 http.WithSelector 将 Selector 注入 HTTP Client 中
  http.WithSelector(
    p2c.New(p2c.WithFilter(filter.Version("2.0.0"))),
  )
)
代码语言:javascript
复制
// 创建路由 Filter:筛选版本号为"2.0.0"的实例
filter :=  filter.Version("2.0.0")


conn, err := grpc.DialInsecure(
  context.Background(),
  grpc.WithEndpoint("discovery:///helloworld"),
  grpc.WithDiscovery(r),
  // 由于 gRPC 框架的限制,只能使用全局 balancer name 的方式来注入 selector
  grpc.WithBalancerName(wrr.Name),
  // 通过 grpc.WithFilter 注入路由 Filter
  grpc.WithFilter(filter),
)

13,third_party

第三方proto的依赖:third_party/buf.yaml,third_party/validate/validate.proto,third_party/errors/errors.proto

APIs 响应错误时可以直接使用 errors 包中的 New 方法来声明一个 error,也可以直接通过 proto 预定义定义错误码,然后通过 proto-gen-go 生成帮助代码,直接返回 error。

代码语言:javascript
复制
{
    // 错误码,跟 http-status 一致,并且在 grpc 中可以转换成 grpc-status
    "code": 500,
    // 错误原因,定义为业务判定错误码
    "reason": "USER_NOT_FOUND",
    // 错误信息,为用户可读的信息,可作为用户提示内容
    "message": "invalid argument error",
    // 错误元信息,为错误添加附加可扩展信息
    "metadata": {
      "foo": "bar"
    }
}
代码语言:javascript
复制
// 通过 errors.New() 响应错误
errors.New(500, "USER_NAME_EMPTY", "user name is empty")
代码语言:javascript
复制
err = err.WithMetadata(map[string]string{
    "foo": "bar",
})

14,transport

包括了grpc和http transport/grpc/resolver/direct/builder.go

先注册服务发现的解析器google.golang.org/grpc/resolver

代码语言:javascript
复制
func init() {
  resolver.Register(NewBuilder())
}
代码语言:javascript
复制
func (d *directBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {

transport/grpc/resolver/direct/resolver.go

代码语言:javascript
复制
func (r *directResolver) ResolveNow(options resolver.ResolveNowOptions) {
}

transport/grpc/resolver/discovery/builder.go

代码语言:javascript
复制
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
          go func() {
    w, err := b.discoverer.Watch(ctx, strings.TrimPrefix(target.URL.Path, "/"))
        go r.watch()

transport/grpc/resolver/discovery/resolver.go

代码语言:javascript
复制
func (r *discoveryResolver) watch() {
        for _, in := range filtered {
    ept, _ := endpoint.ParseEndpoint(in.Endpoints, endpoint.Scheme("grpc", !r.insecure))
      err := r.cc.UpdateState(resolver.State{Addresses: addrs})

transport/grpc/balancer.go注册负载均衡器

代码语言:javascript
复制
func init() {
  b := base.NewBalancerBuilder(
    balancerName,
    &balancerBuilder{
      builder: selector.GlobalSelector(),
    },
    base.Config{HealthCheck: true},
  )
  balancer.Register(b)
}
代码语言:javascript
复制
func (b *balancerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
      func (p *balancerPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
          n, done, err := p.selector.Select(info.Ctx, selector.WithNodeFilter(filters...))

transport/grpc/client.go

代码语言:javascript
复制
func init() {
  if selector.GlobalSelector() == nil {
    selector.SetGlobalSelector(wrr.NewBuilder())
  }

}

默认是github.com/go-kratos/kratos/v2/selector/wrr

代码语言:javascript
复制
_ "github.com/go-kratos/kratos/v2/transport/grpc/resolver/direct"
 func Dial(ctx context.Context, opts ...ClientOption) (*grpc.ClientConn, error) {
  return dial(ctx, false, opts...)
  }

transport/grpc/codec.go

代码语言:javascript
复制
func init() {
  encoding.RegisterCodec(codec{})
}

google.golang.org/grpc/encoding

代码语言:javascript
复制
func (codec) Marshal(v interface{}) ([]byte, error) {
  vv, ok := v.(proto.Message)

transport/grpc/interceptor.go注册middleware

代码语言:javascript
复制
func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
  return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
func (s *Server) streamServerInterceptor() grpc.StreamServerInterceptor {
  return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {

transport/grpc/server.go

代码语言:javascript
复制
func NewServer(opts ...ServerOption) *Server {
      srv.Server = grpc.NewServer(grpcOpts...)
        if !srv.customHealth {
    grpc_health_v1.RegisterHealthServer(srv.Server, srv.health)
        apimd.RegisterMetadataServer(srv.Server, srv.metadata)
     reflection.Register(srv.Server)
    srv.adminClean, _ = admin.Register(srv.Server)

transport/grpc/transport.go

代码语言:javascript
复制
func (mc headerCarrier) Get(key string) string {

http服务实现类似

transport/http/binding/bind.go

代码语言:javascript
复制
func BindQuery(vars url.Values, target interface{}) error {
  if err := encoding.GetCodec(form.Name).Unmarshal([]byte(vars.Encode()), target); err != nil {
代码语言:javascript
复制
func BindForm(req *http.Request, target interface{}) error {
  if err := req.ParseForm(); err != nil {
    return err
  }
  if err := encoding.GetCodec(form.Name).Unmarshal([]byte(req.Form.Encode()), target); err != nil {

transport/http/binding/encode.go

代码语言:javascript
复制
func EncodeURL(pathTemplate string, msg interface{}, needQuery bool) string {
            if v, ok := msg.(proto.Message); ok {
      if query := form.EncodeFieldMask(v.ProtoReflect()); query != "" {

transport/http/pprof/pprof.go,注册pprof

代码语言:javascript
复制
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)

transport/http/status/status.go

代码语言:javascript
复制
type Converter interface {
  // ToGRPCCode converts an HTTP error code into the corresponding gRPC response status.
  ToGRPCCode(code int) codes.Code


  // FromGRPCCode converts a gRPC error code into the corresponding HTTP response status.
  FromGRPCCode(code codes.Code) int
}
代码语言:javascript
复制
func (c statusConverter) ToGRPCCode(code int) codes.Code {

transport/http/calloption.go

代码语言:javascript
复制
func (EmptyCallOption) before(*callInfo) error      { return nil }
func (EmptyCallOption) after(*callInfo, *csAttempt) {}

transport/http/client.go

代码语言:javascript
复制
func init() {
  if selector.GlobalSelector() == nil {
    selector.SetGlobalSelector(wrr.NewBuilder())
  }
}
代码语言:javascript
复制
func NewClient(ctx context.Context, opts ...ClientOption) (*Client, error) {
      if target.Scheme == "discovery" {
      if r, err = newResolver(ctx, options.discovery, target, selector, options.block, insecure, options.subsetSize); err != nil {
代码语言:javascript
复制
func (client *Client) Invoke(ctx context.Context, method, path string, args interface{}, reply interface{}, opts ...CallOption) error {
        ctx = transport.NewClientContext(ctx, &Transport{
        return client.invoke(ctx, req, args, reply, c, opts...)
代码语言:javascript
复制
func (client *Client) invoke(ctx context.Context, req *http.Request, args interface{}, reply interface{}, c callInfo, opts ...CallOption) error {
          h := func(ctx context.Context, in interface{}) (interface{}, error) {
    res, err := client.do(req.WithContext(ctx))
代码语言:javascript
复制
func (client *Client) do(req *http.Request) (*http.Response, error) {
          if node, done, err = client.selector.Select(req.Context(), selector.WithNodeFilter(client.opts.nodeFilters...)); err != nil {
        resp, err := client.cc.Do(req)

transport/http/codec.go

代码语言:javascript
复制
func DefaultRequestVars(r *http.Request, v interface{}) error {
  raws := mux.Vars(r)
  vars := make(url.Values, len(raws))
  for k, v := range raws {
    vars[k] = []string{v}
  }
  return binding.BindQuery(vars, v)
  }

transport/http/context.go

代码语言:javascript
复制
func (c *wrapper) JSON(code int, v interface{}) error {
  c.res.Header().Set("Content-Type", "application/json")
  c.res.WriteHeader(code)
  return json.NewEncoder(c.res).Encode(v)
}

transport/http/filter.go

代码语言:javascript
复制
func FilterChain(filters ...FilterFunc) FilterFunc {
  return func(next http.Handler) http.Handler {
    for i := len(filters) - 1; i >= 0; i-- {
      next = filters[i](next)
    }
    return next
  }
}

transport/http/redirect.go

代码语言:javascript
复制
func (r *redirect) Redirect() (string, int) {
  return r.URL, r.Code
}

transport/http/resolver.go

代码语言:javascript
复制
func parseTarget(endpoint string, insecure bool) (*Target, error) {
代码语言:javascript
复制
func newResolver(ctx context.Context, discovery registry.Discovery, target *Target,
  rebalancer selector.Rebalancer, block, insecure bool, subsetSize int,
) (*resolver, error) {
        watcher, err := discovery.Watch(ctx, target.Endpoint)

transport/http/router.go

代码语言:javascript
复制
type RouteInfo struct {
  Path   string
  Method string
}

transport/http/server.go

代码语言:javascript
复制
func NewServer(opts ...ServerOption) *Server {
          srv := &Server{
    network:     "tcp",
        srv.router.StrictSlash(srv.strictSlash)
  srv.router.NotFoundHandler = http.DefaultServeMux
  srv.router.MethodNotAllowedHandler = http.DefaultServeMux
  srv.router.Use(srv.filter())
  srv.Server = &http.Server{

transport/http/transport.go

代码语言:javascript
复制
func RequestFromServerContext(ctx context.Context) (*http.Request, bool) {
  if tr, ok := transport.FromServerContext(ctx); ok {
    if tr, ok := tr.(*Transport); ok {

transport/transport.go

代码语言:javascript
复制
func NewServerContext(ctx context.Context, tr Transporter) context.Context {
代码语言:javascript
复制
func FromServerContext(ctx context.Context) (tr Transporter, ok bool) {
代码语言:javascript
复制
func NewClientContext(ctx context.Context, tr Transporter) context.Context {    
代码语言:javascript
复制
func FromClientContext(ctx context.Context) (tr Transporter, ok bool) {
代码语言:javascript
复制
type Server interface {
  Start(context.Context) error
  Stop(context.Context) error
}

15,app.go

代码语言:javascript
复制
func New(opts ...Option) *App {
代码语言:javascript
复制
func (a *App) Run() error {
      for _, srv := range a.opts.servers {
    srv := srv
    eg.Go(func() error {
      return srv.Stop(stopCtx)
      return srv.Start(sctx)
    wg.Wait()
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-03-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的四七层流量分发服务,访问流量经由 CLB 可以自动分配到多台后端服务器上,扩展系统的服务能力并消除单点故障。轻松应对大流量访问场景。 网关负载均衡(Gateway Load Balancer,GWLB)是运行在网络层的负载均衡。通过 GWLB 可以帮助客户部署、扩展和管理第三方虚拟设备,操作简单,安全性强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档