This article looks at how Storm Trident splits a batch across partitions and then aggregates it back together.
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
        .partitionBy(new Fields("user"))
        .partitionAggregate(new Fields("user", "score", "batchId"), new OriginUserCountAggregator(), new Fields("result", "aggBatchId"))
        .parallelismHint(3)
        .global()
        .aggregate(new Fields("result", "aggBatchId"), new AggAgg(), new Fields("agg"))
        .each(new Fields("agg"), new PrintEachFunc(), new Fields());
Sample log output
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt1, 1]
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt2, 1]
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt3, 1]
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt2, 1, 1]
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt2=1}}
23:22:00.722 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.723 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt1, 1, 1]
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt3, 1, 1]
23:22:00.723 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt1=1}}
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt3=1}}
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - zero called
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt2=1}}, 1:0]
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{},val2:{1={nickt2=1}}
23:22:00.726 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt3=1}}, 1:0]
23:22:00.727 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{1={nickt2=1}},val2:{1={nickt3=1}}
23:22:00.728 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt1=1}}, 1:0]
23:22:00.728 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{1={nickt3=1, nickt2=1}},val2:{1={nickt1=1}}
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.AggAgg - zero called
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.AggAgg - combine val1:{},val2:{1={nickt3=1, nickt2=1, nickt1=1}}
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.PrintEachFunc - null each tuple:[{1={nickt3=1, nickt2=1, nickt1=1}}]
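The log above shows one batch being split over three partition tasks, counted per partition, and then merged in a single task. The same shape can be sketched in plain Java without Storm. This is only an illustration: the class and method names are hypothetical, and the hash used for partitioning is a simple stand-in, not Storm's own grouping logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class BatchSplitMergeSketch {

    // Stand-in for partitionBy(new Fields("user")): a given user always maps to one partition.
    // Assumption: plain hashCode modulo; Storm's own grouping hash may differ.
    static int partitionOf(String user, int numPartitions) {
        return Math.floorMod(user.hashCode(), numPartitions);
    }

    // Stand-in for partitionAggregate: every partition counts only the tuples it received.
    static List<Map<String, Integer>> partitionAggregate(List<String> users, int numPartitions) {
        List<Map<String, Integer>> partials = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partials.add(new TreeMap<>());
        }
        for (String user : users) {
            partials.get(partitionOf(user, numPartitions)).merge(user, 1, Integer::sum);
        }
        return partials;
    }

    // Stand-in for global() + aggregate: all partial maps meet in one task and are merged.
    static Map<String, Integer> combineAll(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((user, count) -> total.merge(user, count, Integer::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("nickt1", "nickt2", "nickt3");
        List<Map<String, Integer>> partials = partitionAggregate(batch, 3);
        System.out.println("partials: " + partials);
        System.out.println("combined: " + combineAll(partials));
    }
}
```

Which partition a user lands in depends on the hash, but the merged result is the same however the batch was split, which is the property the two-stage aggregation relies on.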
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java
public void execute(Tuple tuple) {
if(TupleUtils.isTick(tuple)) {
long now = System.currentTimeMillis();
if(now - _lastRotate > _messageTimeoutMs) {
_batches.rotate();
_lastRotate = now;
}
return;
}
String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
if(batchGroup==null) {
// this is so we can do things like have simple DRPC that doesn't need to use batch processing
_coordCollector.setCurrBatch(null);
_bolt.execute(null, tuple);
_collector.ack(tuple);
return;
}
IBatchID id = (IBatchID) tuple.getValue(0);
//get transaction id
//if it already exists and attempt id is greater than the attempt there
TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
// if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
// System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
// + " (" + _batches.size() + ")" +
// "\ntuple: " + tuple +
// "\nwith tracked " + tracked +
// "\nwith id " + id +
// "\nwith group " + batchGroup
// + "\n");
//
// }
//System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());
// this code here ensures that only one attempt is ever tracked for a batch, so when
// failures happen you don't get an explosion in memory usage in the tasks
if(tracked!=null) {
if(id.getAttemptId() > tracked.attemptId) {
_batches.remove(id.getId());
tracked = null;
} else if(id.getAttemptId() < tracked.attemptId) {
// no reason to try to execute a previous attempt than we've already seen
return;
}
}
if(tracked==null) {
tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
_batches.put(id.getId(), tracked);
}
_coordCollector.setCurrBatch(tracked);
//System.out.println("TRACKED: " + tracked + " " + tuple);
TupleType t = getTupleType(tuple, tracked);
if(t==TupleType.COMMIT) {
tracked.receivedCommit = true;
checkFinish(tracked, tuple, t);
} else if(t==TupleType.COORD) {
int count = tuple.getInteger(1);
tracked.reportedTasks++;
tracked.expectedTupleCount+=count;
checkFinish(tracked, tuple, t);
} else {
tracked.receivedTuples++;
boolean success = true;
try {
_bolt.execute(tracked.info, tuple);
if(tracked.condition.expectedTaskReports==0) {
success = finishBatch(tracked, tuple);
}
} catch(FailedException e) {
failBatch(tracked, e);
}
if(success) {
_collector.ack(tuple);
} else {
_collector.fail(tuple);
}
}
_coordCollector.setCurrBatch(null);
}
private void failBatch(TrackedBatch tracked, FailedException e) {
if(e!=null && e instanceof ReportedFailedException) {
_collector.reportError(e);
}
tracked.failed = true;
if(tracked.delayedAck!=null) {
_collector.fail(tracked.delayedAck);
tracked.delayedAck = null;
}
}
private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
if(tracked.failed) {
failBatch(tracked);
_collector.fail(tuple);
return;
}
CoordCondition cond = tracked.condition;
boolean delayed = tracked.delayedAck==null &&
(cond.commitStream!=null && type==TupleType.COMMIT
|| cond.commitStream==null);
if(delayed) {
tracked.delayedAck = tuple;
}
boolean failed = false;
if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
if(tracked.receivedTuples == tracked.expectedTupleCount) {
finishBatch(tracked, tuple);
} else {
//TODO: add logging that not all tuples were received
failBatch(tracked);
_collector.fail(tuple);
failed = true;
}
}
if(!delayed && !failed) {
_collector.ack(tuple);
}
}
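The completion rule inside checkFinish can be isolated into a small model: a batch finishes only when every expected upstream task has reported, and the number of tuples received equals the sum of the reported counts. The sketch below is a simplified stand-in, not the Storm class. The names are hypothetical, it assumes a bolt without a commit stream, and it leaves out acking and the expectedTaskReports == 0 shortcut.

```java
final class BatchCompletionSketch {

    enum Outcome { PENDING, FINISHED, FAILED }

    // Simplified stand-in for TrackedBatch plus its CoordCondition.
    static final class Tracked {
        final int expectedTaskReports;   // number of upstream tasks that must report
        boolean receivedCommit = true;   // assumption: this bolt has no commit stream
        int reportedTasks;
        int expectedTupleCount;
        int receivedTuples;

        Tracked(int expectedTaskReports) {
            this.expectedTaskReports = expectedTaskReports;
        }

        // A regular data tuple is only counted here.
        void onData() {
            receivedTuples++;
        }

        // An [id, count] tuple from one upstream task.
        Outcome onCoord(int count) {
            reportedTasks++;
            expectedTupleCount += count;
            return check();
        }

        // Same condition as in checkFinish: all tasks reported, then compare tuple counts.
        Outcome check() {
            if (receivedCommit && reportedTasks == expectedTaskReports) {
                return receivedTuples == expectedTupleCount ? Outcome.FINISHED : Outcome.FAILED;
            }
            return Outcome.PENDING;
        }
    }

    public static void main(String[] args) {
        // Three upstream tasks, each sending one data tuple followed by [id, 1].
        Tracked batch = new Tracked(3);
        for (int task = 0; task < 3; task++) {
            batch.onData();
            System.out.println("after task " + task + ": " + batch.onCoord(1));
        }
    }
}
```

With three upstream tasks, as in the sample log, the first two reports leave the batch PENDING and the third one finishes it. A report whose count does not match the tuples actually received yields FAILED instead.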
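For the spout-side executor (TridentSpoutExecutor), tracked.condition.expectedTaskReports is 0 (the tuple it receives is really the instruction to emit a batch), so finishBatch is called right after _bolt.execute. For SubtopologyBolt, tracked.condition.expectedTaskReports is not 0, so the batch has to wait for the final [id, count] tuple before checkFinish can complete it.
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutExecutor.java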
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector) {
_emitter = _spout.getEmitter(_txStateId, conf, context);
_collector = new AddIdCollector(_streamName, collector);
}
@Override
public void execute(BatchInfo info, Tuple input) {
// there won't be a BatchInfo for the success stream
TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
_activeBatches.remove(attempt.getTransactionId());
} else {
throw new FailedException("Received commit for different transaction attempt");
}
} else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
// valid to delete before what's been committed since
// those batches will never be accessed again
_activeBatches.headMap(attempt.getTransactionId()).clear();
_emitter.success(attempt);
} else {
_collector.setBatch(info.batchId);
_emitter.emitBatch(attempt, input.getValue(1), _collector);
_activeBatches.put(attempt.getTransactionId(), attempt);
}
}
@Override
public void finishBatch(BatchInfo batchInfo) {
}
@Override
public Object initBatchState(String batchGroup, Object batchId) {
return null;
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java
@Override
public Object initBatchState(String batchGroup, Object batchId) {
ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
p.startBatch(ret);
}
return ret;
}
@Override
public void execute(BatchInfo batchInfo, Tuple tuple) {
String sourceStream = tuple.getSourceStreamId();
InitialReceiver ir = _roots.get(sourceStream);
if(ir==null) {
throw new RuntimeException("Received unexpected tuple " + tuple.toString());
}
ir.receive((ProcessorContext) batchInfo.state, tuple);
}
@Override
public void finishBatch(BatchInfo batchInfo) {
for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
p.finishBatch((ProcessorContext) batchInfo.state);
}
}
protected static class InitialReceiver {
List<TridentProcessor> _receivers = new ArrayList<>();
RootFactory _factory;
ProjectionFactory _project;
String _stream;
public InitialReceiver(String stream, Fields allFields) {
// TODO: don't want to project for non-batch bolts...???
// how to distinguish "batch" streams from non-batch streams?
_stream = stream;
_factory = new RootFactory(allFields);
List<String> projected = new ArrayList<>(allFields.toList());
projected.remove(0);
_project = new ProjectionFactory(_factory, new Fields(projected));
}
public void receive(ProcessorContext context, Tuple tuple) {
TridentTuple t = _project.create(_factory.create(tuple));
for(TridentProcessor r: _receivers) {
r.execute(context, _stream, t);
}
}
public void addReceiver(TridentProcessor p) {
_receivers.add(p);
}
public Factory getOutputFactory() {
return _project;
}
}
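The per-batch lifecycle that SubtopologyBolt drives (start, execute per tuple, finish, with one state slot per processor) can be modeled in a few lines. The sketch below uses hypothetical, simplified types rather than the Storm interfaces. As a further simplification it hands every tuple to every processor, whereas the real bolt only feeds the root receivers.

```java
import java.util.ArrayList;
import java.util.List;

final class ProcessorLifecycleSketch {

    // Per-batch context with one state slot per processor, like ProcessorContext.state.
    static final class Context {
        final Object batchId;
        final Object[] state;

        Context(Object batchId, int slots) {
            this.batchId = batchId;
            this.state = new Object[slots];
        }
    }

    // Hypothetical minimal version of the three processor callbacks.
    interface Processor {
        void startBatch(Context ctx);
        void execute(Context ctx, String tuple);
        void finishBatch(Context ctx);
    }

    // Counts tuples per batch and records "batchId:count" when the batch finishes.
    static final class CountingProcessor implements Processor {
        private final int stateIndex;
        final List<String> emitted = new ArrayList<>();

        CountingProcessor(int stateIndex) {
            this.stateIndex = stateIndex;
        }

        public void startBatch(Context ctx) {
            ctx.state[stateIndex] = 0;  // fresh state for every batch
        }

        public void execute(Context ctx, String tuple) {
            ctx.state[stateIndex] = (Integer) ctx.state[stateIndex] + 1;
        }

        public void finishBatch(Context ctx) {
            emitted.add(ctx.batchId + ":" + ctx.state[stateIndex]);
        }
    }

    // Drives one batch: init state, execute each tuple, then finish.
    static void runBatch(Object batchId, List<String> tuples, List<? extends Processor> processors) {
        Context ctx = new Context(batchId, processors.size());
        for (Processor p : processors) {
            p.startBatch(ctx);
        }
        for (String tuple : tuples) {
            for (Processor p : processors) {
                p.execute(ctx, tuple);
            }
        }
        for (Processor p : processors) {
            p.finishBatch(ctx);
        }
    }

    public static void main(String[] args) {
        CountingProcessor counter = new CountingProcessor(0);
        runBatch(1, List.of("a", "b", "c"), List.of(counter));
        runBatch(2, List.of("d"), List.of(counter));
        System.out.println(counter.emitted);
    }
}
```

Because the context is created per batch and startBatch resets the slot, the count for batch 2 does not carry over anything from batch 1.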
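initBatchState calls the startBatch method of each TridentProcessor in the batch group (for example AggregateProcessor, EachProcessor).
execute hands the tuple to InitialReceiver.receive, which calls execute on each registered receiver (for example AggregateProcessor).
finishBatch calls the finishBatch method of each TridentProcessor (for example AggregateProcessor, EachProcessor).
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java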
@Override
public void startBatch(ProcessorContext processorContext) {
// initialize state for batch
processorContext.state[tridentContext.getStateIndex()] = new ArrayList<TridentTuple>();
}
@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
// add tuple to the batch state
Object state = processorContext.state[tridentContext.getStateIndex()];
((List<TridentTuple>) state).add(projection.create(tuple));
}
@Override
public void finishBatch(ProcessorContext processorContext) {
Object batchId = processorContext.batchId;
Object batchTxnId = getBatchTxnId(batchId);
LOG.debug("Received finishBatch of : [{}] ", batchId);
// get all the tuples in a batch and add it to trident-window-manager
List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
tridentWindowManager.addTuplesBatch(batchId, tuples);
List<Integer> pendingTriggerIds = null;
List<String> triggerKeys = new ArrayList<>();
Iterable<Object> triggerValues = null;
if (retriedAttempt(batchId)) {
pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
if (pendingTriggerIds != null) {
for (Integer pendingTriggerId : pendingTriggerIds) {
triggerKeys.add(triggerKey(pendingTriggerId));
}
triggerValues = windowStore.get(triggerKeys);
}
}
// if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
if(triggerValues == null) {
pendingTriggerIds = new ArrayList<>();
Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
try {
Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
List<Object> values = new ArrayList<>();
StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
while (pendingTriggersIter.hasNext()) {
triggerResult = pendingTriggersIter.next();
for (List<Object> aggregatedResult : triggerResult.result) {
String triggerKey = triggerKey(triggerResult.id);
triggerKeys.add(triggerKey);
values.add(aggregatedResult);
pendingTriggerIds.add(triggerResult.id);
}
pendingTriggersIter.remove();
}
triggerValues = values;
} finally {
// store inprocess triggers of a batch in store for batch retries for any failures
if (!pendingTriggerIds.isEmpty()) {
windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
}
}
}
collector.setContext(processorContext);
int i = 0;
for (Object resultValue : triggerValues) {
collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
}
collector.setContext(null);
}
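In the finishBatch code above, the batch's tuples are added to the window manager first and the pending trigger results are drained afterwards, so a result can only be emitted by a finishBatch call that runs after the trigger has fired. The sketch below models that ordering with hypothetical names. It assumes a tumbling window that simply counts tuples, and the trigger is fired by hand rather than by a timer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class WindowLagSketch {
    // Tuples handed to the window in finishBatch, waiting for the next trigger.
    private final Queue<String> windowTuples = new ArrayDeque<>();
    // Trigger results waiting to be emitted by a later finishBatch.
    private final Queue<Integer> pendingTriggers = new ArrayDeque<>();

    // Stand-in for the window trigger firing: counts what is queued so far.
    void fireTrigger() {
        pendingTriggers.add(windowTuples.size());
        windowTuples.clear();
    }

    // Stand-in for finishBatch: add the batch to the window, then drain pending results.
    List<Integer> finishBatch(List<String> batchTuples) {
        windowTuples.addAll(batchTuples);
        List<Integer> emitted = new ArrayList<>(pendingTriggers);
        pendingTriggers.clear();
        return emitted;
    }

    public static void main(String[] args) {
        WindowLagSketch window = new WindowLagSketch();
        System.out.println(window.finishBatch(List.of("a", "b"))); // no trigger yet
        System.out.println(window.finishBatch(List.of("c")));      // still no trigger
        window.fireTrigger();                                      // window covers a, b, c
        System.out.println(window.finishBatch(List.of("d")));      // emits the window result
    }
}
```

The first two calls emit nothing, and the third emits a single result covering the three tuples of the earlier batches. Downstream code therefore has to cope with batches that carry no window output.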
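The trigger results emitted in finishBatch go to the processors downstream of the window (for example ProjectedProcessor, PartitionPersistProcessor). PartitionPersistProcessor stores the data in state, while ProjectedProcessor extracts fields according to the window's outputFields and passes the data on to downstream processors such as EachProcessor.
TridentBoltExecutor acks each tuple automatically once it has been processed. If overall processing takes too long, tuple processing for the whole topology times out and the spout's fail operation is triggered, which replays that batchId. If the spout is transactional, the tuples belonging to the batchId are unchanged on replay.
WindowTridentProcessor resets its state on every startBatch, and execute appends each tuple to that state. In finishBatch the data is copied to windowStore and to the windowManager's ConcurrentLinkedQueue. It then waits for the window trigger to fire, which computes the window data and puts it into pendingTriggers. When the bolt runs finishBatch, the window data is removed from pendingTriggers and handed to FreshCollector, which passes it to the downstream processors. The downstream processors' startBatch and finishBatch follow the rhythm of the original spout; they are not triggered by the window.
The batch emit interval is set by the topology.trident.batch.emit.interval.millis parameter (500 by default in defaults.yaml). The window interval is usually larger than this batch interval, so a window aggregates the data of several batches. Also, because data is only added to the windowManager's ConcurrentLinkedQueue in finishBatch, pendingTriggers has no data yet at that point, so the first few finishBatch calls usually get nothing from the window and the downstream processors have nothing to process. Check for empty results to avoid a NullPointerException.
Tuples arrive in the order they were emitted, and the last one is [id, count], which acts as the end-of-batch instruction used to detect and trigger batch completion. The data of the next batch is then sent downstream, followed by its own [id, count] once the batch has been fully sent, so the downstream bolts carry out their batch operations in turn. global sends all data to the same partition/task. batchGlobal behaves like global when parallelism is 1; when parallelism is greater than 1, it distributes data to different partitions/tasks by batchId.
Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.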
If it infringes your rights, contact cloudcommunity@tencent.com to have it removed.