Hi,我是王知无,一个大数据领域的原创作者。
在Flink中使用Async I/O的话,需要有一个支持异步请求的客户端,或者以多线程异步的方式来将同步操作转化为异步操作调用;
以官方文档给出的说明为例:
// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)
/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
/** The database specific client that can issue concurrent requests with callbacks */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
// issue the asynchronous request, receive a future for result
// 发起异步请求,返回结果是一个Future
final Future<String> result = client.query(key);
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
// 请求完成时的回调,将结果交给 ResultFuture
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
// Normally handled explicitly.
return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}
// create the original stream
DataStream<String> stream = ...;
// apply the async I/O transformation
// 应用async I/O转换,设置等待模式、超时时间、以及进行中的异步请求的最大数量
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
AsyncDataStream提供了两种调用方法,分别是orderedWait和unorderedWait,这分别对应了有序和无序两种输出模式。
之所以会提供两种输出模式,是因为异步请求的完成时间是不确定的,先发出的请求的完成时间可能会晚于后发出的请求。
值得注意的是,在使用“事件时间”的情况下,“无序”输出模式仍然可以保证watermark的正常处理,即在两个watermark之间的消息的异步请求结果可能是异步提交的,但在watermark之后的消息不能先于该watermark之前的消息提交。
由于异步请求的完成时间不确定,需要设置请求的超时时间,并配置同时进行中的异步请求的最大数量。
AsyncDataStream在运行时被转换为AsyncWaitOperator算子,它是AbstractUdfStreamOperator的子类。其AsyncWaitOperator的基本实现原理如下:
AsyncWaitOperator算子相比于其它算子的最大不同在于,它的输入和输出并不是同步的。
因此,在AsyncWaitOperator内部采用了一种“生产者-消费者”模型,基于一个队列解耦异步计算和计算结果的提交。StreamElementQueue提供了一种队列的抽象,一个“消费者”线程Emitter从中取出已完成的计算结果,并提交给下游算子,而异步请求则充当了队列“生产者”的角色;
如图所示,AsyncWaitOperator主要由两部分组成:StreamElementQueue和Emitter。
StreamElementQueue是一个Promise队列,所谓Promise是一种异步抽象表示将来会有一个值,这个队列是未完成的Promise队列,也就是进行中的请求队列。Emitter是一个单独的线程,负责发送消息(收到的异步回复)给下游。
图中E5表示进入该算子的第五个元素(”Element-5”),在执行过程中首先会将其包装成一个“Promise” P5,然后将P5放入队列。最后调用AsyncFunction的asyncInvoke方法,该方法会向外部服务发起一个异步的请求,并注册回调。
该回调会在异步请求成功返回时调用AsyncCollector.collect方法将返回的结果交给框架处理。
实际上AsyncCollector是一个Promise,也就是 P5,在调用collect的时候会标记Promise为完成状态,并通知Emitter线程有完成的消息可以发送了。Emitter就会从队列中拉取完成的Promise,并从Promise中取出消息发送给下游。
public class AsyncWaitOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, OperatorActions {
/** Queue to store the currently in-flight stream elements into. */
private transient StreamElementQueue queue; // 存储带有异步返回值的请求队列
/** Pending stream element which could not yet added to the queue. */
private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;
private transient ExecutorService executor;
/** Emitter for the completed stream element queue entries. */
private transient Emitter<OUT> emitter; // 异步返回后的消费线程
/** Thread running the emitter. */
private transient Thread emitterThread;
@Override
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
this.checkpointingLock = getContainingTask().getCheckpointLock();
this.inStreamElementSerializer = new StreamElementSerializer<>(getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
// create the operators executor for the complete operations of the queue entries
this.executor = Executors.newSingleThreadExecutor();
// 根据不同的数据输出模式 有序、无序;选择构建不同的StreamElementQueue queue
switch (outputMode) {
case ORDERED:
queue = new OrderedStreamElementQueue(
capacity,
executor,
this);
break;
case UNORDERED:
queue = new UnorderedStreamElementQueue(
capacity,
executor,
this);
break;
default:
throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
}
}
@Override
public void open() throws Exception {
super.open();
// create the emitter
this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
// start the emitter thread
// 构建 消费者线程 emitter Thread
this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
emitterThread.setDaemon(true);
emitterThread.start();
// .........
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
// 注册一个定时器,在超时时调用 timeout 方法
if (timeout > 0L) {
// register a timeout for this AsyncStreamRecordBufferEntry
long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
timeoutTimestamp,
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
userFunction.timeout(element.getValue(), streamRecordBufferEntry);
}
});
// Cancel the timer once we've completed the stream record buffer entry. This will remove
// the register trigger task
streamRecordBufferEntry.onComplete(
(StreamElementQueueEntry<Collection<OUT>> value) -> {
timerFuture.cancel(true);
},
executor);
}
// 加入队列
addAsyncBufferEntry(streamRecordBufferEntry);
// 发送异步请求
userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
}
//尝试将待完成的请求加入队列,如果队列已满(到达异步请求的上限),会阻塞
private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
assert(Thread.holdsLock(checkpointingLock));
pendingStreamElementQueueEntry = streamElementQueueEntry;
while (!queue.tryPut(streamElementQueueEntry)) { // 将该请求加入队列;如果队列已满(到达异步请求的上限),会阻塞
// we wait for the emitter to notify us if the queue has space left again
checkpointingLock.wait();
}
pendingStreamElementQueueEntry = null;
}
}
public class Emitter<OUT> implements Runnable {
@Override
public void run() {
try {
while (running) {
LOG.debug("Wait for next completed async stream element result.");
// 从队列阻塞地获取元素,之后再向下游传递
AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();
output(streamElementEntry);
}
} catch (InterruptedException e) {
// .........
}
}
}
在“有序”模式下,所有异步请求的结果必须按照消息的到达顺序提交到下游算子。在这种模式下,StreamElementQueue的具体是实现是OrderedStreamElementQueue。
OrderedStreamElementQueue的底层是一个有界的队列,异步请求的计算结果按顺序加入到队列中,只有队列头部的异步请求完成后才可以从队列中获取计算结果。
有序模式比较简单,使用一个队列就能实现。所有新进入该算子的元素(包括watermark),都会包装成Promise并按到达顺序放入该队列。
如下图所示,尽管P4的结果先返回,但并不会发送,只有P1(队首)的结果返回了才会触发Emitter拉取队首元素进行发送。如下图所示:
public class OrderedStreamElementQueue implements StreamElementQueue {
/** Capacity of this queue. */
private final int capacity;
/** Queue for the inserted StreamElementQueueEntries. */
private final ArrayDeque<StreamElementQueueEntry<?>> queue;
@Override
public AsyncResult peekBlockingly() throws InterruptedException { // 从队列中阻塞地获取已异步完成的元素
lock.lockInterruptibly();
try {
while (queue.isEmpty() || !queue.peek().isDone()) {
headIsCompleted.await();
}
// 只有队列头部的请求完成后才解除阻塞状态
LOG.debug("Peeked head element from ordered stream element queue with filling degree " + "({}/{}).", queue.size(), capacity);
return queue.peek();
} finally {
lock.unlock();
}
}
@Override
public AsyncResult poll() throws InterruptedException {
lock.lockInterruptibly();
try {
while (queue.isEmpty() || !queue.peek().isDone()) {
headIsCompleted.await();
}
notFull.signalAll();
LOG.debug("Polled head element from ordered stream element queue. New filling degree " + "({}/{}).", queue.size() - 1, capacity);
return queue.poll();
} finally {
lock.unlock();
}
}
@Override
public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly(); // 将该请求加入队列;如果队列已满(到达异步请求的上限),返回false,其外部会阻塞
try {
if (queue.size() < capacity) { // 未达容量上限
addEntry(streamElementQueueEntry);
LOG.debug("Put element into ordered stream element queue. New filling degree " + "({}/{}).", queue.size(), capacity);
return true;
} else {
LOG.debug("Failed to put element into ordered stream element queue because it " + "was full ({}/{}).", queue.size(), capacity);
return false;
}
} finally {
lock.unlock();
}
}
}
在“无序”模式下,异步计算结果的提交不是由消息到达的顺序确定的,而是取决于异步请求的完成顺序。
当然,在使用“事件时间”的情况下,要保证watermark语义的正确性。
在使用“处理时间”的情况下,由于不存在Watermark,因此可以看作一种特殊的情况。
在UnorderedStreamElementQueue中巧妙地实现了这两种情况。
ProcessingTime无序也比较简单,因为没有watermark,不需要协调watermark与消息的顺序性,所以使用两个队列就能实现,一个uncompletedQueue、一个completedQueue。所有新进入该算子的元素,同样的包装成Promise并放入uncompletedQueue队列,当uncompletedQueue队列中任意的Promise返回了数据,则将该Promise移到completedQueue队列中,并通知Emitter消费。如下图所示:
EventTime无序类似于有序与ProcessingTime无序的结合体。因为有watermark,需要协调watermark与消息之间的顺序性,所以uncompletedQueue中存放的元素从原先的Promise变成了Promise集合。
如果进入算子的是消息元素,则会包装成Promise放入队尾的集合中。
如果进入算子的是watermark,也会包装成Promise并放到一个独立的集合中,再将该集合加入到uncompletedQueue队尾,最后再创建一个空集合加到uncompletedQueue队尾。
这样,watermark就成了消息顺序的边界。
只有处在队首的集合中的Promise返回了数据,才能将该Promise移到completedQueue队列中,由Emitter消费发往下游。
只有队首集合空了,才能处理第二个集合。这样就保证了当且仅当某个watermark之前所有的消息都已经被发送了,该watermark才能被发送。
过程如下图所示:
public class UnorderedStreamElementQueue implements StreamElementQueue {
/** Queue of uncompleted stream element queue entries segmented by watermarks. */
private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;
/** Queue of completed stream element queue entries. */
private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;
/** First (chronologically oldest) uncompleted set of stream element queue entries. */
private Set<StreamElementQueueEntry<?>> firstSet;
// Last (chronologically youngest) uncompleted set of stream element queue entries. New
// stream element queue entries are inserted into this set.
private Set<StreamElementQueueEntry<?>> lastSet;
@Override
public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();
try {
if (numberEntries < capacity) {
addEntry(streamElementQueueEntry);
LOG.debug("Put element into unordered stream element queue. New filling degree " + "({}/{}).", numberEntries, capacity);
return true;
} else {
LOG.debug("Failed to put element into unordered stream element queue because it " + "was full ({}/{}).", numberEntries, capacity);
return false;
}
} finally {
lock.unlock();
}
}
private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
assert(lock.isHeldByCurrentThread());
if (streamElementQueueEntry.isWatermark()) {
// 如果是watermark,就要构造一个只包含这个watermark的set加入到uncompletedQueue队列中
lastSet = new HashSet<>(capacity);
if (firstSet.isEmpty()) {
firstSet.add(streamElementQueueEntry);
} else {
Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
watermarkSet.add(streamElementQueueEntry);
uncompletedQueue.offer(watermarkSet);
}
uncompletedQueue.offer(lastSet);
} else {
lastSet.add(streamElementQueueEntry); // 正常记录,加入lastSet中
}
streamElementQueueEntry.onComplete( // 设置异步请求完成后的回调
(StreamElementQueueEntry<T> value) -> {
try {
onCompleteHandler(value);
} catch (InterruptedException e) {
// ......
}
}, executor);
numberEntries++;
}
// 异步请求完成的回调
public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();
try {
// 如果完成的异步请求在firstSet中,那么就将firstSet中已完成的异步请求转移到completedQueue中
if (firstSet.remove(streamElementQueueEntry)) {
completedQueue.offer(streamElementQueueEntry);
while (firstSet.isEmpty() && firstSet != lastSet) {
// 如果firset中所有的异步请求都完成了,那么就从uncompletedQueue获取下一个集合作为firstSet
firstSet = uncompletedQueue.poll();
Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();
while (it.hasNext()) {
StreamElementQueueEntry<?> bufferEntry = it.next();
if (bufferEntry.isDone()) {
completedQueue.offer(bufferEntry);
it.remove();
}
}
}
LOG.debug("Signal unordered stream element queue has completed entries.");
hasCompletedEntries.signalAll();
}
} finally {
lock.unlock();
}
}
@Override
public AsyncResult poll() throws InterruptedException {
lock.lockInterruptibly();
try {
// 等待completedQueue中的元素
while (completedQueue.isEmpty()) {
hasCompletedEntries.await();
}
numberEntries--;
notFull.signalAll();
LOG.debug("Polled element from unordered stream element queue. New filling degree " + "({}/{}).", numberEntries, capacity);
return completedQueue.poll();
} finally {
lock.unlock();
}
}
}
在异步调用模式下,可能会同时有很多个请求正在处理中。因而在进行快照的时候,需要将异步调用尚未完成,以及结果尚未提交给下游的消息加入到状态中。在恢复的时候,从状态中取出这些消息,再重新处理一遍。为了保证exactly-once特性,对于异步调用已经完成,且结果已经由emitter提交给下游的消息就无需保存在快照中。
public class AsyncWaitOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, OperatorActions {
/** Recovered input stream elements. */
private transient ListState<StreamElement> recoveredStreamElements;
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
recoveredStreamElements = context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
}
@Override
public void open() throws Exception {
super.open();
// create the emitter
// 创建emitter消费线程
// process stream elements from state, since the Emit thread will start as soon as all
// elements from previous state are in the StreamElementQueue, we have to make sure that the
// order to open all operators in the operator chain proceeds from the tail operator to the
// head operator.
// 状态恢复的时候,从状态中取出所有未完成的消息,重新处理一遍
if (recoveredStreamElements != null) {
for (StreamElement element : recoveredStreamElements.get()) {
if (element.isRecord()) {
processElement(element.<IN>asRecord());
}
else if (element.isWatermark()) {
processWatermark(element.asWatermark());
}
else if (element.isLatencyMarker()) {
processLatencyMarker(element.asLatencyMarker());
}
else {
throw new IllegalStateException("Unknown record type " + element.getClass() + " encountered while opening the operator.");
}
}
recoveredStreamElements = null;
}
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
// 先清除状态
ListState<StreamElement> partitionableState =
getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
partitionableState.clear();
// 将所有未完成处理请求对应的消息加入状态中
Collection<StreamElementQueueEntry<?>> values = queue.values();
try {
for (StreamElementQueueEntry<?> value : values) {
partitionableState.add(value.getStreamElement());
}
// add the pending stream element queue entry if the stream element queue is currently full
if (pendingStreamElementQueueEntry != null) {
partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
}
} catch (Exception e) {
partitionableState.clear();
throw new Exception("Could not add stream element queue entries to operator state " + "backend of operator " + getOperatorName() + '.', e);
}
}
}