前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >grpc client 源码分析

grpc client 源码分析

作者头像
golangLeetcode
发布于 2022-08-02 11:16:34
发布于 2022-08-02 11:16:34
53400
代码可运行
举报
运行总次数:0
代码可运行

grpc client 代码非常简洁,分三步

1,获取连接

2,初始化客户端

3,发送请求

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
defer conn.Close()
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
c := pb.NewGreeterClient(conn)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})

首先看下发送请求

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type GreeterClient interface {
  // Sends a greeting
  SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
  out := new(HelloReply)
  err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
  if err != nil {
    return nil, err
  }
  return out, nil
}

调用了cc的Invoke方法

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type greeterClient struct {
  cc grpc.ClientConnInterface
}

ClientConnInterface的定义如下

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type ClientConnInterface interface {
  // Invoke performs a unary RPC and returns after the response is received
  // into reply.
  Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
  // NewStream begins a streaming RPC.
  NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}

接口包含了两个方法Invoke和NewStream定义在clientconn.go中

接着看下client初始化的代码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
  return &greeterClient{cc}
}

仅仅把connet interface传给了client

最后看下获取连接的实现

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  return DialContext(context.Background(), target, opts...)
}

clientconn.go的Dial方法返回了ClientConn指针

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
  cc := &ClientConn{
    target:            target,
    csMgr:             &connectivityStateManager{},
    conns:             make(map[*addrConn]struct{}),
    dopts:             defaultDialOptions(),
    blockingpicker:    newPickerWrapper(),
    czData:            new(channelzData),
    firstResolveEvent: grpcsync.NewEvent(),
  }
  ....
cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
  ....
  cc.balancerBuildOpts = balancer.BuildOptions{
    DialCreds:        credsClone,
    CredsBundle:      cc.dopts.copts.CredsBundle,
    Dialer:           cc.dopts.copts.Dialer,
    CustomUserAgent:  cc.dopts.copts.UserAgent,
    ChannelzParentID: cc.channelzID,
    Target:           cc.parsedTarget,
  }
  .....
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder) 
}

其中ClientConn的结构体定义如下

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type ClientConn struct {
  ctx    context.Context
  cancel context.CancelFunc

  target       string
  parsedTarget resolver.Target
  authority    string
  dopts        dialOptions
  csMgr        *connectivityStateManager

  balancerBuildOpts balancer.BuildOptions
  blockingpicker    *pickerWrapper

  safeConfigSelector iresolver.SafeConfigSelector

  mu              sync.RWMutex
  resolverWrapper *ccResolverWrapper
  sc              *ServiceConfig
  conns           map[*addrConn]struct{}
  // Keepalive parameter can be updated if a GoAway is received.
  mkp             keepalive.ClientParameters
  curBalancerName string
  balancerWrapper *ccBalancerWrapper
  retryThrottler  atomic.Value

  firstResolveEvent *grpcsync.Event

  channelzID int64 // channelz unique identification number
  czData     *channelzData

  lceMu               sync.Mutex // protects lastConnectionError
  lastConnectionError error
}

可以看出Dial仅仅做了connection的初始化

call.go里定义了Invoke方法

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
  // allow interceptor to see all applicable call options, which means those
  // configured as defaults from dial option as well as per-call options
  opts = combine(cc.dopts.callOptions, opts)

  if cc.dopts.unaryInt != nil {
    return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
  }
  return invoke(ctx, method, args, reply, cc, opts...)
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
  cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
  if err != nil {
    return err
  }
  if err := cs.SendMsg(req); err != nil {
    return err
  }
  return cs.RecvMsg(reply)
}

里面分了三步,建立连接,发送请求,获取结果

newClientStream 函数定义在stream.go文件里

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
  rpcConfig, err := cc.safeConfigSelector.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: method})
    callHdr := &transport.CallHdr{
    Host:           cc.authority,
    Method:         method,
    ContentSubtype: c.contentSubtype,
  }
  cs := &clientStream{
    callHdr:      callHdr,
    ctx:          ctx,
    methodConfig: &mc,
    opts:         opts,
    callInfo:     c,
    cc:           cc,
    desc:         desc,
    codec:        c.codec,
    cp:           cp,
    comp:         comp,
    cancel:       cancel,
    beginTime:    beginTime,
    firstAttempt: true,
    onCommit:     onCommit,
  }
  if err := cs.newAttemptLocked(sh, trInfo); err != nil {
    cs.finish(err)
    return nil, err
  }
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
  newAttempt := &csAttempt{
    cs:           cs,
    dc:           cs.cc.dopts.dc,
    statsHandler: sh,
    trInfo:       trInfo,
  }
  t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
 }

