流处理即事件处理,简单的说是连续处理无限的数据/事件序列。下面用有向图来描述。
流处理(streaming process),有时也被称为事件处理(event processing),可以被简洁地描述为对于一个无限的数据或事件序列的连续处理。一个流,或事件,处理应用可以或多或少地由一个有向图,通常是一个有向无环图(DAG),来表达。在这样一个图中,每条边表示一个数据或事件流,而每个顶点表示使用应用定义好的逻辑来处理来自相邻边的数据或事件的算子。其中有两种特殊的顶点,通常被称作sources与sinks。Sources消费外部数据/事件并将其注入到应用当中,而sinks通常收集由应用产生的结果。图1描述了一个流处理应用的例子。
类比UDP协议,不关心消息是否成功,只发送一次,“尽力而为”。
数据/事件被保证会被应用中的所有算子至少处理一遍。消息第一次投递在算子2处理出现失败,会对数据/事件会被重放或重传; 二次重试处理超时,再次进行数据重放,结果是第二次和第三次重放最终的结果都是成功的。
无论发生任何故障,都会确保数据/事件只被算子处理一次。实现exactly-once有两种典型的机制:
机制里,流处理的每个应用算子都会周期性的checkpoint。如果发生故障,流处理中的应用算子会回滚到到最近一次全局一致处。在回滚过程中,所有的处理都会停止。流程会从最近一致处开始。
这种机制会为每个算子维护一份事务日志,来记录哪些数据/事件处理过了。
事务是将多个操作当做一个操作,保证这个操作的原子性。事务成功则所有子操作全部成功,失败则所有子操作全部失败。 事务四特性(ACID):
本地事务即局部事务。单机内,处理一组数据逻辑,这组操作要么全部成功,要么全部失败。比如:在同一数据库多次执行多条sql语句。本地事务不支持跨机器、跨数据库场景。
分布事务的解决方案有多种,2PC、TCC、本地消息表(异步确保)、Seata 2PC(改进)、MQ事务消息。本次重点说说MQ事务消息。
流处理模式中,只有消息B被成功生产消息A才会被标识为已消费。
Kafka的基本原理之前有文章介绍过,这里不再赘述原理
幂等即对接口多次调用的结果均一致。消息在处理失败时会进行重试,产生重复的消息。为实现幂等,Kafka引入了sequenceNumber(序列号)和producerID(PID)两个概念,其中PID对用户来说完全透明。每个生产者初始化时都会被分配一个PID。消息发送到每个分区都有一个唯一对应的从0开始自增的序列号,每发送一条消息就会将<PID,topic+partition>对应值+1。Broker的内部维护着对应的序列号,收到的消息只有序列号比Broker的序列号大1时,这条消息才会被接收。如果写入消息的序列号<Broker中的序列号,那么消息是重复消息,会被丢弃;如果消息系列号>Broker序列号+1,那么说明消息可能乱序,会抛出错误。
因为sequenceNumber是针对<PID,topic+partition>维度的,所以只能保证单个生产者会话中的单分区的幂等。
事务实现建立在幂等之上。且事务支持跨分区,使用场景分为两种:
Kafka通过事务可以保证跨生产者会话的消息幂等发送,以及跨生产者会话的事务恢复。
Transaction Coordinator会周期性遍历PID和transactionID映射关系,如果transactionID没有正在进行中的事务,并且上一个事务的结束时间与现在时间差过大就会将这个删除。
package kafka
import (
"fmt"
"github.com/Shopify/sarama"
)
var (
version = ""
)
// NewTxnProducer 新增事务Producer
func NewTxnProducer(brokers []string) (*sarama.AsyncProducer, error) {
version, err := sarama.ParseKafkaVersion(version)
if err != nil {
fmt.Printf("Error parsing Kafka version: %v", err)
return nil, err
}
config := sarama.NewConfig()
config.Version = version
// 幂等
config.Producer.Idempotent = true
config.Producer.Return.Errors = false
// 等待所有副本提交成功
config.Producer.RequiredAcks = sarama.WaitForAll
// 选择要发送消息的分区,默认为散列
config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
// 每次重试之间等待集群稳定的时间
config.Producer.Transaction.Retry.Backoff = 10
// 事务中用于识别生产者的实例
config.Producer.Transaction.ID = "txn_producer"
// 一个连接在发送阻塞之前,允许有多少个未完成的请求(默认为5)。
config.Net.MaxOpenRequests = 1
producer, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
fmt.Printf("NewAsyncProducer err: %v", err)
return nil, err
}
return &producer, nil
}
// SendMessage 发送消息
func SendMessage(producer sarama.AsyncProducer, msg string) error {
// 开始事务
err := producer.BeginTxn()
if err != nil {
fmt.Printf("BeginTxn err: %v", err)
return err
}
// 发送消息
producerMessage := &sarama.ProducerMessage{
Topic: "topic",
Key: nil,
Value: sarama.StringEncoder(msg),
}
producer.Input() <- producerMessage
// 提交事务
err = producer.CommitTxn()
if err != nil {
fmt.Printf("CommitTxn err: %v,msg: %s", err, msg)
if producer.TxnStatus()&sarama.ProducerTxnFlagFatalError != 0 {
fmt.Printf("need to recreate producer")
return err
}
if producer.TxnStatus()&sarama.ProducerTxnFlagAbortableError != 0 {
err = producer.AbortTxn()
if err != nil { // retry
fmt.Printf("Producer: unable to abort transaction: %+v", err)
return err
}
}
// 再次尝试提交
err = producer.CommitTxn()
if err != nil {
fmt.Printf("CommitTxn err: %v,msg: %s", err, msg)
return err
}
}
return nil
}
Pulsar基础之前的文章有介绍过,这里不再赘述原理
Pulsar事务支持端到端的流处理,即保证数据写入后不丢失、数据不会被重复处理。使用事务需要Pulsar 2.8.0 或更高版本,目前事务API仅支持JAVA客户端。
coordinator事务操作的proxy,任何事务相关的请求都要经过coordinator。事务的每步操作都会有Log记录并且有对应的状态。这样可以保证执行事务过程中的任何步骤、任何模块宕机消息都不会丢失,详细的日志也为重启提供了完整的数据。
RocketMQ基础之前的文章有介绍过,这里不再赘述原理
package rocketmq
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
rocketmq "github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
// DemoListener 测试demo使用
type DemoListener struct {
localTrans *sync.Map
transactionIndex int32
}
// NewDemoListener 创建事务检查对象
func NewDemoListener() *DemoListener {
return &DemoListener{
localTrans: new(sync.Map),
}
}
// CheckLocalTransaction 检查本地事务
func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
fmt.Printf("%v msg transactionID : %v\n", time.Now(), msg.TransactionId)
v, existed := dl.localTrans.Load(msg.TransactionId)
if !existed {
fmt.Printf("unknow msg: %v, return Commit", msg)
return primitive.CommitMessageState
}
// 校验本地事务状态
state := v.(primitive.LocalTransactionState)
switch state {
case 1:
fmt.Printf("checkLocalTransaction COMMIT_MESSAGE: %v\n", msg)
return primitive.CommitMessageState
case 2:
fmt.Printf("checkLocalTransaction ROLLBACK_MESSAGE: %v\n", msg)
return primitive.RollbackMessageState
case 3:
fmt.Printf("checkLocalTransaction unknow: %v\n", msg)
return primitive.UnknowState
default:
fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg)
return primitive.CommitMessageState
}
}
// ExecuteLocalTransaction 执行本地事务逻辑
// msg: 事务消息
// return: 本地事务状态
func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
nextIndex := atomic.AddInt32(&dl.transactionIndex, 1)
fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex, msg.TransactionId)
status := nextIndex % 3
dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status+1))
fmt.Printf("dl")
return primitive.UnknowState
}
// NewRocketmqTxnProducer 新增事务Producer
func NewRocketmqTxnProducer() (rocketmq.TransactionProducer, error) {
producer, err := rocketmq.NewTransactionProducer(
NewDemoListener(),
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(1),
)
if err != nil {
fmt.Printf("NewTransactionProducer err: %v", err)
return nil, err
}
return producer, nil
}
// SendRocketmqMessage 发送消息
func SendRocketmqMessage(producer rocketmq.TransactionProducer, topic, msg string) error {
result, err := producer.SendMessageInTransaction(
context.Background(), primitive.NewMessage(topic, []byte(msg)))
if err != nil {
fmt.Printf("SendMessageInTransaction err: %v,result: %s", err, result.String())
return err
}
err = producer.Shutdown()
if err != nil {
fmt.Printf("Shutdown err: %v", err)
return err
}
return nil
}