腾讯云
开发者社区
文档
建议反馈
控制台
登录/注册
首页
学习
活动
专区
工具
TVP
最新优惠活动
文章/答案/技术大牛
搜索
搜索
关闭
发布
精选内容/技术社群/优惠产品,
尽在小程序
立即前往
文章
问答
(9999+)
视频
沙龙
1
回答
在
Kstream
Consumer
Processor
中
使用
Avro
创建
状态
存储
、
、
它读取主题外的
avro
消息,并构造一个聚合数据的
状态
存储
,该数据也是
avro
类型的。@Bean Serde<OutputEvent> serdeOutEvent}, Materialized.with(new Serdes.StringSerde(), serdeOutEvent).toStream();
浏览 23
提问于2021-02-27
得票数 1
回答已采纳
2
回答
春云流汇合
KStream
Avro
消费
、
、
、
、
我正在尝试
使用
来自kafka主题的合流
avro
消息,作为带有spring 2.0的
Kstream
。 我能够将消息作为MessageChannel来
使用
,而不是以
KStream
的形式
使用
。7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3“org.apache.kafka.streams.errors.StreamsException:流
中
的异常
在
org.apache.kafka.streams.<em
浏览 2
提问于2019-11-25
得票数 1
回答已采纳
1
回答
使用
Spring Cloud Stream上的自定义serde序列化聚合
状态
存储
时出错
、
、
我正在尝试
使用
Spring Cloud Stream
创建
一个简单的功能bean,它处理来自
KStream
和GlobalKTable的消息,连接它们,聚合它们,并将结果输出到一个新的流
中
,但我
在
正确配置它所需的-0.
consumer
.value-serde: io.confluent.kafka.streams.serdes.
avro
.GenericAvroSerde spring.cloud.stream.kafka.streams.bindings.joinAndAg
浏览 82
提问于2020-05-20
得票数 1
回答已采纳
2
回答
接收器主题在拓扑
中
是强制性的吗?
、
在
我的应用程序
中
,我需要Kafka Streams特性来连接、转换和聚合到
状态
存储
中
。但是,我不需要接收器主题,因为我的应用程序是事件链
中
的最后一个链接。
浏览 11
提问于2022-06-07
得票数 0
1
回答
从Spring Cloud Streams Kafka Stream应用程序
中
的处理器写入主题
、
、
、
我正在
使用
处理器API
在
状态
存储
中
执行一些低级处理。重点是,我还需要在
存储
到
存储
中
后写入主题。如何在Spring Cloud Streams Kafka应用
中
做到这一点?@Bean }
浏览 40
提问于2020-05-02
得票数 0
回答已采纳
1
回答
Kafka流任务分配
、
、
在
我的当前场景
中
,所有主题都只有一个分区。 当我运行同一个应用程序的新实例(
使用
相同的APPLICATION_ID),处理不同的主题时,流客户端不会在这个新应用程序
中
创建
新任务。第一个实例继续处理任务0_0
中
的第一个主题,第二个实例
在
没有分配分区的情况下等待。我知道我只
使用
一个分区的主题,但在这种情况下,如果我有两个实例和两个具有一个分区的主题来处理两个分区,那么为什么不能在每个实例
中
同时处理两个单独分区的主题呢?我怀疑这与Strea
浏览 2
提问于2019-11-20
得票数 4
回答已采纳
1
回答
卡夫卡流:"TopicAuthorizationException:未授权访问内部
状态
存储
的主题“
、
Java: OpenJdk 11 Kafka: 2.2.0Kafka流库: 2.3.0 我正在尝试将我的Kafka应用程序部署到停靠器容器
中
,但是它在尝试
使用
TopicAuthorizationException
创建
内部
状态
存储
时失败了。本地和服务器之间的主要区别是,它连接到部署了Kafka的服务器,并
使用
通常的Kerberos auth进行身份验证。我无法理解身份验证与本地
存储
之间的联系。40b2-a314-4b20f32918f7-StreamThread-1流线程
浏览 2
提问于2019-10-09
得票数 8
回答已采纳
0
回答
消息密钥
在
Kafka流
中
的长度
、
、
我尝试
使用
Long as类型的消息键,但我得到streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
KStream</em
浏览 1
提问于2017-12-05
得票数 0
回答已采纳
1
回答
如何注册无
状态
处理器(这似乎也需要StateStore )?
、
我正在构建一个拓扑,并希望
使用
将一些中间值写入数据库。此步骤不改变数据的性质,并且完全无
状态
。当应用程序启动时,未能添加上述StateStore将导致此错误: 线程"main“org.apache.kafka.streams.errors.TopologyBuilderException为什么处理器必须有
状态</e
浏览 5
提问于2016-08-22
得票数 6
回答已采纳
1
回答
连续的流-流内部连接会产生错误的结果:流之间的
KStream
真正在内部做什么?
、
、
、
、
问题设置 我有一个节点流和一个表示图的连续更新的边缘流,我希望
使用
多个串联连接来构建由节点和边缘组成的模式。让我们假设我想匹配这样的模式:(node1) --edge1-> (node2)。
在
特定类型的节点和边缘上进行过滤并不重要。将这些部分连接在一起的想法是
使用
前面的Valuejoiners执行两个连续的连接。我希望您关注两个ValueJoiners执行的第一个操作:为了构建模式,我只需
在
列表的末尾添加节点和边,这是模式的
Avro
模式的一部分。下面是生成节点和边并在相应主题中发布它们的通用循环。特
浏览 3
提问于2021-01-21
得票数 1
回答已采纳
1
回答
Kafka Streams
使用
的RocksDB文件名
在
/tmp/streams-my-application-id下,我找到了RocksDB
使用
的文件。我的目的是通过du -h检查文件大小。 当看到文件名时,我对文件名的含义很好奇。前缀0和1是否表示
使用
的主题数量,而后者是
使用
的分区? 这个KafkaStreams应用程序简单地
使用
KStream
-KTable连接两个主题,其中一个主题是重新分区并还原为KTable。./1_2/rocksdb/
KSTREAM
-REDUCE
浏览 23
提问于2019-06-14
得票数 1
回答已采纳
2
回答
卡夫卡州的商店能跨流共享吗?
、
我们有这样一种场景,即需要在另一个
kstream
中
访问具有来自一个
kstream
的值的statestore,有任何方法可以实现这一点吗?
浏览 8
提问于2022-05-25
得票数 0
1
回答
如何在聚合和预处理器
中
重用
状态
存储
?
、
我想将消息聚合成一个
状态
,但我也希望
在
预处理步骤中
使用
该
状态
,这样也可以更新其他
状态
。例如,查看此消息是否更改了
状态
的某些部分,如果是的话,更新跟踪
状态
特定部分以及
状态
本身的主题。我能想象的最好的方法是
使用
能够访问
状态
存储
的转换器,但是
在
聚合
状态
之前就这样做,这样我就可以
在
状态
更新之前看到
状态
的值。(
在
浏览 0
提问于2018-05-24
得票数 3
1
回答
春季云kafka流模式注册中心
、
、
我试图
使用
函数编程(和spring云流)从输入主题转换输入
AVRO
消息,并在输出主题上发布新消息。这是我的转换函数:public Function<
KStream
<String, Data>,
KStream
<String, Double>> evenNumberSquareProcessor() { return
kStream
->
kStream
.transform(() -> new CustomProcess
浏览 5
提问于2021-04-06
得票数 1
1
回答
同一个主题的多个StreamListeners与卡夫卡连接的Spring
、
、
我有Spring应用程序,我正在
使用
连接到Kafka。我试图为相同的kafka主题设置两个单独的流侦听器方法。@StreamListener("countries") public
KStream
<?, AggregatedCountry> process(
KStream
<Object, Country> input) { .
浏览 3
提问于2020-06-19
得票数 1
回答已采纳
1
回答
卡夫卡集合与具体化阿夫罗发球给NullPointerException
、
我正在尝试窗口一个数据流,对于每个窗口,我需要该窗口中的值列表,并为此
创建
了一个自定义的
avro
,其中包含一个字段records,它是一个Input列表。
KStream
<String, Input> windowedStream = timestampFilteredStream at org.apache.kafka.streams.
kstream
.internals.KStreamTra
浏览 0
提问于2019-04-16
得票数 0
回答已采纳
1
回答
流经常重新
创建
商店。
在
流应用程序
中
,我
使用
交互式查询和
状态
器,以便更快地扩展和
使用
来自主题的数据。| ProcessorTopology:anomaly-timeline-3 |
KSTREAM
-REDUCE-0000000009: ano
浏览 1
提问于2019-03-01
得票数 0
1
回答
是否支持具有相同接收器和源主题的Kafka流?
、
、
我有一个复杂的Kafka应用程序,其中两个流在同一个流
中
完全有
状态
: 它
使用
Execution主题作为源,增强消息并将其重新发布回相同的Execution主题。),并在当前Execution
中
更改WorkerTaskResult更改当前TaskRun的
状态
(主要是运行/成功/失败),以及也被发布回<code>D34<//code>队列(与Kafka流一起)<code(
使用
2
KStream
to <e
浏览 5
提问于2020-04-20
得票数 1
回答已采纳
2
回答
在
春云流中
使用
@Valid的pojo值
、
如何在以下卡夫卡消费者代码中
使用
@Valid启用验证?我
使用
的是Spring (Kafka实现),
在
实现之后,我
使用
的是功能模型。@Bean return messages -> messages.foreach
浏览 10
提问于2021-06-29
得票数 2
1
回答
在
标头中依赖模式引用的Serdes中
使用
Kafka流
、
、
因为架构标识符
存储
在
标头中。byte[] serialize(String topic, Headers headers, T data)
在
包装序列化器ValueAndTimestampSerializer
中
。第一个问题是,有人知道如何请求Kafka流在内部
使用
正确的方法签名来调用该方法吗? 我正在探索解决这一问题的方法,包括编写
使用
消息本身
中
的模式标识符重新序列化的新Serdes。其思想是首先将值读取为byte[],然后
在
转换器中将值反序列化为我的<
浏览 5
提问于2021-11-10
得票数 2
回答已采纳
点击加载更多
扫码
添加站长 进交流群
领取专属
10元无门槛券
手把手带您无忧上云
相关
资讯
Kafka streams概览
Kafka Streams与Quarkus:实时处理事件
初探Kafka Streams
分布式消息队列Kafka学习笔记
深入理解Kafka Connect:转换器和序列化
热门
标签
更多标签
云服务器
ICP备案
对象存储
腾讯会议
云直播
活动推荐
运营活动
广告
关闭
领券