场景描述:两阶段提交(two-phase commit, 2PC)是最基础的分布式一致性协议,应用广泛。本文来介绍它的相关细节以及它在Flink中的典型应用场景。。
关键词:2PC Flink
2PC简介
先介绍两个前置概念。在分布式系统中,为了让每个节点都能够感知到其他节点的事务执行状况,需要引入一个中心节点来统一处理所有节点的执行逻辑,这个中心节点叫做协调者(coordinator),被中心节点调度的其他业务节点叫做参与者(participant)。
接下来正式介绍2PC。顾名思义,2PC将分布式事务分成了两个阶段,两个阶段分别为提交请求(投票)和提交(执行)。协调者根据参与者的响应来决定是否需要真正地执行事务,具体流程如下。
分为成功与失败两种情况。
下图分别示出这两种情况。
提交成功
提交失败
2PC的优点在于原理非常简单,容易理解及实现。缺点主要有3个,列举如下:
Flink基于2PC的事务性写入
2PC的最常见应用场景其实是关系型数据库,比如MySQL InnoDB存储引擎的XA事务系统。
Flink作为流式处理引擎,自然也提供了对exactly once语义的保证。端到端的exactly once语义,是输入、处理逻辑、输出三部分协同作用的结果。Flink内部依托检查点机制和轻量级分布式快照算法ABS保证exactly once。而要实现精确一次的输出逻辑,则需要施加以下两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write)。
在Spark Streaming中,要实现事务性写入完全靠用户自己,框架本身并没有提供任何实现。但是在Flink中提供了基于2PC的SinkFunction,名为TwoPhaseCommitSinkFunction,帮助我们做了一些基础的工作。
TwoPhaseCommitSinkFunction的继承关系
Flink官方推荐所有需要保证exactly once的Sink逻辑都继承该抽象类。它定义了如下4个抽象方法,需要子类实现。
protected abstract TXN beginTransaction() throws Exception;
protected abstract void preCommit(TXN transaction) throws Exception;
protected abstract void commit(TXN transaction);
protected abstract void abort(TXN transaction);
下面以Flink与Kafka的集成来说明2PC的具体流程。注意这里的Kafka版本必须是0.11及以上,因为只有0.11+的版本才支持幂等producer以及事务性,从而2PC才有存在的意义。Kafka内部事务性的机制如下框图所示。这就是另外一个大的话题了,本文不表。
粗略看看FlinkKafkaProducer011类实现的beginTransaction()方法。
@Override
protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
switch (semantic) {
case EXACTLY_ONCE:
FlinkKafkaProducer<byte[], byte[]> producer = createTransactionalProducer();
producer.beginTransaction();
return new KafkaTransactionState(producer.getTransactionalId(), producer);
case AT_LEAST_ONCE:
case NONE:
// Do not create new producer on each beginTransaction() if it is not necessary
final KafkaTransactionState currentTransaction = currentTransaction();
if (currentTransaction != null && currentTransaction.producer != null) {
return new KafkaTransactionState(currentTransaction.producer);
}
return new KafkaTransactionState(initNonTransactionalProducer(true));
default:
throw new UnsupportedOperationException("Not implemented semantic");
}
}
可见,当要求exactly once语义时,会调用createTransactionalProducer()生成包含事务ID的producer。
FlinkKafkaProducer011.preCommit()方法的实现很简单。其中的flush()方法实际上是代理了KafkaProducer.flush()方法。
@Override
protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
switch (semantic) {
case EXACTLY_ONCE:
case AT_LEAST_ONCE:
flush(transaction);
break;
case NONE:
break;
default:
throw new UnsupportedOperationException("Not implemented semantic");
}
checkErroneous();
}
那么preCommit()方法是在哪里使用的呢?答案是TwoPhaseCommitSinkFunction.snapshotState()方法。从前面的类图可以得知,TwoPhaseCommitSinkFunction也继承了CheckpointedFunction接口,所以2PC是与检查点机制一同发挥作用的。
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// this is like the pre-commit of a 2-phase-commit transaction
// we are ready to commit and remember the transaction
checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");
long checkpointId = context.getCheckpointId();
LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);
preCommit(currentTransactionHolder.handle);
pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
currentTransactionHolder = beginTransactionInternal();
LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
state.clear();
state.add(new State<>(
this.currentTransactionHolder,
new ArrayList<>(pendingCommitTransactions.values()),
userContext));
}
结合Flink检查点的原理,可以用下图来形象地表示预提交阶段的流程。
每当需要做checkpoint时,JobManager就在数据流中打入一个屏障(barrier),作为检查点的界限。屏障随着算子链向下游传递,每到达一个算子都会触发将状态快照写入状态后端的动作。当屏障到达Kafka sink后,通过KafkaProducer.flush()方法刷写消息数据,但还未真正提交。接下来还是需要通过检查点来触发提交阶段。
FlinkKafkaProducer011.commit()方法实际上是代理了KafkaProducer.commitTransaction()方法,正式向Kafka提交事务。
@Override
protected void commit(KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
try {
transaction.producer.commitTransaction();
} finally {
recycleTransactionalProducer(transaction.producer);
}
}
}
该方法的调用点位于TwoPhaseCommitSinkFunction.notifyCheckpointComplete()方法中。顾名思义,当所有检查点都成功完成之后,会回调这个方法。
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
Throwable firstError = null;
while (pendingTransactionIterator.hasNext()) {
Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
Long pendingTransactionCheckpointId = entry.getKey();
TransactionHolder<TXN> pendingTransaction = entry.getValue();
if (pendingTransactionCheckpointId > checkpointId) {
continue;
}
LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);
logWarningIfTimeoutAlmostReached(pendingTransaction);
try {
commit(pendingTransaction.handle);
} catch (Throwable t) {
if (firstError == null) {
firstError = t;
}
}
LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
pendingTransactionIterator.remove();
}
if (firstError != null) {
throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
firstError);
}
}
该方法每次从正在等待提交的事务句柄中取出一个,校验它的检查点ID,并调用commit()方法提交之。这阶段的流程可以用下图来表示。
可见,只有在所有检查点都成功完成这个前提下,写入才会成功。这符合前文所述2PC的流程,其中JobManager为协调者,各个算子为参与者(不过只有sink一个参与者会执行提交)。一旦有检查点失败,notifyCheckpointComplete()方法就不会执行。如果重试也不成功的话,最终会调用abort()方法回滚事务。
@Override
protected void abort(KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
transaction.producer.abortTransaction();
recycleTransactionalProducer(transaction.producer);
}
}