首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >ReplyingKafkaTemplate源码分析

ReplyingKafkaTemplate源码分析

作者头像
johnhuster的分享
发布2023-10-16 19:57:47
发布2023-10-16 19:57:47
28000
代码可运行
举报
文章被收录于专栏:johnhusterjohnhuster
运行总次数:0
代码可运行
代码语言:javascript
代码运行次数:0
运行
复制
ReplyingKafkaTemplate是spring-kafka组件提供的一个用于实现请求响应模式的类,基础介绍可以参考文章https://blog.csdn.net/john1337/article/details/131363690,这里不再赘述这些细节,下面看下ReplyingKafkaTemplate是如何实现请求响应模式的。

代码语言:javascript
代码运行次数:0
运行
复制
	@Override
	public void afterPropertiesSet() {
		if (!this.schedulerSet && !this.schedulerInitialized) {
            // 初始化ThreadPoolTaskScheduler
			((ThreadPoolTaskScheduler) this.scheduler).initialize();
			this.schedulerInitialized = true;
		}
	}

	@Override
	public synchronized void start() {
		if (!this.running) {
			try {
				afterPropertiesSet();
			}
			catch (Exception e) {
				throw new KafkaException("Failed to initialize", e);
			}
            // 启动GenericMessageListenerContainer,用于后续接收响应数据
			this.replyContainer.start();
			this.running = true;
		}
	}

上面为该类初始化部分,下面看下数据发送接口:

代码语言:javascript
代码运行次数:0
运行
复制
	public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @Nullable Duration replyTimeout) {
		Assert.state(this.running, "Template has not been start()ed"); // NOSONAR (sync)
		// correlationId是本次消息的唯一标识,实际上就是一个random 的uuid组成,这部分代码就不再列出,感兴趣的可以查看ReplyingKafkaTemplate类defaultCorrelationIdStrategy方法
        CorrelationKey correlationId = this.correlationStrategy.apply(record);
		Assert.notNull(correlationId, "the created 'correlationId' cannot be null");
		Headers headers = record.headers();
		boolean hasReplyTopic = headers.lastHeader(KafkaHeaders.REPLY_TOPIC) != null;
		if (!hasReplyTopic && this.replyTopic != null) {
			headers.add(new RecordHeader(this.replyTopicHeaderName, this.replyTopic));
			if (this.replyPartition != null) {
				headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
			}
		}
		headers.add(new RecordHeader(this.correlationHeaderName, correlationId.getCorrelationId()));
		this.logger.debug(() -> "Sending: " + record + WITH_CORRELATION_ID + correlationId);
        // 创建RequestReplyFuture,并存入futures中
		RequestReplyFuture<K, V, R> future = new RequestReplyFuture<>();
		this.futures.put(correlationId, future);
		try {
			future.setSendFuture(send(record));
		}
		catch (Exception e) {
			this.futures.remove(correlationId);
			throw new KafkaException("Send failed", e);
		}
        // 启动定时检测,指定时间结束后及时没收到反馈,也结束等待并返回KafkaReplyTimeoutException超时异常
		scheduleTimeout(record, correlationId, replyTimeout == null ? this.defaultReplyTimeout : replyTimeout);
		return future;
	}
代码语言:javascript
代码运行次数:0
运行
复制
	private void scheduleTimeout(ProducerRecord<K, V> record, CorrelationKey correlationId, Duration replyTimeout) {
        // 超时检测部分代码
		this.scheduler.schedule(() -> {
			RequestReplyFuture<K, V, R> removed = this.futures.remove(correlationId);
			if (removed != null) {
				this.logger.warn(() -> "Reply timed out for: " + record + WITH_CORRELATION_ID + correlationId);
				if (!handleTimeout(correlationId, removed)) {
					removed.setException(new KafkaReplyTimeoutException("Reply timed out"));
				}
			}
		}, Instant.now().plus(replyTimeout));
	}

上面是超时检测部分超时情况下反馈,下面看下正常接收到响应时代码:

代码语言:javascript
代码运行次数:0
运行
复制
	public void onMessage(List<ConsumerRecord<K, R>> data) {
		data.forEach(record -> {
            // 获取kafka_correlationId请求头
			Header correlationHeader = record.headers().lastHeader(this.correlationHeaderName);
			CorrelationKey correlationId = null;
			if (correlationHeader != null) {
				correlationId = new CorrelationKey(correlationHeader.value());
			}
			if (correlationId == null) {
				this.logger.error(() -> "No correlationId found in reply: " + record
						+ " - to use request/reply semantics, the responding server must return the correlation id "
						+ " in the '" + this.correlationHeaderName + "' header");
			}
			else {
                // 收到对应反馈数据,这里需要判断是否该数据在来到之前已被超时检测删除
				RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
				CorrelationKey correlationKey = correlationId;
				if (future == null) {
                     // 数据已被超时检测机制删除,记录日志
					logLateArrival(record, correlationId);
				}
				else {
					this.logger.debug(() -> "Received: " + record + WITH_CORRELATION_ID + correlationKey);
                     // 将反馈返回
					future.set(record);
				}
			}
		});
	}

示例代码可以参考:

https://cloud.tencent.com/developer/article/2343889

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-10-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档