前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >grpc-go之超时与重试(三)

grpc-go之超时与重试(三)

原创
作者头像
Johns
修改2022-09-28 20:52:43
2.9K1
修改2022-09-28 20:52:43
举报
文章被收录于专栏:代码工具

介绍

超时

go里面一般会使用Context进行超时控制以及参数传递, 其中超时控制可以使用context.WithDeadline()或者context.WithTimeout()实现, 二者实现效果是一致的.

  • WithTimeout 只能设置在某一段时间后超时,比如3秒后超时
  • WithDeadline() 则可以设置到具体某个时间点, 比如在临晨0点10分20秒的时候返回

gRPC基本上所有的对外函数都是带context参数的, 所以说它默认就集成了context的功能, 我们只需要在调用方法的时候传入 ctx 参数便可.

重试

gRPC 中已经内置了 retry 功能,可以直接使用, 一般我们在请求失败后可能会重试几次

客户端需要通过grpc.WithDefaultServiceConfig()配置 retry 功能, 并且设置环境变量

代码语言:txt
复制
export GRPC_GO_RETRY=on

配置说明

代码语言:json
复制
{
		"methodConfig": [{
		  "name": [{"service": "echo.Echo","method":"UnaryEcho"}],
		  "retryPolicy": {
			  "MaxAttempts": 4,
			  "InitialBackoff": ".01s",
			  "MaxBackoff": ".01s",
			  "BackoffMultiplier": 1.0,
			  "RetryableStatusCodes": [ "UNAVAILABLE","DEADLINE_EXCEEDED" ]
		  }}]
}
  • name 指定下面的配置信息作用的 RPC 服务或方法
  • service:通过服务名匹配,语法为<package>.<service>package就是proto文件中指定的package,service也是proto文件中指定的 Service Name。
  • method:匹配具体某个方法,proto文件中定义的方法名。
  • MaxAttempts:最大尝试次数
  • InitialBackoff:默认退避时间
  • MaxBackoff:最大退避时间
  • BackoffMultiplier:退避时间增加倍率
  • RetryableStatusCodes:服务端返回什么错误码才重试

注意必须是服务端返回的code, 客户端导致的错误码不算

代码语言:go
复制
"OK",
"CANCELLED",
"UNKNOWN",
"INVALID_ARGUMENT",
"DEADLINE_EXCEEDED",
"NOT_FOUND",
"ALREADY_EXISTS",
"PERMISSION_DENIED",
"RESOURCE_EXHAUSTED",
"FAILED_PRECONDITION",
"ABORTED",
"OUT_OF_RANGE",
"UNIMPLEMENTED",
"INTERNAL",
"UNAVAILABLE",
"DATA_LOSS",
"UNAUTHENTICATED",

案例演示

下面的案例演示了一个重试的案例, 同时你可以发现如果是客户端context超时, 那么重试机制就会不起作用, 因为只有服务端返回的错误码才作数.

client/main.go

代码语言:txt
复制
package main

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/oauth"
	"google.golang.org/grpc/examples/data"
	"grpc-demo/helloworld/auth"
	"grpc-demo/helloworld/pb"
	"log"
)

const (
	address = "localhost:50051"
)

