腾讯云
开发者社区
文档
建议反馈
控制台
登录/注册
首页
学习
活动
专区
圈层
工具
MCP广场
文章/答案/技术大牛
搜索
搜索
关闭
发布
文章
问答
(9999+)
视频
沙龙
1
回答
通过
代码
/
api
重置
Kafka
streams
应用程序
、
、
我在想用
Kafka
Streams
执行这种操作的最佳方法是什么。我有一个
Kafka
流和一个KGlobalTable,比如说products (1.000.000 msg)和categoriesLogicBlobTable (10 msg)。我正在考虑使用
kafka
.tools.StreamsResetter逻辑,并以一种停止kafkaStream、运行
重置
并再次启动流的方式挂接我的
代码
。第二种选择是没有
kafka
流,而是只有两个消费者和一个生产者。这样,我就可以使用co
浏览 22
提问于2020-07-15
得票数 0
2
回答
将消费者偏移量从
Kafka
流
重置
到开头
、
、
、
、
我正在使用
Kafka
streams
,并希望将一些从Java到开头的消费者偏移量
重置
。KafkaConsumer.seekToBeginning(...)听起来是正确的选择,但我使用的是
Kafka
Streams
:...我猜,根据我定义的具体流管道,这将在引擎盖下创建几个消费者。或者,是否有其他方法可以
通过</e
浏览 23
提问于2017-07-09
得票数 2
回答已采纳
1
回答
如何移除/清除
Kafka
Streams
中的状态存储?
、
我在
kafka
-
streams
的末尾有一个自定义的Transformer实现,并绑定了一个持久的changelog KeyValueStore。但是,
应用程序
本身只是一个原型,所以我不介意完全清除存储。我可以重命名
kafka
.application.id和state-store-name,但这是一个临时的解决办法(相应的数据/主题不会被删除)。 如何将其完全清除?
浏览 0
提问于2017-10-11
得票数 7
回答已采纳
1
回答
卡夫卡流与消费者群体怪异行为
、
、
首先,是
kafka
-consumer-group.sh脚本的输出。的
API
时,某些当前的偏移量(第3列)和滞后(第4列)显示为'-‘,以区分它们实际上已经被捕获了?(
通过
golang
API
查询) 4 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-我们有一个流进程工作,即在随机时间(主要是在重新启动期间),
重置
到特定主题中可用的最早偏移量。在整个
代码
中,没有“<
浏览 1
提问于2017-10-02
得票数 0
回答已采纳
1
回答
Kafka
Streams
标点符号时间戳在上下文时间戳之前
、
当我记录
通过
transform函数传入的消息时,来自context.timestamp()的流时间根据使用时间戳提取器导出的数据显示为预期的正常日期。我们现在已经停止了这些上游,并重新启动了
Kafka
Streams
。当
streams
启动时,标点符号在受影响的任务启动时运行,但显示的时间戳为2036 -即使消息
通过
转换到达,context.timestamp仍显示有效日期。
浏览 15
提问于2021-11-01
得票数 1
1
回答
如何更改
Kafka
Topology的消费者偏移量?
、
、
现在,我正在做一个使用
Kafka
Streams
来处理消息的项目。我们的消息由两个标识符组成,一个是用户的标识符,第二个是用户所属的消息列表的标识符。这个
应用程序
的故事是,一个客户端想要
通过
我的
应用程序
向他们的用户列表发送推送消息。因此,客户端在向用户发送推送消息之前从外部源加载用户列表,然后开始发送。我想停止发送推送消息,而不是
通过
kafka
的流
api
消费所有消息。我在谷歌上搜索了最新的流源偏移量的变化,但找不到合适的方法来完成它。 我如何才能做到这
浏览 0
提问于2017-12-15
得票数 1
1
回答
Kafka
Streams
应用程序
重置
工具不起作用,无法将主题偏移量
重置
为0
、
我引用了,并尝试使用
重置
kafka
streams
应用程序
。但即使我做了所有的前提条件和必要的步骤,当我检查
重置
主题offset by /opt/cloudera/parcels/
KAFKA
/lib/
kafka
/bin/
kafka
-consumer-groups.sh
浏览 0
提问于2018-06-21
得票数 0
1
回答
kafka
崩溃后,偏移量丢失
、
、
、
、
我们的
kafka
系统崩溃是因为没有可用的磁盘空间。使用者是使用
Kafka
Streams
API
的Spring引导
应用程序
。现在,每个消费者
应用程序
都显示以下错误:该异常恰好发生在
kafka
服务器重启之后。如果我们重新启动
应用程序
,服务将在偏移量0处启动,以重放消费主题中的所有消息。
浏览 30
提问于2018-12-18
得票数 3
1
回答
有没有办法从Java
API
中的特定偏移量开始消费
kafka
主题?
、
、
、
我使用的是
Kafka
Stream
API
。当我启动我的
应用程序
时,有时会有一个间隙,我想从一个特定的偏移量开始消费。最早或最晚的不是我想要的。
浏览 25
提问于2020-05-02
得票数 2
回答已采纳
1
回答
Kafka
Streams
API
如何从模式注册表获取正确的模式?
、
、
我试图理解
Kafka
Streams
API
是如何与Schema Registry一起工作的。我知道在设置
应用程序
时必须指定Schema Registry URL,但我无法理解如果不指定主题名称或ID,
应用程序
如何从注册表检索正确的模式。 它使用主题名称检索模式?
浏览 2
提问于2019-11-25
得票数 2
1
回答
KSQL查询给我的请求增加了太多的延迟
、
我有一个将(X,Y)坐标保存到SQL表的系统。然后,我有一个端点,当被调用时,它返回(X,Y)坐标。 但是,我的系统需要30分钟来处理(X,Y)坐标并将其存储到SQL表中。从这个意义上说,我使用KSQL来更快地获取数据。 我已经在我提到的后端的端点中添加了对KSQL的调用。问题是这个调用给我的请求增加了6秒的额外时间。 我的端点包含一个查询,如下所示 SELECT feature_a,feature_b FROM ksql_table; 之前的两个流已经对ksql_table进行了预处理。在我的理解中,这个查询应该是非常简单和容易计算的。但这需要6秒的时间来处理。
浏览 29
提问于2019-02-04
得票数 1
回答已采纳
1
回答
我如何将卡夫卡状态
重置
为“宇宙的开始”?
我仍然在开发我用描述的
Kafka
应用程序
。在那篇帖子里,我问为什么似乎没有把卡夫卡的状态
重置
为我现在遇到了这个问题的一个变体: 我的
应用程序
包括一个生产者程序,它将数据推送到
Kafka
流,以及一个消费者程序,它将数据分组,聚合组,然后将生成的KTable转换回流,然后我打印出来。然而,我观察到,每次我运行程序时,产生的聚合值都会越来越大,
浏览 2
提问于2018-04-03
得票数 0
2
回答
在
Kafka
Streams
应用程序
中启动新线程(使用编程方式)是否可取?
、
、
我们正在使用低级处理器
API
开发一个
Kafka
Streams
应用程序
。 根据
Kafka
上的文档,所有的线程和并行性都是由Stream线程和流任务处理的。使用主题上的分区,并行性也是可扩展的。当前
代码
如下所示: public class Processor implements Processor<K, V> { public void process(String这意味着使用
Kafka
Streams
A
浏览 15
提问于2019-05-29
得票数 2
1
回答
卡夫卡流与JoinWindow合作进行数据重放
、
我有两个数据流,我希望能够加入他们的窗口,一个月,比方说。当我有一个实时数据时,使用、KStream、和join,一切都很有趣和非常容易。我做了这样的事; builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic1()); KStream<Str
浏览 4
提问于2017-01-23
得票数 4
回答已采纳
1
回答
Kafka
流
重置
问题
、
、
我一直试图建立一个卡夫卡流
应用程序
,用于星火。我有一个静态数据集来测试。在运行完我的
代码
之后,
Kafka
设置当前偏移量,使我无法在第二次运行时重新处理数据。运行
kafka
-
streams
-application-reset应该会
重置
偏移量。但是,重新运行我的
代码
会导致一个空的GlobalKTable。org.apache.
kafka
.common.utils.Bytes import org.apache.
kafka
.
stre
浏览 1
提问于2018-07-25
得票数 1
1
回答
如何正确地外部化属性文件中的spring-boot
kafka
-
streams
配置?
、
、
我试图将目前用Java
代码
编写的spring
应用程序
的配置具体化。是将ProducerConfig和ConsumerConfig值放入spring.
kafka
.
streams
.properties,还是
通过
spring.
kafka
.producer和spring.
kafka
.consumer到目前为止,看来我应该把我所有的配置都放到KafkaStreamsConfiguration类型的bean中,以便配置我的
kafka
-
streams</em
浏览 1
提问于2019-06-13
得票数 3
回答已采纳
2
回答
消费群
Kafka
流偏移量
重置
为零
、
我已经写了
Kafka
流媒体
应用程序
,只是根据一些条件过滤行,并将其加载到MongoDB。如何在不更改消费者组id的情况下实现此场景。我使用的是
Kafka
0.10版本的>> 非常感谢Pari
浏览 17
提问于2016-07-20
得票数 6
回答已采纳
1
回答
当您使用
应用程序
重置
工具时,
Kafka
状态存储会发生什么?
、
当您运行
Kafka
streams
应用程序
重置
工具以将
应用程序
重置
为特定的时间戳(例如T-n)时,状态存储会发生什么?文档内容为:“内部主题:删除内部主题(这会自动删除任何已提交的偏移量)”(内部主题在执行状态存储的changelog主题时由
Kafka
Streams
应用程序
在内部使用)是否有可能在<
浏览 13
提问于2020-02-25
得票数 1
回答已采纳
2
回答
Kafka
-如何将Rest服务消息转换为
kafka
主题?
、
、
、
我是
kafka
社区的新手,我面临着一个具有挑战性的问题。我有两个
应用程序
,它们
通过
Rest webservice相互通信,主体是一个json消息。我如何使用
kafka
作为这两个
应用程序
之间的中间件,而对
应用程序
的影响很小或为零? 这是我的原样场景: ? 我的未来场景: ? 我考虑过使用STM或一些拦截器来转换头部和正文。
浏览 35
提问于2019-01-25
得票数 2
回答已采纳
2
回答
Apache Flink State Store与
Kafka
Streams
、
、
、
据我所知,处理
Kafka
流的状态在内存、磁盘或
Kafka
主题中都是本地的,因为所有的输入数据都来自一个分区,其中所有的消息都是由一个定义的值键控的。如果是这样的话,您有另一个
Streams
实例来计算结果。如图所示:Flink到底在哪里存储它的状态?Flink是否也可以在本地存储状态,或者总是将它们发布到所有实例(任务)?是否可以将Flink配置为将状态存储在
Kafka
代理中?
浏览 0
提问于2019-02-10
得票数 3
点击加载更多
相关
资讯
流式处理:使用 Apache Kafka的Streams API 实现 Rabobank 的实时财务告警
Kafka实战(五)-核心API及适用场景全面解析
初探Kafka Streams
Kafka 3.0.0新功能get
Heron:来自Twitter的新一代流处理引擎应用篇
热门
标签
更多标签
云服务器
ICP备案
对象存储
云点播
实时音视频
活动推荐
运营活动
广告
关闭
领券