go里面一般会使用Context进行超时控制以及参数传递, 其中超时控制可以使用context.WithDeadline()或者context.WithTimeout()实现, 二者实现效果是一致的.
gRPC基本上所有的对外函数都是带context参数的, 所以说它默认就集成了context的功能, 我们只需要在调用方法的时候传入 ctx 参数便可.
gRPC 中已经内置了 retry 功能,可以直接使用, 一般我们在请求失败后可能会重试几次
客户端需要通过grpc.WithDefaultServiceConfig()配置 retry 功能, 并且设置环境变量
export GRPC_GO_RETRY=on
{
"methodConfig": [{
"name": [{"service": "echo.Echo","method":"UnaryEcho"}],
"retryPolicy": {
"MaxAttempts": 4,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "UNAVAILABLE","DEADLINE_EXCEEDED" ]
}}]
}
<package>.<service>
package就是proto文件中指定的package,service也是proto文件中指定的 Service Name。注意必须是服务端返回的code, 客户端导致的错误码不算
"OK",
"CANCELLED",
"UNKNOWN",
"INVALID_ARGUMENT",
"DEADLINE_EXCEEDED",
"NOT_FOUND",
"ALREADY_EXISTS",
"PERMISSION_DENIED",
"RESOURCE_EXHAUSTED",
"FAILED_PRECONDITION",
"ABORTED",
"OUT_OF_RANGE",
"UNIMPLEMENTED",
"INTERNAL",
"UNAVAILABLE",
"DATA_LOSS",
"UNAUTHENTICATED",
下面的案例演示了一个重试的案例, 同时你可以发现如果是客户端context超时, 那么重试机制就会不起作用, 因为只有服务端返回的错误码才作数.
client/main.go
package main
import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/oauth"
"google.golang.org/grpc/examples/data"
"grpc-demo/helloworld/auth"
"grpc-demo/helloworld/pb"
"log"
)
const (
address = "localhost:50051"
)
func main() {
// 更多配置信息查看官方文 https://github.com/grpc/grpc/blob/master/doc/service_config.md
// service这里语法为<package>.<service> package就是proto文件中指定的package, service也是proto文件中指定的 Service Name。
// method 可以不指定 即当前service下的所以方法都使用该配置。
retryPolicy := `{
"methodConfig": [{
"name": [{"service": "pb.Greeter","method":"SayHello"}],
"retryPolicy": {
"MaxAttempts": 4,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}]}`
// 构建一个 PerRPCCredentials。
// 使用内置的Oauth2进行身份验证
oauthAuth := oauth.NewOauthAccess(auth.FetchToken())
// 使用自定义的的身份验证
userPwdAuth := auth.NewUserPwdAuth()
// 使用自定义的的身份验证
jwtAuth := auth.NewJWTAuthToken()
cred, err := credentials.NewClientTLSFromFile(data.Path("/Users/guirong/go/src/grpc-demo/helloworld/client/ca.crt"),
"www.ggr.com")
if err != nil {
log.Fatalf("failed to load credentials: %v", err)
}
conn, err := grpc.Dial(address,
grpc.WithDefaultServiceConfig(retryPolicy),
grpc.WithTransportCredentials(cred),
grpc.WithPerRPCCredentials(userPwdAuth),
grpc.WithPerRPCCredentials(oauthAuth),
grpc.WithPerRPCCredentials(jwtAuth),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
client := pb.NewBattleServiceClient(conn)
bidirectionalStreamBattle(client)
client2 := pb.NewGreeterClient(conn)
sayHello(client2)
}
client/client.go
package main
import (
"context"
"fmt"
"grpc-demo/helloworld/pb"
"io"
"log"
"os"
"time"
)
func bidirectionalStreamBattle(client pb.BattleServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stream, err := client.Battle(ctx)
if err != nil {
log.Fatalf("could not battle: %v", err)
}
err = stream.SendMsg(&pb.BattleRequest{
HeroId: "hero_1",
SkillId: "Skill_1",
})
if err != nil {
log.Fatalf("could not battle: %v", err)
}
err = stream.SendMsg(&pb.BattleRequest{
HeroId: "hero_2",
SkillId: "Skill_2",
})
if err != nil {
log.Fatalf("could not battle: %v", err)
}
ch := make(chan struct{})
go asyncDoBattle(stream, ch)
err = stream.CloseSend()
if err != nil {
log.Fatalf("could not battle: %v", err)
}
<-ch
}
func asyncDoBattle(stream pb.BattleService_BattleClient, c chan struct{}) {
for {
rsp, err := stream.Recv()
if err == io.EOF {
break
}
fmt.Println(rsp)
}
c <- struct{}{}
}
func sayHello(client pb.GreeterClient, want codes.Code) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// 通过命令行参数指定 name
name := "world"
if len(os.Args) > 1 {
name = os.Args[1]
}
_, err := client.SayHello(ctx, &pb.HelloRequest{Name: name})
got := status.Code(err)
fmt.Printf("sayHello wanted = %v, got = %v\n", want, got)
}
为了更好地演示实验效果, 我们需要对server端代码做一点微调
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"grpc-demo/helloworld/auth"
pb "grpc-demo/helloworld/pb"
"io"
"log"
"time"
)
var (
errMissingMetadata = status.Errorf(codes.InvalidArgument, "missing metadata")
errInvalidToken = status.Errorf(codes.Unauthenticated, "invalid token")
)
// BattleServer 游戏战斗服务
type BattleServer struct {
pb.UnimplementedBattleServiceServer
}
// Battle 战斗数据回传
func (h *BattleServer) Battle(steam pb.BattleService_BattleServer) error {
for {
req, err := steam.Recv()
fmt.Println(req)
if err == io.EOF { //发送最后一次结果给前端
err = steam.Send(&pb.BattleResponse{})
if err != nil {
log.Println(err)
}
return nil
}
err = steam.Send(&pb.BattleResponse{
Hero: []*pb.HeroInfo{
{Id: "hero_1", Life: 999},
},
Skill: []*pb.SkillInfo{
{SkillId: "skill_1", CoolDown: 1664249248},
{SkillId: "skill_2", CoolDown: 1664249293},
},
})
if err != nil {
log.Println(err)
}
time.Sleep(time.Second * 3)
}
}
// GreeterServer 定义一个结构体用于实现 .proto文件中定义的方法
// 新版本 gRPC 要求必须嵌入 pb.UnimplementedGreeterServer 结构体
type GreeterServer struct {
pb.UnimplementedGreeterServer
}
var countRetry = 0
// SayHello 简单实现一下.proto文件中定义的 SayHello 方法
func (g *GreeterServer) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
countRetry++
fmt.Println("当前countRetry", countRetry)
if countRetry < 3 {
return nil, status.Errorf(codes.DeadlineExceeded, "maybeFailRequest: failing it")
}
if countRetry == 4 {
time.Sleep(time.Second)
countRetry=0
}
log.Printf("Received Msg: %v", in.GetName())
return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
// userPwdCheckInterceptor (用户名/密码)身份验证拦截器
func userPwdCheckInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {
// 如果返回err不为nil则说明token验证未通过
err := auth.IsValidUserPwd(ctx)
if err != nil {
return nil, err
}
return handler(ctx, req)
}
// authTokenInterceptor (jwt和Oauth2 token)身份验证拦截器
func authTokenInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo,
handler grpc.StreamHandler) error {
// authentication (token verification)
md, ok := metadata.FromIncomingContext(ss.Context())
if !ok {
return errMissingMetadata
}
// 验证oauth2
if !auth.IsValidOauth2(md["authorization"]) {
return errInvalidToken
}
// 验证jwt
ok, _ = auth.IsValidJWToken(ss.Context())
if !ok {
return errInvalidToken
}
err := handler(srv, ss)
return err
}
启动服务端, 然后再启动客户端, 可以看到服务端控制台输出:
2022/09/28 20:22:26 Serving gRPC on 0.0.0.0:50051
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1"
HeroId:"hero_2" SkillId:"Skill_2"
<nil>
进入IsValidUserPwd验证用户名密码...
离开IsValidUserPwd用户名密码验证OK...
当前countRetry 1
进入IsValidUserPwd验证用户名密码...
离开IsValidUserPwd用户名密码验证OK...
当前countRetry 2
进入IsValidUserPwd验证用户名密码...
离开IsValidUserPwd用户名密码验证OK...
当前countRetry 3
2022/09/28 20:22:30 Received Msg: world
进入IsValidUserPwd验证用户名密码...
离开IsValidUserPwd用户名密码验证OK...
当前countRetry 4
2022/09/28 20:22:31 Received Msg: world
进入IsValidUserPwd验证用户名密码...
离开IsValidUserPwd用户名密码验证OK...
当前countRetry 1
进入IsValidUserPwd验证用户名密码...
离开IsValidUserPwd用户名密码验证OK...
当前countRetry 2
进入IsValidUserPwd验证用户名密码...
离开IsValidUserPwd用户名密码验证OK...
当前countRetry 3
2022/09/28 20:22:31 Received Msg: world
客户端输出
sayHello wanted = DeadlineExceeded, got = OK
sayHello wanted = DeadlineExceeded, got = DeadlineExceeded
sayHello wanted = OK, got = OK
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。