func main() {

	// 更多配置信息查看官方文 https://github.com/grpc/grpc/blob/master/doc/service_config.md
	// service这里语法为<package>.<service> package就是proto文件中指定的package, service也是proto文件中指定的 Service Name。
	// method 可以不指定 即当前service下的所以方法都使用该配置。
	retryPolicy := `{
		"methodConfig": [{
		  "name": [{"service": "pb.Greeter","method":"SayHello"}],
		  "retryPolicy": {
			  "MaxAttempts": 4,
			  "InitialBackoff": ".01s",
			  "MaxBackoff": ".01s",
			  "BackoffMultiplier": 1.0,
			  "RetryableStatusCodes": [ "UNAVAILABLE" ]
		  }
		}]}`
	// 构建一个 PerRPCCredentials。
	// 使用内置的Oauth2进行身份验证
	oauthAuth := oauth.NewOauthAccess(auth.FetchToken())

	// 使用自定义的的身份验证
	userPwdAuth := auth.NewUserPwdAuth()

	// 使用自定义的的身份验证
	jwtAuth := auth.NewJWTAuthToken()

	cred, err := credentials.NewClientTLSFromFile(data.Path("/Users/guirong/go/src/grpc-demo/helloworld/client/ca.crt"),
		"www.ggr.com")
	if err != nil {
		log.Fatalf("failed to load credentials: %v", err)
	}

	conn, err := grpc.Dial(address,
		grpc.WithDefaultServiceConfig(retryPolicy),
		grpc.WithTransportCredentials(cred),
		grpc.WithPerRPCCredentials(userPwdAuth),
		grpc.WithPerRPCCredentials(oauthAuth),
		grpc.WithPerRPCCredentials(jwtAuth),
	)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewBattleServiceClient(conn)
	bidirectionalStreamBattle(client)

	client2 := pb.NewGreeterClient(conn)
	sayHello(client2)
}

client/client.go

代码语言:txt
复制
package main

import (
	"context"
	"fmt"
	"grpc-demo/helloworld/pb"
	"io"
	"log"
	"os"
	"time"
)

func bidirectionalStreamBattle(client pb.BattleServiceClient) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	stream, err := client.Battle(ctx)
	if err != nil {
		log.Fatalf("could not battle: %v", err)
	}
	err = stream.SendMsg(&pb.BattleRequest{
		HeroId:  "hero_1",
		SkillId: "Skill_1",
	})
	if err != nil {
		log.Fatalf("could not battle: %v", err)
	}
	err = stream.SendMsg(&pb.BattleRequest{
		HeroId:  "hero_2",
		SkillId: "Skill_2",
	})
	if err != nil {
		log.Fatalf("could not battle: %v", err)
	}
	ch := make(chan struct{})
	go asyncDoBattle(stream, ch)
	err = stream.CloseSend()
	if err != nil {
		log.Fatalf("could not battle: %v", err)
	}
	<-ch
}

func asyncDoBattle(stream pb.BattleService_BattleClient, c chan struct{}) {
	for {
		rsp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		fmt.Println(rsp)
	}
	c <- struct{}{}
}

func sayHello(client pb.GreeterClient, want codes.Code) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// 通过命令行参数指定 name
	name := "world"
	if len(os.Args) > 1 {
		name = os.Args[1]
	}
	_, err := client.SayHello(ctx, &pb.HelloRequest{Name: name})
	got := status.Code(err)
	fmt.Printf("sayHello wanted = %v, got = %v\n", want, got)
}

服务端

为了更好地演示实验效果, 我们需要对server端代码做一点微调

代码语言:txt
复制
package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
	"grpc-demo/helloworld/auth"
	pb "grpc-demo/helloworld/pb"
	"io"
	"log"
	"time"
)

var (
	errMissingMetadata = status.Errorf(codes.InvalidArgument, "missing metadata")
	errInvalidToken    = status.Errorf(codes.Unauthenticated, "invalid token")
)

// BattleServer 游戏战斗服务
type BattleServer struct {
	pb.UnimplementedBattleServiceServer
}


// Battle 战斗数据回传
func (h *BattleServer) Battle(steam pb.BattleService_BattleServer) error {
	for {
		req, err := steam.Recv()
		fmt.Println(req)
		if err == io.EOF { //发送最后一次结果给前端
			err = steam.Send(&pb.BattleResponse{})
			if err != nil {
				log.Println(err)
			}
			return nil
		}
		err = steam.Send(&pb.BattleResponse{
			Hero: []*pb.HeroInfo{
				{Id: "hero_1", Life: 999},
			},
			Skill: []*pb.SkillInfo{
				{SkillId: "skill_1", CoolDown: 1664249248},
				{SkillId: "skill_2", CoolDown: 1664249293},
			},
		})
		if err != nil {
			log.Println(err)
		}
		time.Sleep(time.Second * 3)
	}
}