getTransport 定义在clientconn.go中

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
  t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
    Ctx:            ctx,
    FullMethodName: method,
  })
  if err != nil {
    return nil, nil, toRPCErr(err)
  }
  return t, done, nil
}

pick函数定义在picker_warper.go

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
for{
pickResult, err := p.Pick(info)
if t, ok := acw.getAddrConn().getReadyTransport(); ok {}
}
}

在stream.go文件里定义了ClientStream的接口

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type ClientStream interface {
// Header returns the header metadata received from the server if there
  // is any. It blocks if the metadata is not ready to read.
  Header() (metadata.MD, error)
  // Trailer returns the trailer metadata from the server, if there is any.
  // It must only be called after stream.CloseAndRecv has returned, or
  // stream.Recv has returned a non-nil error (including io.EOF).
  Trailer() metadata.MD
  // CloseSend closes the send direction of the stream. It closes the stream
  // when non-nil error is met. It is also not safe to call CloseSend
  // concurrently with SendMsg.
  CloseSend() error
  // Context returns the context for this stream.
  //
  // It should not be called until after Header or RecvMsg has returned. Once
  // called, subsequent client-side retries are disabled.
  Context() context.Context
  // SendMsg is generally called by generated code. On error, SendMsg aborts
  // the stream. If the error was generated by the client, the status is
  // returned directly; otherwise, io.EOF is returned and the status of
  // the stream may be discovered using RecvMsg.
  //
  // SendMsg blocks until:
  //   - There is sufficient flow control to schedule m with the transport, or
  //   - The stream is done, or
  //   - The stream breaks.
  //
  // SendMsg does not wait until the message is received by the server. An
  // untimely stream closure may result in lost messages. To ensure delivery,
  // users should ensure the RPC completed successfully using RecvMsg.
  //
  // It is safe to have a goroutine calling SendMsg and another goroutine
  // calling RecvMsg on the same stream at the same time, but it is not safe
  // to call SendMsg on the same stream in different goroutines. It is also
  // not safe to call CloseSend concurrently with SendMsg.
  SendMsg(m interface{}) error
  // RecvMsg blocks until it receives a message into m or the stream is
  // done. It returns io.EOF when the stream completes successfully. On
  // any other error, the stream is aborted and the error contains the RPC
  // status.
  //
  // It is safe to have a goroutine calling SendMsg and another goroutine
  // calling RecvMsg on the same stream at the same time, but it is not
  // safe to call RecvMsg on the same stream in different goroutines.
  RecvMsg(m interface{}) error
}

clientstream实现了上述接口

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type clientStream struct {
  callHdr  *transport.CallHdr
  opts     []CallOption
  callInfo *callInfo
  cc       *ClientConn
  desc     *StreamDesc

  codec baseCodec
  cp    Compressor
  comp  encoding.Compressor

  cancel context.CancelFunc // cancels all attempts

  sentLast  bool // sent an end stream
  beginTime time.Time

  methodConfig *MethodConfig

  ctx context.Context // the application's context, wrapped by stats/tracing

  retryThrottler *retryThrottler // The throttler active when the RPC began.

  binlog *binarylog.MethodLogger // Binary logger, can be nil.
  // serverHeaderBinlogged is a boolean for whether server header has been
  // logged. Server header will be logged when the first time one of those
  // happens: stream.Header(), stream.Recv().
  //
  // It's only read and used by Recv() and Header(), so it doesn't need to be
  // synchronized.
  serverHeaderBinlogged bool

  mu                      sync.Mutex
  firstAttempt            bool // if true, transparent retry is valid
  numRetries              int  // exclusive of transparent retry attempt(s)
  numRetriesSincePushback int  // retries since pushback; to reset backoff
  finished                bool // TODO: replace with atomic cmpxchg or sync.Once?
  // attempt is the active client stream attempt.
  // The only place where it is written is the newAttemptLocked method and this method never writes nil.
  // So, attempt can be nil only inside newClientStream function when clientStream is first created.
  // One of the first things done after clientStream's creation, is to call newAttemptLocked which either
  // assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
  // then newClientStream calls finish on the clientStream and returns. So, finish method is the only
  // place where we need to check if the attempt is nil.
  attempt *csAttempt
  // TODO(hedging): hedging will have multiple attempts simultaneously.
  committed  bool // active attempt committed for retry?
  onCommit   func()
  buffer     []func(a *csAttempt) error // operations to replay on retry
  bufferSize int                        // current size of buffer
}

