本文主要研究一下 canal-go 的 SimpleCanalConnector。
canal-go-v1.0.7/client/simple_canal_connector.go
// SimpleCanalConnector holds the connection settings and state flags used by
// a single client of a canal server.
type SimpleCanalConnector struct {
	// Address and Port are joined as "Address:Port" when dialing the server.
	Address string
	Port    int

	// UserName and PassWord are the credentials sent during authentication.
	UserName, PassWord string

	// SoTime is filled from the constructor's soTimeOut argument.
	// IdleTimeOut is sent to the server as both the net read and the net
	// write timeout during authentication.
	SoTime, IdleTimeOut int32

	// ClientIdentity supplies the destination and client id placed in every
	// request sent to the server.
	ClientIdentity pb.ClientIdentity

	// Connected is set at the end of Connect and cleared by DisConnection.
	// Running gates the request methods, which return early based on it.
	Connected, Running bool

	// Filter is the subscription filter; Connect re-subscribes with it when
	// it is non-empty, and Subscribe stores the filter it was given.
	Filter string

	// RollbackOnConnect makes Connect (and DisConnection, while connected)
	// issue RollBack(0).
	// LazyParseEntry is not referenced by the connector methods shown in this
	// file section. NOTE(review): presumably defers entry parsing; confirm.
	RollbackOnConnect, LazyParseEntry bool
}
canal-go-v1.0.7/client/simple_canal_connector.go
// NewSimpleCanalConnector builds a SimpleCanalConnector from the given server
// endpoint, credentials, destination and timeouts. The client id is fixed at
// 1001 and RollbackOnConnect is enabled. No connection is opened here.
func NewSimpleCanalConnector(address string, port int, username string, password string, destination string, soTimeOut int32, idleTimeOut int32) *SimpleCanalConnector {
	connector := new(SimpleCanalConnector)

	// Endpoint and credentials.
	connector.Address = address
	connector.Port = port
	connector.UserName = username
	connector.PassWord = password

	// Identity sent with every request.
	connector.ClientIdentity.Destination = destination
	connector.ClientIdentity.ClientId = 1001

	// Timeouts and defaults.
	connector.SoTime = soTimeOut
	connector.IdleTimeOut = idleTimeOut
	connector.RollbackOnConnect = true

	return connector
}
canal-go-v1.0.7/client/simple_canal_connector.go
// Connect opens the session with the canal server.
//
// It is a no-op when the connector is already Connected or Running.
// Otherwise it runs doConnect, re-subscribes with Filter when one is set,
// and, when RollbackOnConnect is true, issues RollBack(0). Connected is set
// only after every step has succeeded.
//
// It returns the first error reported by doConnect, Subscribe or RollBack.
// Failures of Subscribe and RollBack used to be silently discarded, which let
// Connect report success for a session that was not actually set up.
func (c *SimpleCanalConnector) Connect() error {
	if c.Connected || c.Running {
		return nil
	}

	if err := c.doConnect(); err != nil {
		return err
	}

	if c.Filter != "" {
		if err := c.Subscribe(c.Filter); err != nil {
			return err
		}
	}

	if c.RollbackOnConnect {
		c.waitClientRunning()
		if err := c.RollBack(0); err != nil {
			return err
		}
	}

	c.Connected = true
	return nil
}
canal-go-v1.0.7/client/simple_canal_connector.go
// DisConnection tears the session down: while connected and with
// RollbackOnConnect enabled it first issues RollBack(0), then it clears
// Connected and closes the shared connection through quitelyClose.
func (c *SimpleCanalConnector) DisConnection() {
	if c.Connected && c.RollbackOnConnect {
		// The method has no return value, so the rollback result is dropped.
		_ = c.RollBack(0)
	}

	c.Connected = false
	quitelyClose()
}
// quitelyClose closes the package-level conn when one is set. The result of
// Close is ignored.
func quitelyClose() {
	if conn == nil {
		return
	}
	_ = conn.Close()
}
canal-go-v1.0.7/client/simple_canal_connector.go
// doConnect dials the canal server at Address:Port, reads the server's
// handshake packet, sends the client credentials and validates the ack.
//
// The dialed connection is stored in the package-level conn. Protocol
// violations (unsupported version, unexpected packet type) and a rejected
// authentication are returned as errors instead of panicking, marshalling
// errors are no longer ignored, and the socket is closed when any step after
// the dial fails so that a retry does not leak it.
//
// The Connected flag is not touched here: the previous assignment wrote to a
// copy of the receiver and had no effect, and Connect sets the flag itself
// once the whole connect sequence has succeeded.
func (c *SimpleCanalConnector) doConnect() (err error) {
	address := fmt.Sprintf("%s:%d", c.Address, c.Port)
	con, err := net.Dial("tcp", address)
	if err != nil {
		return err
	}
	conn = con

	// Release the socket on any failure below; err is the named result, so
	// it reflects whatever value the function is returning.
	defer func() {
		if err != nil {
			con.Close()
		}
	}()

	// Step 1: the server speaks first with a handshake packet.
	data, err := readNextPacket()
	if err != nil {
		return err
	}
	p := new(pb.Packet)
	if err = proto.Unmarshal(data, p); err != nil {
		return err
	}
	if p.GetVersion() != 1 {
		return errors.New("unsupported version at this client")
	}
	if p.GetType() != pb.PacketType_HANDSHAKE {
		return errors.New("expect handshake but found other type")
	}
	handshake := &pb.Handshake{}
	if err = proto.Unmarshal(p.GetBody(), handshake); err != nil {
		return err
	}

	// Step 2: answer with the credentials; IdleTimeOut is used for both the
	// read and the write timeout.
	ca := &pb.ClientAuth{
		Username:               c.UserName,
		Password:               []byte(c.PassWord),
		NetReadTimeoutPresent:  &pb.ClientAuth_NetReadTimeout{NetReadTimeout: c.IdleTimeOut},
		NetWriteTimeoutPresent: &pb.ClientAuth_NetWriteTimeout{NetWriteTimeout: c.IdleTimeOut},
	}
	caByteArray, err := proto.Marshal(ca)
	if err != nil {
		return err
	}
	packArray, err := proto.Marshal(&pb.Packet{
		Type: pb.PacketType_CLIENTAUTHENTICATION,
		Body: caByteArray,
	})
	if err != nil {
		return err
	}
	WriteWithHeader(packArray)

	// Step 3: the server must reply with an ack carrying no error code.
	pp, err := readNextPacket()
	if err != nil {
		return err
	}
	pk := &pb.Packet{}
	if err = proto.Unmarshal(pp, pk); err != nil {
		return err
	}
	if pk.Type != pb.PacketType_ACK {
		return errors.New("unexpected packet type when ack is expected")
	}
	ackBody := &pb.Ack{}
	if err = proto.Unmarshal(pk.GetBody(), ackBody); err != nil {
		return err
	}
	if ackBody.GetErrorCode() > 0 {
		return fmt.Errorf("something goes wrong when doing authentication:%s", ackBody.GetErrorMessage())
	}

	return nil
}
canal-go-v1.0.7/client/simple_canal_connector.go
// GetWithOutAck requests a batch from the server with auto-ack disabled and
// returns the message produced by receiveMessages.
//
// A negative batchSize is replaced by 1000. A nil timeOut or nil units is
// sent as -1. When the connector is not Running after waitClientRunning, it
// returns (nil, nil) without sending anything.
func (c *SimpleCanalConnector) GetWithOutAck(batchSize int32, timeOut *int64, units *int32) (*pb.Message, error) {
	c.waitClientRunning()
	if !c.Running {
		return nil, nil
	}

	// Resolve the defaults for the optional arguments.
	fetchSize := batchSize
	if fetchSize < 0 {
		fetchSize = 1000
	}
	timeout := int64(-1)
	if timeOut != nil {
		timeout = *timeOut
	}
	unit := int32(-1)
	if units != nil {
		unit = *units
	}

	request := &pb.Get{
		AutoAckPresent: &pb.Get_AutoAck{AutoAck: false},
		Destination:    c.ClientIdentity.Destination,
		ClientId:       strconv.Itoa(c.ClientIdentity.ClientId),
		FetchSize:      fetchSize,
		TimeoutPresent: &pb.Get_Timeout{Timeout: timeout},
		UnitPresent:    &pb.Get_Unit{Unit: unit},
	}
	requestBytes, err := proto.Marshal(request)
	if err != nil {
		return nil, err
	}

	framed, err := proto.Marshal(&pb.Packet{Type: pb.PacketType_GET, Body: requestBytes})
	if err != nil {
		return nil, err
	}
	WriteWithHeader(framed)

	reply, err := c.receiveMessages()
	if err != nil {
		return nil, err
	}
	return reply, nil
}
canal-go-v1.0.7/client/simple_canal_connector.go
// Get fetches a batch with GetWithOutAck and then acknowledges it with Ack
// using the message id.
//
// It returns the fetched message, or the first error from either step.
// GetWithOutAck returns (nil, nil) when the connector is not running; in that
// case there is nothing to acknowledge, so Get returns (nil, nil) as well
// instead of dereferencing the nil message.
func (c *SimpleCanalConnector) Get(batchSize int32, timeOut *int64, units *int32) (*pb.Message, error) {
	message, err := c.GetWithOutAck(batchSize, timeOut, units)
	if err != nil {
		return nil, err
	}
	if message == nil {
		return nil, nil
	}

	if err := c.Ack(message.Id); err != nil {
		return nil, err
	}
	return message, nil
}
canal-go-v1.0.7/client/simple_canal_connector.go
// Ack sends a client-ack packet for batchId to the server, i.e. it confirms
// the batch once the caller has finished processing it.
//
// It returns nil without sending anything when the connector is not Running
// after waitClientRunning, and a marshalling error if encoding fails.
func (c *SimpleCanalConnector) Ack(batchId int64) error {
	c.waitClientRunning()
	if !c.Running {
		return nil
	}

	ackBytes, err := proto.Marshal(&pb.ClientAck{
		Destination: c.ClientIdentity.Destination,
		ClientId:    strconv.Itoa(c.ClientIdentity.ClientId),
		BatchId:     batchId,
	})
	if err != nil {
		return err
	}

	framed, err := proto.Marshal(&pb.Packet{Type: pb.PacketType_CLIENTACK, Body: ackBytes})
	if err != nil {
		return err
	}

	WriteWithHeader(framed)
	return nil
}
canal-go-v1.0.7/client/simple_canal_connector.go
// Subscribe sends a subscription packet carrying filter, waits for the
// server's ack and, on success, stores filter in c.Filter.
//
// It returns nil without sending anything when the connector is not Running
// after waitClientRunning. Marshalling errors are now returned (they used to
// be discarded), as are read/decode errors and a non-zero error code in the
// ack.
func (c *SimpleCanalConnector) Subscribe(filter string) error {
	c.waitClientRunning()
	if !c.Running {
		return nil
	}

	body, err := proto.Marshal(&pb.Sub{
		Destination: c.ClientIdentity.Destination,
		ClientId:    strconv.Itoa(c.ClientIdentity.ClientId),
		Filter:      filter,
	})
	if err != nil {
		return err
	}
	packet, err := proto.Marshal(&pb.Packet{Type: pb.PacketType_SUBSCRIPTION, Body: body})
	if err != nil {
		return err
	}
	WriteWithHeader(packet)

	// Read and decode the server's reply.
	paBytes, err := readNextPacket()
	if err != nil {
		return err
	}
	p := new(pb.Packet)
	if err := proto.Unmarshal(paBytes, p); err != nil {
		return err
	}
	ack := new(pb.Ack)
	if err := proto.Unmarshal(p.Body, ack); err != nil {
		return err
	}
	if ack.GetErrorCode() > 0 {
		return fmt.Errorf("failed to subscribe with reason::%s", ack.GetErrorMessage())
	}

	c.Filter = filter
	return nil
}
canal-go-v1.0.7/client/simple_canal_connector.go
// UnSubscribe sends an unsubscription packet and waits for the server's ack.
//
// It returns nil without sending anything when the connector is not Running
// after waitClientRunning (the guard used to be inverted, unlike Subscribe,
// Ack and GetWithOutAck). The reply is decoded into a freshly allocated
// packet; previously the target pointer was set to nil before decoding, so
// the reply could never be read. A non-zero error code in the ack is
// returned as an error instead of panicking.
func (c *SimpleCanalConnector) UnSubscribe() error {
	c.waitClientRunning()
	if !c.Running {
		return nil
	}

	unSub, err := proto.Marshal(&pb.Unsub{
		Destination: c.ClientIdentity.Destination,
		ClientId:    strconv.Itoa(c.ClientIdentity.ClientId),
	})
	if err != nil {
		return err
	}
	pack, err := proto.Marshal(&pb.Packet{Type: pb.PacketType_UNSUBSCRIPTION, Body: unSub})
	if err != nil {
		return err
	}
	WriteWithHeader(pack)

	// Read and decode the server's reply.
	replyBytes, err := readNextPacket()
	if err != nil {
		return err
	}
	reply := new(pb.Packet)
	if err := proto.Unmarshal(replyBytes, reply); err != nil {
		return err
	}
	ack := new(pb.Ack)
	if err := proto.Unmarshal(reply.Body, ack); err != nil {
		return err
	}
	if ack.GetErrorCode() > 0 {
		return fmt.Errorf("failed to unSubscribe with reason:%s", ack.GetErrorMessage())
	}
	return nil
}
canal-go-v1.0.7/client/simple_canal_connector.go
// RollBack sends a client-rollback packet for batchId to the server after
// calling waitClientRunning. It returns a marshalling error if encoding
// fails and nil otherwise; no reply is read.
func (c *SimpleCanalConnector) RollBack(batchId int64) error {
	c.waitClientRunning()

	rollbackBytes, err := proto.Marshal(&pb.ClientRollback{
		Destination: c.ClientIdentity.Destination,
		ClientId:    strconv.Itoa(c.ClientIdentity.ClientId),
		BatchId:     batchId,
	})
	if err != nil {
		return err
	}

	framed, err := proto.Marshal(&pb.Packet{Type: pb.PacketType_CLIENTROLLBACK, Body: rollbackBytes})
	if err != nil {
		return err
	}

	WriteWithHeader(framed)
	return nil
}
SimpleCanalConnector 定义了 Address、Port、UserName、PassWord、SoTime、IdleTimeOut、ClientIdentity、Connected、Running、Filter、RollbackOnConnect、LazyParseEntry 属性；它提供了 Connect、DisConnection、GetWithOutAck、Get、Ack、Subscribe、UnSubscribe、RollBack 方法。