12,selector选择负载均衡算法
filter,可以指定过滤规则,过滤掉部分节点
func Version(version string) selector.NodeFilter {
node
直接获取当前节点:selector/node/direct/direct.go
// Node embeds selector.Node and additionally tracks when it was last picked.
type Node struct {
selector.Node
// lastPick is the timestamp of the most recent pick.
lastPick int64
}
func (n *Node) Pick() selector.DoneFunc {
selector/node/ewma/node.go
func (n *Node) Pick() selector.DoneFunc {
atomic.StoreInt64(&n.lastPick, now)
p2c随机选择两个节点,然后选择负载较低的节点
// Balancer holds the state of the p2c balancer: a mutex, a random source and a picked field.
// NOTE(review): mu presumably guards r, since *rand.Rand is not safe for concurrent use — confirm in Pick.
type Balancer struct {
mu sync.Mutex
r *rand.Rand
// NOTE(review): the exact meaning of picked is not visible in this excerpt — confirm in Pick.
picked int64
}
func (s *Balancer) Pick(ctx context.Context, nodes []selector.WeightedNode) (selector.WeightedNode, selector.DoneFunc, error) {
done := pc.Pick()
a := p.r.Intn(len(p.conns))
b := p.r.Intn(len(p.conns) - 1)
比较两个节点的负载情况,选择负载低的
random随机选择一个节点
func (p *Balancer) Pick(_ context.Context, nodes []selector.WeightedNode) (selector.WeightedNode, selector.DoneFunc, error) {
d := selected.Pick()
wrr 带权轮询算法:Kratos Client内置默认算法
// Balancer holds the state of the weighted round-robin (wrr) balancer.
type Balancer struct {
mu sync.Mutex
// currentWeight stores one float64 weight per string key.
// NOTE(review): presumably keyed by node address and guarded by mu — confirm in Pick.
currentWeight map[string]float64
}
func (p *Balancer) Pick(_ context.Context, nodes []selector.WeightedNode) (selector.WeightedNode, selector.DoneFunc, error) {
它定义了3个变量:(1) weight配置文件中指定的该后端的权重,这个值是固定不变的。(2) effective_weight 后端的有效权重,初始值为weight。在释放后端时,如果发现和后端的通信过程中发生了错误,就减小effective_weight。此后有新的请求过来时,在选取后端的过程中,再逐步增加effective_weight,最终又恢复到weight。之所以增加这个字段,是为了当后端发生错误时,降低其权重。(3) current_weight后端目前的权重,一开始为0,之后会动态调整。每次选取后端时,会遍历集群中所有后端,对于每个后端,让它的current_weight增加它的effective_weight,同时累加所有后端的effective_weight,保存为total。如果该后端的current_weight是最大的,就选定这个后端,然后把它的current_weight减去total。如果该后端没有被选定,那么current_weight不用减小。
balancer的接口定义位于selector/balancer.go
// Balancer chooses one node from a list of weighted nodes.
type Balancer interface {
// Pick returns the selected node, a DoneFunc associated with the pick, and an error.
Pick(ctx context.Context, nodes []WeightedNode) (selected WeightedNode, done DoneFunc, err error)
}
func NewNode(scheme, addr string, ins *registry.ServiceInstance) Node {
n := &DefaultNode{
scheme: scheme,
addr: addr,
}
func (d *Default) Select(ctx context.Context, opts ...SelectOption) (selected Node, done DoneFunc, err error) {
// NodeFilter receives a context and a slice of nodes and returns a slice of nodes.
type NodeFilter func(context.Context, []Node) []Node
func GlobalSelector() Builder
func WithNodeFilter(fn ...NodeFilter) SelectOption {
// Peer carries the node of a peer.
type Peer struct {
// Node is the peer node.
Node Node
}
// Selector combines Rebalancer with node selection.
type Selector interface {
Rebalancer
// Select picks a node.
// If err == nil, selected and done must not be empty.
Select(ctx context.Context, opts ...SelectOption) (selected Node, done DoneFunc, err error)
}
如何使用呢:
// 创建路由 Filter:筛选版本号为"2.0.0"的实例
filter := filter.Version("2.0.0")
// 创建 P2C 负载均衡算法 Selector,并将路由 Filter 注入
selector := p2c.New(p2c.WithFilter(filter))
hConn, err := http.NewClient(
context.Background(),
http.WithEndpoint("discovery:///helloworld"),
http.WithDiscovery(r),
// 通过 http.WithSelector 将 Selector 注入 HTTP Client 中
http.WithSelector(
p2c.New(p2c.WithFilter(filter.Version("2.0.0"))),
)
)
// 创建路由 Filter:筛选版本号为"2.0.0"的实例
filter := filter.Version("2.0.0")
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("discovery:///helloworld"),
grpc.WithDiscovery(r),
// 由于 gRPC 框架的限制,只能使用全局 balancer name 的方式来注入 selector
grpc.WithBalancerName(wrr.Name),
// 通过 grpc.WithFilter 注入路由 Filter
grpc.WithFilter(filter),
)
13,third_party
第三方proto的依赖:third_party/buf.yaml,third_party/validate/validate.proto,third_party/errors/errors.proto
APIs 响应错误时可以直接使用 errors 包中的 New 方法来声明一个 error,也可以直接通过 proto 预定义错误码,然后通过 protoc-gen-go-errors 生成帮助代码,直接返回 error。
{
// Error code; matches the HTTP status and can be converted to a gRPC status.
"code": 500,
// Error reason; defined as the business-level error code.
"reason": "USER_NOT_FOUND",
// Error message; human-readable text that can be shown to the user.
"message": "invalid argument error",
// Error metadata; extensible extra information attached to the error.
"metadata": {
"foo": "bar"
}
}
// 通过 errors.New() 响应错误
errors.New(500, "USER_NAME_EMPTY", "user name is empty")
err = err.WithMetadata(map[string]string{
"foo": "bar",
})
14,transport
包括了grpc和http。transport/grpc/resolver/direct/builder.go
先注册服务发现的解析器google.golang.org/grpc/resolver
func init() {
resolver.Register(NewBuilder())
}
func (d *directBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
transport/grpc/resolver/direct/resolver.go
// ResolveNow does nothing.
func (r *directResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
transport/grpc/resolver/discovery/builder.go
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
go func() {
w, err := b.discoverer.Watch(ctx, strings.TrimPrefix(target.URL.Path, "/"))
go r.watch()
transport/grpc/resolver/discovery/resolver.go
func (r *discoveryResolver) watch() {
for _, in := range filtered {
ept, _ := endpoint.ParseEndpoint(in.Endpoints, endpoint.Scheme("grpc", !r.insecure))
err := r.cc.UpdateState(resolver.State{Addresses: addrs})
transport/grpc/balancer.go注册负载均衡器
func init() {
b := base.NewBalancerBuilder(
balancerName,
&balancerBuilder{
builder: selector.GlobalSelector(),
},
base.Config{HealthCheck: true},
)
balancer.Register(b)
}
func (b *balancerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
func (p *balancerPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
n, done, err := p.selector.Select(info.Ctx, selector.WithNodeFilter(filters...))
transport/grpc/client.go
// init installs the wrr builder as the global selector when none has been set yet.
func init() {
	if selector.GlobalSelector() != nil {
		return
	}
	selector.SetGlobalSelector(wrr.NewBuilder())
}
默认是github.com/go-kratos/kratos/v2/selector/wrr
_ "github.com/go-kratos/kratos/v2/transport/grpc/resolver/direct"
// Dial forwards to dial with its second argument set to false and returns
// the resulting client connection and error unchanged.
func Dial(ctx context.Context, opts ...ClientOption) (*grpc.ClientConn, error) {
	conn, err := dial(ctx, false, opts...)
	return conn, err
}
transport/grpc/codec.go
func init() {
encoding.RegisterCodec(codec{})
}
google.golang.org/grpc/encoding
func (codec) Marshal(v interface{}) ([]byte, error) {
vv, ok := v.(proto.Message)
transport/grpc/interceptor.go注册middleware
func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
func (s *Server) streamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
transport/grpc/server.go
func NewServer(opts ...ServerOption) *Server {
srv.Server = grpc.NewServer(grpcOpts...)
if !srv.customHealth {
grpc_health_v1.RegisterHealthServer(srv.Server, srv.health)
apimd.RegisterMetadataServer(srv.Server, srv.metadata)
reflection.Register(srv.Server)
srv.adminClean, _ = admin.Register(srv.Server)
transport/grpc/transport.go
func (mc headerCarrier) Get(key string) string {
http服务实现类似
transport/http/binding/bind.go
func BindQuery(vars url.Values, target interface{}) error {
if err := encoding.GetCodec(form.Name).Unmarshal([]byte(vars.Encode()), target); err != nil {
func BindForm(req *http.Request, target interface{}) error {
if err := req.ParseForm(); err != nil {
return err
}
if err := encoding.GetCodec(form.Name).Unmarshal([]byte(req.Form.Encode()), target); err != nil {
transport/http/binding/encode.go
func EncodeURL(pathTemplate string, msg interface{}, needQuery bool) string {
if v, ok := msg.(proto.Message); ok {
if query := form.EncodeFieldMask(v.ProtoReflect()); query != "" {
transport/http/pprof/pprof.go,注册pprof
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
transport/http/status/status.go
// Converter maps between HTTP status codes and gRPC codes in both directions.
type Converter interface {
// ToGRPCCode converts an HTTP error code into the corresponding gRPC response status.
ToGRPCCode(code int) codes.Code
// FromGRPCCode converts a gRPC error code into the corresponding HTTP response status.
FromGRPCCode(code codes.Code) int
}
func (c statusConverter) ToGRPCCode(code int) codes.Code {
transport/http/calloption.go
// before does nothing and always reports success.
func (EmptyCallOption) before(*callInfo) error {
	return nil
}

// after does nothing.
func (EmptyCallOption) after(*callInfo, *csAttempt) {
}
transport/http/client.go
// init installs the wrr builder as the global selector when none has been set yet.
func init() {
	if selector.GlobalSelector() != nil {
		return
	}
	selector.SetGlobalSelector(wrr.NewBuilder())
}
func NewClient(ctx context.Context, opts ...ClientOption) (*Client, error) {
if target.Scheme == "discovery" {
if r, err = newResolver(ctx, options.discovery, target, selector, options.block, insecure, options.subsetSize); err != nil {
func (client *Client) Invoke(ctx context.Context, method, path string, args interface{}, reply interface{}, opts ...CallOption) error {
ctx = transport.NewClientContext(ctx, &Transport{
return client.invoke(ctx, req, args, reply, c, opts...)
func (client *Client) invoke(ctx context.Context, req *http.Request, args interface{}, reply interface{}, c callInfo, opts ...CallOption) error {
h := func(ctx context.Context, in interface{}) (interface{}, error) {
res, err := client.do(req.WithContext(ctx))
func (client *Client) do(req *http.Request) (*http.Response, error) {
if node, done, err = client.selector.Select(req.Context(), selector.WithNodeFilter(client.opts.nodeFilters...)); err != nil {
resp, err := client.cc.Do(req)
transport/http/codec.go
// DefaultRequestVars copies the mux route variables of r into url.Values,
// one single-element entry per variable, and binds them onto v through
// binding.BindQuery.
func DefaultRequestVars(r *http.Request, v interface{}) error {
	pathVars := mux.Vars(r)
	values := make(url.Values, len(pathVars))
	for name, val := range pathVars {
		// Set stores exactly one value under the key.
		values.Set(name, val)
	}
	return binding.BindQuery(values, v)
}
transport/http/context.go
// JSON sets the Content-Type header to application/json, writes the status
// code, and then encodes v as JSON into the response. It returns the
// encoder's error.
func (c *wrapper) JSON(code int, v interface{}) error {
	header := c.res.Header()
	header.Set("Content-Type", "application/json")
	c.res.WriteHeader(code)
	encoder := json.NewEncoder(c.res)
	return encoder.Encode(v)
}
transport/http/filter.go
// FilterChain composes filters into a single FilterFunc. The filters are
// applied from last to first, so the first filter ends up outermost and
// sees the request first.
func FilterChain(filters ...FilterFunc) FilterFunc {
	return func(next http.Handler) http.Handler {
		handler := next
		for i := range filters {
			// Walk the slice backwards: the last filter wraps the handler first.
			handler = filters[len(filters)-1-i](handler)
		}
		return handler
	}
}
transport/http/redirect.go
// Redirect returns the URL and the code stored on r.
func (r *redirect) Redirect() (string, int) {
	target, status := r.URL, r.Code
	return target, status
}
transport/http/resolver.go
func parseTarget(endpoint string, insecure bool) (*Target, error) {
func newResolver(ctx context.Context, discovery registry.Discovery, target *Target,
rebalancer selector.Rebalancer, block, insecure bool, subsetSize int,
) (*resolver, error) {
watcher, err := discovery.Watch(ctx, target.Endpoint)
transport/http/router.go
// RouteInfo pairs a route path with a method.
type RouteInfo struct {
Path string
Method string
}
transport/http/server.go
func NewServer(opts ...ServerOption) *Server {
srv := &Server{
network: "tcp",
srv.router.StrictSlash(srv.strictSlash)
srv.router.NotFoundHandler = http.DefaultServeMux
srv.router.MethodNotAllowedHandler = http.DefaultServeMux
srv.router.Use(srv.filter())
srv.Server = &http.Server{
transport/http/transport.go
func RequestFromServerContext(ctx context.Context) (*http.Request, bool) {
if tr, ok := transport.FromServerContext(ctx); ok {
if tr, ok := tr.(*Transport); ok {
transport/transport.go
func NewServerContext(ctx context.Context, tr Transporter) context.Context {
func FromServerContext(ctx context.Context) (tr Transporter, ok bool) {
func NewClientContext(ctx context.Context, tr Transporter) context.Context {
func FromClientContext(ctx context.Context) (tr Transporter, ok bool) {
// Server is something that can be started and stopped, each with a context,
// and reports an error from either operation.
type Server interface {
Start(context.Context) error
Stop(context.Context) error
}
15,app.go
func New(opts ...Option) *App {
func (a *App) Run() error {
for _, srv := range a.opts.servers {
srv := srv
eg.Go(func() error {
return srv.Stop(stopCtx)
return srv.Start(sctx)
wg.Wait()
本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!