Pitaya是一款由国外游戏公司TopFreeGames使用Go语言编写的开源分布式游戏服务器框架,易于使用、快速且轻量。Pitaya使用etcd作为默认的服务发现组件,提供使用NATS或gRPC进行远程调用(server to server)的可选配置,并提供在Docker中运行以上组件(etcd、NATS)的docker-compose配置。
// PlayerConn is a client connection: a net.Conn extended with a method that
// hands back one message at a time.
type PlayerConn interface {
// GetNextMessage returns the next available message as raw bytes, or the
// error that prevented it from being obtained.
GetNextMessage() (b []byte, err error)
net.Conn
}
// Acceptor accepts client connections and publishes them on a channel for the
// rest of the server to consume.
type Acceptor interface {
// ListenAndServe starts accepting connections.
ListenAndServe()
// Stop stops the acceptor.
Stop()
// GetAddr returns the acceptor's address as a string.
GetAddr() string
// GetConnChan returns the channel on which accepted connections are delivered.
GetConnChan() chan PlayerConn
}
// Wrapper decorates an acceptor: Wrap takes an Acceptor and returns the
// Acceptor that should be used in its place.
type Wrapper interface {
Wrap(acceptor.Acceptor) acceptor.Acceptor
}
type (
// Agent corresponds to a user and is used for storing raw Conn information
Agent struct {
Session *session.Session // session
appDieChan chan bool // app die channel
chDie chan struct{} // wait for close
chSend chan pendingWrite // push message queue
chStopHeartbeat chan struct{} // stop heartbeats
chStopWrite chan struct{} // stop writing messages
closeMutex sync.Mutex // presumably serializes closing of the agent — confirm against Close
conn net.Conn // low-level conn fd
decoder codec.PacketDecoder // binary decoder
encoder codec.PacketEncoder // binary encoder
heartbeatTimeout time.Duration // heartbeat timeout for this connection
lastAt int64 // last heartbeat unix time stamp
messageEncoder message.Encoder // encoder for message.Message values
... ...
state int32 // current agent state
}
// pendingWrite is one entry of the agent's chSend queue: the bytes to
// write (or an error) together with the context they were produced under.
pendingWrite struct {
ctx context.Context
data []byte
err error
}
)
// Component is the lifecycle contract for handler/remote components.
// At startup, Init is invoked on every registered component before AfterInit
// is invoked on any of them.
type Component interface {
// Init is the first startup hook.
Init()
// AfterInit runs once every component's Init has been called.
AfterInit()
// BeforeShutdown is presumably the first shutdown hook — confirm against the shutdown path.
BeforeShutdown()
// Shutdown is presumably the final shutdown hook — confirm against the shutdown path.
Shutdown()
}
type (
// Handler holds the reflection metadata for one handler method of a
// component, i.e. a method that processes a message.Message.
Handler struct {
Receiver reflect.Value // receiver of method
Method reflect.Method // method stub
Type reflect.Type // low-level type of method
IsRawArg bool // whether the data need to serialize
MessageType message.Type // handler allowed message type (either request or notify)
}
// Remote holds the reflection metadata for one remote (server-to-server)
// method of a component.
Remote struct {
Receiver reflect.Value // receiver of method
Method reflect.Method // method stub
HasArgs bool // if remote has no args we won't try to serialize received data into arguments
Type reflect.Type // low-level type of method
}
// Service describes one registered component: its reflected type and
// receiver, plus the handler and remote methods extracted from it.
Service struct {
Name string // name of service
Type reflect.Type // type of the receiver
Receiver reflect.Value // receiver of methods for the service
Handlers map[string]*Handler // registered methods
Remotes map[string]*Remote // registered remote methods
Options options // options
}
)
// Base is an empty struct whose methods are all no-ops, intended to be
// embedded so that a type only has to override the hooks it cares about.
//
// NOTE(review): Init and Shutdown return error here, while the Component
// interface shown in this file declares them without results, so *Base does
// not satisfy Component as written. This looks like the Base of the modules
// package rather than of the component package — confirm.
type Base struct{}
// Init does nothing and reports success.
func (c *Base) Init() error {
return nil
}
// AfterInit does nothing.
func (c *Base) AfterInit() {}
// BeforeShutdown does nothing.
func (c *Base) BeforeShutdown() {}
// Shutdown does nothing and reports success.
func (c *Base) Shutdown() error {
return nil
}
集中管理的对象容器在外部module.go中定义
var (
// modulesMap indexes registered modules by name.
modulesMap = make(map[string]interfaces.Module)
// modulesArr holds the registered modules as a slice; startModules walks
// it front to back and shutdownModules walks it back to front.
modulesArr = []moduleWrapper{}
)
// moduleWrapper pairs a module with the name associated with it.
type moduleWrapper struct {
module interfaces.Module
name string
}
type (
// HandlerService reads client connections, decodes their packets and
// routes each message to local or remote processing.
HandlerService struct {
appDieChan chan bool // die channel app
chLocalProcess chan unhandledMessage // channel of messages that will be processed locally
chRemoteProcess chan unhandledMessage // channel of messages that will be processed remotely
decoder codec.PacketDecoder // binary decoder
encoder codec.PacketEncoder // binary encoder
heartbeatTimeout time.Duration // heartbeat timeout handed to every new agent
messagesBufferSize int // buffer size handed to every new agent
remoteService *RemoteService // nil when no remote service is running
serializer serialize.Serializer // message serializer
server *cluster.Server // server obj
services map[string]*component.Service // all registered service
messageEncoder message.Encoder // message encoder handed to every new agent
metricsReporters []metrics.Reporter // reporters used for delay/timing metrics
}
// unhandledMessage is a decoded message queued for a Dispatch goroutine,
// bundled with its context, originating agent and decoded route.
unhandledMessage struct {
ctx context.Context
agent *agent.Agent
route *route.Route
msg *message.Message
}
)
// RemoteService holds the collaborators used for server-to-server calls:
// the RPC client and server, service discovery, routing and the registered
// remote components.
type RemoteService struct {
rpcServer cluster.RPCServer
serviceDiscovery cluster.ServiceDiscovery
serializer serialize.Serializer
encoder codec.PacketEncoder
rpcClient cluster.RPCClient
services map[string]*component.Service // all registered service
router *router.Router
messageEncoder message.Encoder
server *cluster.Server // server obj
remoteBindingListeners []cluster.RemoteBindingListener
}
var (
// Manager is the package-wide timer registry.
Manager = &struct {
incrementID int64 // presumably the source of timer ids — confirm
timers sync.Map // registered timers
ChClosingTimer chan int64 // ids of timers to remove; drained by the dispatch loop
ChCreatedTimer chan *Timer // newly created timers; drained by the dispatch loop
}{}
// Precision is the interval the global ticker is created with.
Precision = time.Second
// GlobalTicker is the shared ticker; it is nil until the server assigns it at startup.
GlobalTicker *time.Ticker
)
var (
// BeforeHandler holds the functions run, in order, before a handler method is invoked.
BeforeHandler = &pipelineChannel{}
// AfterHandler holds the functions run, in order, after a handler method has returned.
AfterHandler = &pipelineAfterChannel{}
)
type (
// HandlerTempl is a before-pipeline step: it receives the handler argument
// and returns the (possibly replaced) argument or an error.
HandlerTempl func(ctx context.Context, in interface{}) (out interface{}, err error)
// AfterHandlerTempl is an after-pipeline step: it receives the handler's
// result and error and returns the (possibly replaced) pair.
AfterHandlerTempl func(ctx context.Context, out interface{}, err error) (interface{}, error)
// pipelineChannel is an ordered list of before-pipeline steps.
pipelineChannel struct {
Handlers []HandlerTempl
}
// pipelineAfterChannel is an ordered list of after-pipeline steps.
pipelineAfterChannel struct {
Handlers []AfterHandlerTempl
}
)
app.go是系统启动的入口:创建HandlerService;如果启动模式是集群模式,则还会创建RemoteService;随后开启服务端事件监听,并开启监听服务器关闭信号的Chan。
var (
// app is the process-wide application instance (fields elided in this excerpt).
app = &App{
... ..
}
// remoteService is only assigned when the app starts in Cluster mode; it stays nil otherwise.
remoteService *service.RemoteService
// handlerService is assigned by Start.
handlerService *service.HandlerService
)
// Start wires up the services, starts listening and then blocks until the
// app is told to die or a signal is received. Parts of the body are elided
// in this excerpt.
func Start() {
... ..
// The remote service (and everything RPC-related) only exists in Cluster mode.
if app.serverMode == Cluster {
... ..
app.router.SetServiceDiscovery(app.serviceDiscovery)
remoteService = service.NewRemoteService(
app.rpcClient,
app.rpcServer,
app.serviceDiscovery,
app.router,
... ..
)
app.rpcServer.SetPitayaServer(remoteService)
initSysRemotes()
}
// remoteService is nil here when the app is not in Cluster mode.
handlerService = service.NewHandlerService(
app.dieChan,
app.heartbeat,
app.server,
remoteService,
... ..
)
... ..
listen()
... ..
// stop server
select {
case <-app.dieChan:
logger.Log.Warn("the app will shutdown in a few seconds")
case s := <-sg:
logger.Log.Warn("got signal: ", s, ", shutting down...")
// Closing dieChan propagates the shutdown to everything waiting on it.
close(app.dieChan)
}
... ..
}
listen方法也就是开启服务,具体包括以下步骤:1.注册Component;2.注册定时任务的GlobalTicker;3.开启Dispatch处理业务和定时任务(ticker)的goroutine;4.开启acceptor处理连接的goroutine;5.开启主逻辑的goroutine;6.启动Modules。
// listen brings the server up: it starts the components, creates the global
// ticker, launches the dispatch goroutines, starts every acceptor and finally
// starts the modules. Parts of the body are elided in this excerpt.
func listen() {
startupComponents()
// The ticker must exist before any Dispatch goroutine starts, since Dispatch reads it.
timer.GlobalTicker = time.NewTicker(timer.Precision)
logger.Log.Infof("starting server %s:%s", app.server.Type, app.server.ID)
// The number of dispatch goroutines comes from configuration.
for i := 0; i < app.config.GetInt("pitaya.concurrency.handler.dispatch"); i++ {
go handlerService.Dispatch(i)
}
for _, acc := range app.acceptors {
// Copy the loop variable so each goroutine below captures its own acceptor.
a := acc
// Consumer: one goroutine per accepted connection.
go func() {
for conn := range a.GetConnChan() {
go handlerService.Handle(conn)
}
}()
// Producer: the acceptor's accept loop.
go func() {
a.ListenAndServe()
}()
logger.Log.Infof("listening with acceptor %s on addr %s", reflect.TypeOf(a), a.GetAddr())
}
... ..
startModules()
logger.Log.Info("all modules started!")
app.running = true
}
startupComponents先对Component进行初始化,然后把Component注册到handlerService和remoteService上。
// startupComponents runs the startup hooks of every handler component and
// then registers handler and remote components with their services.
// Registration failures are logged and do not abort startup. Parts of the
// body are elided in this excerpt.
func startupComponents() {
// component initialize hooks
for _, c := range handlerComp {
c.comp.Init()
}
// component after initialize hooks
for _, c := range handlerComp {
c.comp.AfterInit()
}
// register all components
for _, c := range handlerComp {
if err := handlerService.Register(c.comp, c.opts); err != nil {
logger.Log.Errorf("Failed to register handler: %s", err.Error())
}
}
// register all remote components
for _, c := range remoteComp {
// remoteService is nil unless the app runs in Cluster mode.
if remoteService == nil {
logger.Log.Warn("registered a remote component but remoteService is not running! skipping...")
} else {
if err := remoteService.Register(c.comp, c.opts); err != nil {
logger.Log.Errorf("Failed to register remote: %s", err.Error())
}
}
}
... ..
}
比如HandlerService的注册:通过反射得到component类型的全部方法,凡是满足isHandlerMethod的方法就加入services里面;并把Component对象的反射Value对象作为全部Handler共用的Method Receiver,减少了对象引用。
// NewService builds a Service for comp, capturing its reflected type and
// value. The handling of opts is elided in this excerpt.
func NewService(comp Component, opts []Option) *Service {
s := &Service{
Type: reflect.TypeOf(comp),
Receiver: reflect.ValueOf(comp),
}
... ..
return s
}
// Register wraps comp in a Service, extracts its handler methods and records
// them. It returns the error from handler extraction, if any. Parts of the
// body are elided in this excerpt.
func (h *HandlerService) Register(comp component.Component, opts []component.Option) error {
s := component.NewService(comp, opts)
... ..
if err := s.ExtractHandler(); err != nil {
return err
}
h.services[s.Name] = s
// Index every handler under "<service>.<method>" in the handlers map.
for name, handler := range s.Handlers {
handlers[fmt.Sprintf("%s.%s", s.Name, name)] = handler
}
return nil
}
// ExtractHandler fills s.Handlers with the suitable handler methods of the
// service type and points every handler at the service's single receiver.
// Parts of the body are elided in this excerpt.
func (s *Service) ExtractHandler() error {
// Indirect so that a pointer receiver yields the name of the underlying type.
typeName := reflect.Indirect(s.Receiver).Type().Name()
... ..
s.Handlers = suitableHandlerMethods(s.Type, s.Options.nameFunc)
... ..
// All handlers share the same receiver value.
for i := range s.Handlers {
s.Handlers[i].Receiver = s.Receiver
}
return nil
}
// suitableHandlerMethods walks every method of typ and returns, keyed by
// name, a Handler for each one accepted by isHandlerMethod. Parts of the
// body (including any use of nameFunc) are elided in this excerpt.
func suitableHandlerMethods(typ reflect.Type, nameFunc func(string) string) map[string]*Handler {
methods := make(map[string]*Handler)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mt := method.Type
mn := method.Name
if isHandlerMethod(method) {
... ..
// Receiver is left unset here; ExtractHandler assigns it afterwards.
handler := &Handler{
Method: method,
IsRawArg: raw,
MessageType: msgType,
}
... ..
methods[mn] = handler
}
}
return methods
}
handlerService.Dispatch方法负责各种业务的处理,包括:1.处理chLocalProcess中的本地Message;2.使用remoteService处理chRemoteProcess中的远程Message;3.在定时ticker到达时调用timer.Cron执行定时任务;4.管理定时任务的创建;5.管理定时任务的删除。
// Dispatch is the body of one dispatcher goroutine. It loops forever,
// serving whichever of these is ready:
//   - a message queued for local processing,
//   - a message queued for remote processing,
//   - a tick of the global ticker (runs the cron pass),
//   - a newly created timer to register,
//   - the id of a timer to remove.
//
// thread identifies the dispatcher; it is not used in this body.
func (h *HandlerService) Dispatch(thread int) {
	// The loop below has no exit path, so this only fires if the goroutine unwinds.
	defer timer.GlobalTicker.Stop()

	for {
		select {
		// Locally handled message.
		case local := <-h.chLocalProcess:
			metrics.ReportMessageProcessDelayFromCtx(local.ctx, h.metricsReporters, "local")
			h.localProcess(local.ctx, local.agent, local.route, local.msg)

		// Message destined for another server type.
		case remote := <-h.chRemoteProcess:
			metrics.ReportMessageProcessDelayFromCtx(remote.ctx, h.metricsReporters, "remote")
			h.remoteService.remoteProcess(remote.ctx, nil, remote.agent, remote.route, remote.msg)

		// Timer housekeeping.
		case <-timer.GlobalTicker.C:
			timer.Cron()
		case created := <-timer.Manager.ChCreatedTimer:
			timer.AddTimer(created)
		case closingID := <-timer.Manager.ChClosingTimer:
			timer.RemoveTimer(closingID)
		}
	}
}
接下来看看Acceptor的工作,以下为Tcp实现,就是负责接收连接,流入acceptor的Chan
// ListenAndServe starts the acceptor. When TLS certificates are configured
// it delegates to ListenAndServeTLS; otherwise it opens a plain TCP listener
// on a.addr, marks the acceptor as running and enters the accept loop.
// A failure to listen is reported through logger.Log.Fatalf.
func (a *TCPAcceptor) ListenAndServe() {
	if !a.hasTLSCertificates() {
		ln, listenErr := net.Listen("tcp", a.addr)
		if listenErr != nil {
			logger.Log.Fatalf("Failed to listen: %s", listenErr.Error())
		}
		a.listener = ln
		a.running = true
		a.serve()
		return
	}
	a.ListenAndServeTLS(a.certFile, a.keyFile)
}
// serve is the accept loop: while the acceptor is marked running it accepts
// connections and pushes each one, wrapped as a tcpPlayerConn, onto connChan.
// Accept errors are logged and the loop carries on. Stop is called when the
// loop ends.
func (a *TCPAcceptor) serve() {
	defer a.Stop()

	for {
		if !a.running {
			break
		}
		accepted, acceptErr := a.listener.Accept()
		if acceptErr == nil {
			a.connChan <- &tcpPlayerConn{Conn: accepted}
			continue
		}
		logger.Log.Errorf("Failed to accept TCP connection: %s", acceptErr.Error())
	}
}
前面讲过对于每个Acceptor开启了一个goroutine去处理连接,也就是下面代码
// Excerpt repeated from listen: drain the acceptor's connection channel and
// serve each connection on its own goroutine. The loop ends when the channel
// is closed.
for conn := range a.GetConnChan() {
go handlerService.Handle(conn)
}
所以流入Chan的连接会被实时地分配一个goroutine去处理。处理过程是:先创建一个Agent对象,并开启一个goroutine让Agent负责维护连接的心跳;然后进入死循环,读取连接的数据并交给processPacket处理。
// Handle serves one client connection. It creates an agent for the
// connection, starts the agent's own goroutine, and then loops: read the
// next message, decode it into packets, process every packet. Any read,
// decode or processing error is logged and ends the loop. Parts of the body
// are elided in this excerpt.
func (h *HandlerService) Handle(conn acceptor.PlayerConn) {
// create a client agent and startup write goroutine
a := agent.NewAgent(conn, h.decoder, h.encoder, h.serializer, h.heartbeatTimeout, h.messagesBufferSize, h.appDieChan, h.messageEncoder, h.metricsReporters)
// startup agent goroutine
go a.Handle()
... ..
for {
msg, err := conn.GetNextMessage()
if err != nil {
logger.Log.Errorf("Error reading next available message: %s", err.Error())
return
}
packets, err := h.decoder.Decode(msg)
if err != nil {
logger.Log.Errorf("Failed to decode message: %s", err.Error())
return
}
// An empty decode result is not fatal: warn and wait for more data.
if len(packets) < 1 {
logger.Log.Warnf("Read no packets, data: %v", msg)
continue
}
// process all packet
for i := range packets {
if err := h.processPacket(a, packets[i]); err != nil {
logger.Log.Errorf("Failed to process packet: %s", err.Error())
return
}
}
}
}
这时如果使用了pitaya提供的、基于漏桶算法实现的限流wrapper来包装acceptor,则会对客户端发送的消息进行限流限速。这里同样灵活利用了for循环遍历chan的特性,所以也是实时地对连接进行包装。
// ListenAndServe starts the forwarding goroutine and then runs the wrapped
// acceptor's ListenAndServe on the calling goroutine.
func (b *BaseWrapper) ListenAndServe() {
// pipe must be running before connections start arriving from the inner acceptor.
go b.pipe()
b.Acceptor.ListenAndServe()
}
// GetConnChan returns the wrapper's own connection channel, which carries
// the wrapped connections rather than the inner acceptor's raw ones.
func (b *BaseWrapper) GetConnChan() chan acceptor.PlayerConn {
return b.connChan
}
// pipe forwards connections from the wrapped acceptor to the wrapper's own
// channel, passing each one through wrapConn first. It returns once the
// wrapped acceptor's channel has been closed and drained.
func (b *BaseWrapper) pipe() {
	inbound := b.Acceptor.GetConnChan()
	for {
		raw, open := <-inbound
		if !open {
			return
		}
		b.connChan <- b.wrapConn(raw)
	}
}
// RateLimitingWrapper is an acceptor wrapper built on BaseWrapper whose
// wrapped connections are rate limiters.
type RateLimitingWrapper struct {
BaseWrapper
}
// NewRateLimitingWrapper returns a RateLimitingWrapper whose wrap function
// turns every connection into a RateLimiter. How limit, interval and
// forceDisable are obtained is elided in this excerpt (presumably read from
// c — confirm).
func NewRateLimitingWrapper(c *config.Config) *RateLimitingWrapper {
r := &RateLimitingWrapper{}
r.BaseWrapper = NewBaseWrapper(func(conn acceptor.PlayerConn) acceptor.PlayerConn {
... ..
return NewRateLimiter(conn, limit, interval, forceDisable)
})
return r
}
// Wrap stores a as the inner acceptor and returns the wrapper itself, which
// then stands in for a.
func (r *RateLimitingWrapper) Wrap(a acceptor.Acceptor) acceptor.Acceptor {
r.Acceptor = a
return r
}
// GetNextMessage returns the next message from the underlying connection
// that is not rejected by the rate limiter. With forceDisable set it is a
// plain pass-through. Otherwise every message that exceeds the limit is
// logged, reported to the metrics reporters and dropped, and the next one is
// read. A read error is returned immediately with a nil message.
func (r *RateLimiter) GetNextMessage() (msg []byte, err error) {
	if r.forceDisable {
		return r.PlayerConn.GetNextMessage()
	}

	for {
		msg, err = r.PlayerConn.GetNextMessage()
		if err != nil {
			return nil, err
		}

		if !r.shouldRateLimit(time.Now()) {
			return msg, nil
		}

		// Over the limit: record the drop and go read the next message.
		logger.Log.Errorf("Data=%s, Error=%s", msg, constants.ErrRateLimitExceeded)
		metrics.ReportExceededRateLimiting(pitaya.GetMetricsReporters())
	}
}
processPacket对数据包解包后,执行processMessage
// processPacket handles one decoded packet according to its type and, on
// success, refreshes the agent's last-activity timestamp. It returns an
// error for a data packet that arrives before the agent is in the working
// state or whose payload cannot be decoded. The handshake branches are
// elided in this excerpt.
func (h *HandlerService) processPacket(a *agent.Agent, p *packet.Packet) error {
switch p.Type {
case packet.Handshake:
... ..
case packet.HandshakeAck:
... ..
case packet.Data:
// Data is only accepted once the agent has reached the working state.
if a.GetStatus() < constants.StatusWorking {
return fmt.Errorf("receive data on socket which is not yet ACK, session will be closed immediately, remote=%s",
a.RemoteAddr().String())
}
msg, err := message.Decode(p.Data)
if err != nil {
return err
}
h.processMessage(a, msg)
case packet.Heartbeat:
// expected
}
// Any successfully handled packet counts as activity.
a.SetLastAt()
return nil
}
processMessage中把数据包包装为unhandledMessage,再根据消息的路由目标,流入chLocalProcess或者chRemoteProcess,也就转交给上面提到的负责Dispatch的goroutine去处理了。
// processMessage builds the request context for msg (start time, route,
// request id, tracing span, session), decodes the route and queues the
// message for local or remote processing depending on whether the route's
// server type matches this server. The handling of a route decode error is
// elided in this excerpt.
func (h *HandlerService) processMessage(a *agent.Agent, msg *message.Message) {
requestID := uuid.New()
ctx := pcontext.AddToPropagateCtx(context.Background(), constants.StartTimeKey, time.Now().UnixNano())
ctx = pcontext.AddToPropagateCtx(ctx, constants.RouteKey, msg.Route)
ctx = pcontext.AddToPropagateCtx(ctx, constants.RequestIDKey, requestID.String())
tags := opentracing.Tags{
"local.id": h.server.ID,
"span.kind": "server",
"msg.type": strings.ToLower(msg.Type.String()),
"user.id": a.Session.UID(),
"request.id": requestID.String(),
}
ctx = tracing.StartSpan(ctx, msg.Route, tags)
ctx = context.WithValue(ctx, constants.SessionCtxKey, a.Session)
r, err := route.Decode(msg.Route)
... ..
// NOTE(review): this local shadows the imported message package for the rest of the function.
message := unhandledMessage{
ctx: ctx,
agent: a,
route: r,
msg: msg,
}
if r.SvType == h.server.Type {
h.chLocalProcess <- message
} else {
// Without a remote service the message is dropped with a warning.
if h.remoteService != nil {
h.chRemoteProcess <- message
} else {
logger.Log.Warnf("request made to another server type but no remoteService running")
}
}
}
服务器进程启动的最后一步是对全局模块启动
在外部的module.go文件中,提供了对module的全局注册方法、全部顺序启动方法、全部顺序关闭方法
// RegisterModule registers module under name. The body is elided in this
// excerpt; presumably it records the module in modulesMap and modulesArr and
// returns an error when registration is not possible — confirm.
func RegisterModule(module interfaces.Module, name string) error {
... ..
}
// startModules starts every registered module in registration order, in two
// passes: Init on all modules first, then AfterInit on all modules.
func startModules() {
	registered := modulesArr
	for i := 0; i < len(registered); i++ {
		registered[i].module.Init()
	}

	registered = modulesArr
	for i := 0; i < len(registered); i++ {
		registered[i].module.AfterInit()
	}
}
// shutdownModules stops every registered module in reverse registration
// order, in two passes: BeforeShutdown on all modules first, then Shutdown
// on all modules.
func shutdownModules() {
	idx := len(modulesArr) - 1
	for idx >= 0 {
		modulesArr[idx].module.BeforeShutdown()
		idx--
	}

	idx = len(modulesArr) - 1
	for idx >= 0 {
		modulesArr[idx].module.Shutdown()
		idx--
	}
}
// localProcess runs the handler for a message routed to this server. For
// anything other than a notify it sends the result back on the session under
// the request's message id; for a notify it only reports timing and finishes
// the tracing span. Parts of the body are elided in this excerpt.
func (h *HandlerService) localProcess(ctx context.Context, a *agent.Agent, route *route.Route, msg *message.Message) {
// mid is the id the response is sent under; it stays 0 for notifies.
var mid uint
switch msg.Type {
case message.Request:
mid = msg.ID
case message.Notify:
mid = 0
}
ret, err := processHandlerMessage(ctx, route, h.serializer, a.Session, msg.Data, msg.Type, false)
if msg.Type != message.Notify {
... ..
// NOTE(review): this err shadows the handler error declared above.
err := a.Session.ResponseMID(ctx, mid, ret)
... ..
} else {
metrics.ReportTimingFromCtx(ctx, h.metricsReporters, handlerType, nil)
tracing.FinishSpan(ctx, err)
}
}
// processHandlerMessage resolves the handler for rt, validates the message
// type, unmarshals data into the handler argument, runs the before pipeline,
// invokes the handler method through reflection, runs the after pipeline and
// serializes the result. A nil ctx is replaced by context.Background().
// Error handling between the steps is elided in this excerpt.
func processHandlerMessage(
ctx context.Context,
rt *route.Route,
serializer serialize.Serializer,
session *session.Session,
data []byte,
msgTypeIface interface{},
remote bool,
) ([]byte, error) {
if ctx == nil {
ctx = context.Background()
}
ctx = context.WithValue(ctx, constants.SessionCtxKey, session)
ctx = util.CtxWithDefaultLogger(ctx, rt.String(), session.UID())
h, err := getHandler(rt)
... ..
msgType, err := getMsgType(msgTypeIface)
... ..
logger := ctx.Value(constants.LoggerCtxKey).(logger.Logger)
exit, err := h.ValidateMessageType(msgType)
... ..
arg, err := unmarshalHandlerArg(h, serializer, data)
... ..
// A before-pipeline error aborts the call before the handler runs.
if arg, err = executeBeforePipeline(ctx, arg); err != nil {
return nil, err
}
... ..
// method.Func.Call expects the receiver as the first argument, then ctx,
// then the message argument when there is one.
args := []reflect.Value{h.Receiver, reflect.ValueOf(ctx)}
if arg != nil {
args = append(args, reflect.ValueOf(arg))
}
resp, err := util.Pcall(h.Method, args)
// A notify arriving through a remote call gets a fixed "ack" response.
if remote && msgType == message.Notify {
resp = []byte("ack")
}
resp, err = executeAfterPipeline(ctx, resp, err)
... ..
ret, err := serializeReturn(serializer, resp)
... ..
return ret, nil
}
// executeBeforePipeline threads data through every registered before-handler
// in order, each one receiving the previous one's output. The first handler
// that fails stops the chain: the failure is logged at debug level and that
// handler's output is returned together with its error. With no handlers
// registered, data is returned unchanged.
func executeBeforePipeline(ctx context.Context, data interface{}) (interface{}, error) {
	current := data
	for _, step := range pipeline.BeforeHandler.Handlers {
		next, stepErr := step(ctx, current)
		current = next
		if stepErr != nil {
			logger.Log.Debugf("pitaya/handler: broken pipeline: %s", stepErr.Error())
			return current, stepErr
		}
	}
	return current, nil
}
// executeAfterPipeline threads the handler's result and error through every
// registered after-handler in order. Unlike the before pipeline the chain is
// never cut short: each handler sees whatever result/error pair the previous
// one produced, and the final pair is returned.
func executeAfterPipeline(ctx context.Context, res interface{}, err error) (interface{}, error) {
	out, outErr := res, err
	for _, step := range pipeline.AfterHandler.Handlers {
		out, outErr = step(ctx, out, outErr)
	}
	return out, outErr
}
util.Pcall里展示了golang反射的一种高级用法:method.Func.Call的第一个参数是Receiver,也就是调用对象方法的实例。这种设计对比直接保存Value对象的method、反射时直接Call的做法,额外的好处是降低了对象引用,方法不和实例绑定。
// Pcall invokes method through reflection with args (args[0] must be the
// receiver) and converts the reflected results into (rets, err).
//
// Only a two-value result is interpreted:
//   - a non-nil second value is returned as err;
//   - otherwise a non-nil first value is returned as rets;
//   - otherwise err is constants.ErrReplyShouldBeNotNull.
//
// For any other number of results both rets and err stay nil. The start of
// the body is elided in this excerpt.
func Pcall(method reflect.Method, args []reflect.Value) (rets interface{}, err error) {
... ..
r := method.Func.Call(args)
if len(r) == 2 {
if v := r[1].Interface(); v != nil {
err = v.(error)
// NOTE(review): reflect.Value.IsNil panics for kinds that cannot be nil;
// presumably handlers always return a pointer, slice or interface first — confirm.
} else if !r[0].IsNil() {
rets = r[0].Interface()
} else {
err = constants.ErrReplyShouldBeNotNull
}
}
return
}