腾讯云
开发者社区
文档
建议反馈
控制台
登录/注册
首页
学习
活动
专区
圈层
工具
MCP广场
文章/答案/技术大牛
搜索
搜索
关闭
发布
文章
问答
(696)
视频
沙龙
1
回答
Kafka
streams
DSL
:
聚合
、
丰富
和
发送
我们使用
Kafka
Streams
需要解决以下问题:3-
丰富
该事件的完整
聚合
状态的消息,并将其
发送
到输出主题 重点是我们不能真的丢失一条消息,它必须始终用最新的
聚合
状态(我们在消息处理过程中实际评估的状态)来
丰富
传入的消息。到目前为止
浏览 17
提问于2017-02-11
得票数 4
回答已采纳
2
回答
Kafka
Streams
- java版本依赖关系
我正在使用Java1.6运行我的应用程序,并计划将一些方面与
Kafka
集成。 我正在探索是否应该使用
Kafka
Consumer (使用poll())或使用KafkaStreams API。
浏览 3
提问于2018-12-20
得票数 0
1
回答
Kafka
Streams
DSL
进程方法是如何工作的?
、
我一直在使用
Kafka
Streams
,并获得了一些基本的功能,但我在理解
Kafka
Streams
DSL
中的process方法时遇到了一些问题。具体地说: 我知道有两种使用
Kafka
Streams
的方法,低级处理API
和
高级
Streams
DSL
。对于较低的级别,您可以更明确地定义拓扑,命名每个节点等,而流
DSL
则将大部分抽象出来。所以我的问题是,处理过的数据-Processor的void f
浏览 9
提问于2020-01-20
得票数 0
回答已采纳
1
回答
如何将自定义StateStore添加到
Kafka
流
DSL
处理器?
对于我的
Kafka
流应用程序之一,我需要使用
DSL
和
处理器API的特性。我的流媒体应用程序流
聚合
之后,我需要向接收器
发送
一条
聚合
消息。(MeteredKeyValueStore.java:167) at org.apache.
kafka
.
streams
.processor.internals.Pro
浏览 4
提问于2016-10-24
得票数 6
回答已采纳
1
回答
使用带有Spark的
Kafka
比仅使用Spark的优点
、
、
Kafka
是很常见的。所以很多公司都在使用它。我完全理解
Kafka
和
Spark是如何工作的,我对他们都很有经验。我不理解的是用例。为什么你要把
Kafka
和
Spark一起使用,而不仅仅是Spark呢?在我看来,
Kafka
的主要用途是作为ETL管道中的中转区,用于实时(流)数据。 我假设有一个数据源集群,数据最初存储在其中。例如,它可以是Vertica、Cassandra、Hadoop等。然后是一个处理集群,它从数据源集群读取数据,并将其写入分布式
Kafka
日志,这基本上是
浏览 3
提问于2019-06-17
得票数 3
1
回答
如何用卡夫卡逐流读取卡夫卡的记录
、
我想用卡夫卡-流的消费者来阅读卡夫卡的唱片,有一个选项可以每隔一段时间读取记录吗?比如每1分钟一次?
浏览 3
提问于2017-01-02
得票数 0
1
回答
以
Kafka
和
MongoDB为源的ETL
、
、
、
、
我只是在学习Apache
Kafka
。我当前的ETL在batch process上运行,现在我希望它在流进程上运行,以便用于报告的数据始终是最新的。据我所知,我可以使用MongoDB连接器来捕获mongodb中的数据变化,然后将其
发送
到
kafka
主题。但在我的ETL中,我需要将处理后的数据存储到SQL数据库中。如何以及在哪里处理从mongodb
发送
到主题的数据,然后从该主题创建到另一个数据库的记录?我是否可以使用AWS lambda函数来执行处理
和
记录创建?但是,我如何在
kafka
中调用
浏览 4
提问于2020-06-17
得票数 0
1
回答
Kafka
Streams
SIiding窗口实现代码示例
、
、
我一直在尝试使用
Streams
DSL
在
Kafka
中实现滑动窗口,但我无法做到。有没有人能帮我举个代码例子。我想要使用滑动窗口方法
聚合
特定时间段的值。 我搜索了融合的博客,但没有代码
浏览 3
提问于2019-07-16
得票数 1
1
回答
Java中列表的动态过滤
、
、
我正在开发一个java应用程序,从来源接收实时数据(
Kafka
/
Kafka
流)。在我的案例研究中,我需要过滤我列表中的名字。这份名单最初有1000万个名字。我的第一个问题是知道什么是最好的方法
和
/技术,以获得更高效/更快的处理时间。 第二个问题是如何更新或删除列表中的一些名称?
浏览 2
提问于2020-01-25
得票数 0
1
回答
用KStream语义进行重组
、
使用
kafka
-
streams
,我希望将元素的流S E按某些键K1分组,同时将该键的所有值
聚合
到一个连接的结果AGG中。这将导致KTable T1。根据
聚合
结果,应该将该值重新划分为另一个KTable T2,按从
聚合
结果AGG中提取的键K2分组。因此,
聚合
的结果应该为下一次重组生成密钥。不是每个键K2的值 我知道
聚合
的结果是在一段时间后才被转发的,所以我已经尝试将commit.interval.ms降低到1,但没有效果。val finalTable = streamBu
浏览 2
提问于2019-09-06
得票数 1
回答已采纳
1
回答
Kafka
-
Streams
:(KTable,KTable)连接结果的
聚合
、
每个主题都有密钥
和
有效负载。我尝试加入前两个主题,汇总结果,最后将这个结果与第三个主题结合起来。但它并不像预期的那样起作用。我尝试将“映射”与“类别”连接起来,并对结果进行
聚合
"name“。此操作失败,引发以下错误:处理器KTABLE有什么方法可以使这个连接
聚合
发生吗?是否有可能有这样的输出: key, valu
浏览 1
提问于2020-02-04
得票数 1
1
回答
如何在不改变主题的情况下加入KStream
和
KTable
、
、
我加入了KStream
和
KTable。下面是我的密码。(order, address) -> order + " send to " + address)
streams
= new KafkaStreams(builder.build(), props);执行此代码后,将创建一个新主题。/
kafka
-topics.sh --
浏览 4
提问于2021-04-09
得票数 0
1
回答
对于时间序列的滚动/
聚合
,流处理比批处理更好吗?
、
、
、
、
由于原始数据需要大量存储,所以我试图
聚合
这些数据,并创建每小时、每天、每月报告的汇总。有两种方法,我可以想到: 流处理:使用
Kafka
Streams
API来卷起数据,并在Cassandra中摄取汇总的数据。
浏览 0
提问于2019-03-25
得票数 1
1
回答
转换KStream窗口
聚合
的结果
、
、
这些是我的
DSL
调用来完成我的
聚合
。 val sessionProcessorStream = builder.stream("collector-prod", Consumed.arguments (org.apache.
kafka
.
streams
.kstream.KeyValueMapper[org.apache.
kafka
.
streams
.kstream.Windowed[org.apache.
kafka
.
streams
.ks
浏览 0
提问于2018-04-02
得票数 0
回答已采纳
1
回答
如何利用期货与卡夫卡流
、
、
有一个卡夫卡集群,我从其中消费两个主题,并加入它。由于联接的结果,我对数据库做了一些操作。对DB的所有操作都是异步的,因此它们返回给我一个未来(scala.concurrent.Future,但无论如何它与java.util.concurrent.CompletableFuture相同)。因此,我得到了这样的代码:val secondSource: KTable[String, Obj2] def saveResult
浏览 3
提问于2017-02-15
得票数 5
2
回答
Kafka
Streams
DSL
over
Kafka
Consumer API
、
、
最近,在一次采访中,我被问到一个关于
Kafka
Streams
的问题,更具体地说,面试官想知道为什么/什么时候你会使用
Kafka
Streams
DSL
而不是普通的
Kafka
Consumer API来读取
和
处理消息流
浏览 2
提问于2020-09-13
得票数 3
1
回答
在Kstream Consumer Processor中使用Avro创建状态存储
、
、
它读取主题外的avro消息,并构造一个
聚合
数据的状态存储,该数据也是avro类型的。}, Materialized.with(new Serdes.StringSerde(), serdeOutEvent).toStream();该函数能够从主题中读取消息并创建第一个
聚合
结果:27) at org.apache.
kafka
.
浏览 23
提问于2021-02-27
得票数 1
回答已采纳
1
回答
kafka
流媒体中的消息加密
、
、
我最近正在尝试使用
kafka
streaming进行一些敏感数据处理。我希望实现的目标是,当敏感数据被加密时,微服务架构的能力不会受到影响,即,紧密耦合的服务
和
流数据处理。我的问题是,在
kafka
流中,有没有可能我用一个密钥解密传入的消息,然后用另一个密钥再次加密?我有一个计划,但由于我不熟悉
kafka
streaming,我不能证明
kafka
streaming有能力使用
Streams
DSL
处理此功能。有没有人能帮我解决这个问题,最好告诉我
Streams</
浏览 34
提问于2020-01-14
得票数 0
回答已采纳
2
回答
Kafka
streams
中映射操作后的双引号字符串
、
我使用
Kafka
stream、
DSL
和
map将KStream<String, JsonNode>转换为KStream<String, String>。在ValueMapper函数中,我简单地返回new ValueMapper("key", "some constant string"),但是在使用KStream.to("some topic")将值
发送
回
Kafka
的地方,我得到的结果是添加了双引号。也许是
Kafk
浏览 2
提问于2018-09-03
得票数 3
2
回答
如何修改卡夫卡单词计数程序中的KStream键
和
值?
、
、
我是新的卡夫卡流
和
一些卡在一些基本的字计数程序。卡夫卡流版本=> 2.3.0import java.util._import org.apache.
kafka
.clients.consumer.ConsumerConfig import org.apache.
kafka
.
streams
{KafkaStreams,St
浏览 1
提问于2020-03-21
得票数 1
回答已采纳
点击加载更多
相关
资讯
为什么我不推荐Kafka Streams和KSQL?
Kafka 2.0正式发布,带来众多改进
Kafka 1.0.1案例详解之Kafka Streams
初探Kafka Streams
Kafka streams概览
热门
标签
更多标签
云服务器
ICP备案
云直播
即时通信 IM
实时音视频
活动推荐
运营活动
广告
关闭
领券