// GreeterServer 定义一个结构体用于实现 .proto文件中定义的方法
// 新版本 gRPC 要求必须嵌入 pb.UnimplementedGreeterServer 结构体
type GreeterServer struct {
	pb.UnimplementedGreeterServer
}

var countRetry = 0

// SayHello 简单实现一下.proto文件中定义的 SayHello 方法
func (g *GreeterServer) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	countRetry++
	fmt.Println("当前countRetry", countRetry)
	if countRetry < 3 {
		return nil, status.Errorf(codes.DeadlineExceeded, "maybeFailRequest: failing it")
	}
	if countRetry == 4 {
		time.Sleep(time.Second)
		countRetry=0
	}

	log.Printf("Received Msg: %v", in.GetName())
	return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

// userPwdCheckInterceptor (用户名/密码)身份验证拦截器
func userPwdCheckInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler) (interface{}, error) {
	// 如果返回err不为nil则说明token验证未通过
	err := auth.IsValidUserPwd(ctx)
	if err != nil {
		return nil, err
	}
	return handler(ctx, req)
}

// authTokenInterceptor (jwt和Oauth2 token)身份验证拦截器
func authTokenInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo,
	handler grpc.StreamHandler) error {
	// authentication (token verification)
	md, ok := metadata.FromIncomingContext(ss.Context())
	if !ok {
		return errMissingMetadata
	}

	// 验证oauth2
	if !auth.IsValidOauth2(md["authorization"]) {
		return errInvalidToken
	}

	// 验证jwt
	ok, _ = auth.IsValidJWToken(ss.Context())
	if !ok {
		return errInvalidToken
	}

	err := handler(srv, ss)
	return err
}

 测试

启动服务端, 然后再启动客户端, 可以看到服务端控制台输出:

代码语言:txt
复制
2022/09/28 20:22:26 Serving gRPC on 0.0.0.0:50051
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
HeroId:"hero_2" SkillId:"Skill_2" 
<nil>
进入IsValidUserPwd验证用户名密码...
离开IsValidUserPwd用户名密码验证OK...
当前countRetry 1
进入IsValidUserPwd验证用户名密码...
离开IsValidUserPwd用户名密码验证OK...
当前countRetry 2
进入IsValidUserPwd验证用户名密码...
离开IsValidUserPwd用户名密码验证OK...
当前countRetry 3
2022/09/28 20:22:30 Received Msg: world
进入IsValidUserPwd验证用户名密码...
离开IsValidUserPwd用户名密码验证OK...
当前countRetry 4
2022/09/28 20:22:31 Received Msg: world
进入IsValidUserPwd验证用户名密码...
离开IsValidUserPwd用户名密码验证OK...
当前countRetry 1
进入IsValidUserPwd验证用户名密码...
离开IsValidUserPwd用户名密码验证OK...
当前countRetry 2
进入IsValidUserPwd验证用户名密码...
离开IsValidUserPwd用户名密码验证OK...
当前countRetry 3
2022/09/28 20:22:31 Received Msg: world

客户端输出

代码语言:txt
复制
sayHello wanted = DeadlineExceeded, got = OK
sayHello wanted = DeadlineExceeded, got = DeadlineExceeded
sayHello wanted = OK, got = OK

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
    • 超时
      • 重试
        • 配置说明
    • 案例演示
    • 服务端
      •  测试
      相关产品与服务
      多因子身份认证
      多因子身份认证(Multi-factor Authentication Service,MFAS)的目的是建立一个多层次的防御体系,通过结合两种或三种认证因子(基于记忆的/基于持有物的/基于生物特征的认证因子)验证访问者的身份,使系统或资源更加安全。攻击者即使破解单一因子(如口令、人脸),应用的安全依然可以得到保障。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档