实现了SendMsg和RecvMsg两个方法

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (cs *clientStream) SendMsg(m interface{}) (err error) {
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
  op := func(a *csAttempt) error {
    err := a.sendMsg(m, hdr, payload, data)
    // nil out the message and uncomp when replaying; they are only needed for
    // stats which is disabled for subsequent attempts.
    m, data = nil, nil
    return err
  }
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
  data, err = encode(codec, m)
  compData, err := compress(data, cp, comp)
  hdr, payload = msgHeader(data, compData)
  }

实现了数据的编码压缩

紧接着发送数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
  if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
  }
  if a.statsHandler != nil {
    a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
  }  
}

最后是接受消息

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (cs *clientStream) RecvMsg(m interface{}) error {
  err := cs.withRetry(func(a *csAttempt) error {
    return a.recvMsg(m, recvInfo)
  }, cs.commitAttemptLocked)
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
      Client:   true,
      RecvTime: time.Now(),
      Payload:  m,
      // TODO truncate large payload.
      Data:       payInfo.uncompressedBytes,
      WireLength: payInfo.wireLength + headerLen,
      Length:     len(payInfo.uncompressedBytes),
    })
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)    
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
  d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
  if err != nil {
    return err
  }
  if err := c.Unmarshal(d, m); err != nil {
    return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
  }
  if payInfo != nil {
    payInfo.uncompressedBytes = d
  }
  return nil
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-04-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
前端错误捕获方案总结
本文主要摘抄自:https://juejin.cn/post/7172072612430872584#heading-10,主要用来记录和学习,也推荐大家看看原博主的文章。
蓓蕾心晴
2022/12/30
1.7K0
前端错误捕获方案总结
从0到1搭建前端监控平台,面试必备的亮点项目
常常会苦恼,平常做的项目很普通,没啥亮点;面试中也经常会被问到:做过哪些亮点项目吗?
前端老道
2023/02/27
3.7K0
从0到1搭建前端监控平台,面试必备的亮点项目
JavaScript异常如何处理
在前端的开发工作当中,我们对于异常的处理可能关注的不是太多,因为js有基本的异常处理能力,很多错误会直接抛出来,打开控制台就能看到。但是如果因为异常导致网站卡死,甚至崩溃无法继续进行下去,对于用户的体验是相当差的,我们应该及时的捕获这些异常,对用户进行一些简要的温馨提示,并将异常进行及时的上报,以便于快速解决。
OECOM
2020/07/02
1.7K0
JavaScript异常如何处理
沉淀了3年的自研前端错误监控系统,打通你的脉络
这篇文章是我的好朋友广胤所写,里面记录了我们2018年探索的前端监控体系的历程,由于在建设完后的我离职了,后续也没有继续能和广胤一起更进一步的探索,还是有一些些遗憾。还记得我第一次进入「兑吧」的时候,我就在简历里描述了错误监控之类的项目,其实当时我并没有在一个公司进行过实践,这大概是之前在网易的时候,闲来没事,进行的自我探索。然后进入「兑吧」后,没想到当时公司正好缺少这一块的基建,于是 TL 就让我和广胤负责了这块项目,也是这次经历让我从实习阶段就正式踏入了前端基础建设的道路,还是非常感谢这一次的机会,让我从单一的业务开发人员,转化到了结构型开发人员。记得在开发的项目的那一个月中,除了吃饭,或者和广胤讨论项目的进度问题,近乎一种忘我的开发状态。
秋风的笔记
2021/07/30
1.1K0
前端魔法堂——异常不仅仅是try/catch
前言  编程时我们往往拿到的是业务流程正确的业务说明文档或规范,但实际开发中却布满荆棘和例外情况,而这些例外中包含业务用例的例外,也包含技术上的例外。对于业务用例的例外我们别无它法,必须要求实施人员与用户共同提供合理的解决方案;而技术上的例外,则必须由我们码农们手刃之,而这也是我想记录的内容。  我打算分成《前端魔法堂——异常不仅仅是try/catch》和《前端魔法堂——调用栈,异常实例中的宝藏》两篇分别叙述内置/自定义异常类,捕获运行时异常/语法异常/网络请求异常/PromiseRejection事件,
^_^肥仔John
2018/01/18
1.6K0
vue前端异常监控sentry实践
但是onerror事件无法捕获到网络异常的错误(资源加载失败、图片显示异常等),例如img标签下图片url 404 网络请求异常的时候,onerror无法捕获到异常,此时需要监听unhandledrejection。
csxiaoyao
2019/04/09
1.7K0
前端异常埋点系统初探
https://juejin.cn/post/6965022635470110733
@超人
2021/07/05
1.1K0
前端异常埋点系统初探
如何搭建前端异常监控系统
是指用户在使用应用时,无法得到预期的结果。不同的异常带来的后果程度不同,轻则引起用户使用不悦,重则导致产品无法使用,从而使用户丧失对产品的认可。
iOSSir
2020/07/24
1.9K0
如何搭建前端异常监控系统
web前端监控的三个方面探讨
以 init 为程序的入口,代码中所有同步执行出现的错误都会被捕获,这种方式也可以很好的避免程序刚跑起来就挂。
smy
2018/08/01
1.2K0
web前端监控的三个方面探讨
干货满满!如何做好前端日志和异常监控的思考
在研发过程中,日志是非常重要的一环,它可以帮助我们快速定位问题,解决问题。在前端开发中,日志也是非常重要的一环,它可以帮助我们快速定位问题,解决问题。本文将介绍前端日志的规范和最佳实践。但是我们经常看到一些项目日志打得满天飞,但是到了真正定位问题的时候,发现日志并没有什么卵用。这是因为日志打得不规范,不规范的日志是没有意义的。所以我们需要规范日志的打印,才能让日志发挥最大的作用。
老码小张
2024/03/20
1.6K1
干货满满!如何做好前端日志和异常监控的思考
让前端监控数据采集更高效
随着业务的快速发展,我们对生产环境下的问题感知能力越来越关注。作为距离用户最近的一层,前端的表现是否可靠、稳定、好用,很大程度上决定着用户对整个产品的体验和感受。因此,对于前端的监控不容忽视。
桃翁
2019/05/31
1.5K0
剖析前端异常及其降级处理和防范方案
“异常”一词出自《后汉书.卷一.皇后纪上.光烈阴皇后纪》,表示非正常的,不同于平常的。在我们现实生活中同样处处存在着异常,比如小县城里的路灯年久失修...,上下班高峰期深圳的地铁总是那么的拥挤...,人也总是时不时会生病等等; 由此可见,这个世界错误无处不在,这是一个基本的事实。
winty
2021/07/27
1.4K0
如何搭建前端异常监控系统
是指用户在使用应用时,无法得到预期的结果。不同的异常带来的后果程度不同,轻则引起用户使用不悦,重则导致产品无法使用,从而使用户丧失对产品的认可。
发声的沉默者
2021/06/14
1.3K0
如何搭建前端异常监控系统
一篇文章教你如何捕获前端错误
JavaScript代码在用户浏览器中执行时,由于一些边界情况、本地环境的不可控等因素,可能会存在js运行时错误。
2020labs小助手
2019/07/10
3.5K0
前端异常的捕获与处理
? 这是第 89 篇不掺水的原创,想要了解更多,请戳上方蓝色字体:政采云前端团队 关注我们吧~ 本文首发于政采云前端团队博客:前端异常的捕获与处理 https://www.zoo.team/arti
政采云前端团队
2021/03/16
3.8K0
前端异常的捕获与处理
前端异常捕获和定位
开发阶段,通过详细的报错信息,我们可以快速定位并解决问题。在生产,通过异常监控,根据异常埋点信息,我们可以第一时间知道异常信息,不至于造成严重后果。
GopalFeng
2020/09/24
1.4K0
前端异常捕获和定位
如何及时发现网页的隐形错误
先来说说JavaScript的错误类型,ECMA-262 定义了 7 种错误类型,说明如下:
zayyo
2023/11/02
3240
前端监控那些事
监控这个词对于前端,个人觉得有三个定义,分别是“性能监控”、“异常监控”、“数据监控” 性能监控则是针对web应用的性能,涉及包括用户体验、用户交互时间等 异常监控则是指Web应用得不到预期效果结果的情况监控 数据监控则是获取用户使用过程的行为数据反馈 1.性能监控 性能监控可以让我们更好的监控当前应用的性能情况,然后对性能情况反馈去做优化,性能会影响到用户体验,而常见的性能指标我们能通过浏览器Performance里面看到 1.1 Performace 允许访问当前页面性能相关的信息,perf
树酱
2020/07/03
1.3K0
如何优雅处理前端的异常?
前端一直是距离用户最近的一层,随着产品的日益完善,我们会更加注重用户体验,而前端异常却如鲠在喉,甚是烦人。
桃翁
2019/05/31
1.9K0
React,优雅的捕获异常
https://juejin.cn/post/6974383324148006926
winty
2021/07/27
8780
相关推荐
前端错误捕获方案总结
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验