如何使用直播流API指定kafka spark流的消费组id。
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);
kafkaParams.put("auto.offset.reset", "largest");
kafkaParams.put("group.id", "app1");
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
虽然我已经指定了配置,但不确定是否遗漏了什么。使用spark1.3
kafkaParams.put("group.id", "app1");
发布于 2016-04-10 20:24:40
direct stream API使用低级Kafka API,因此在任何情况下都不使用消费者组。如果你想通过Spark Streaming使用消费者群体,你必须使用基于接收器的API。
Full details are available in the doc !
发布于 2018-09-01 17:23:57
spark-streaming-kafka-0-8
中的createDirectStream
不支持群组模式,因为它使用的是低级Kafka接口。
但spark-streaming-kafka-0-10
支持组模式。
Consumer Configs
在0.9.0.0中,我们引入了新的
消费者,作为旧的基于Scala的简单和高级消费者的替代品。新老消费者的配置如下所示。
在New Consumer Configs
中,它包含group.id
项。
Spark Streaming integration for Kafka 0.10
正在使用新的应用程序接口。https://spark.apache.org/docs/2.1.1/streaming-kafka-0-10-integration.html
针对Kafka 0.10的星火流集成在设计上类似于0.8 Direct Stream方法。它提供了简单的并行性,Kafka分区和Spark分区之间的1:1对应,以及对偏移量和元数据的访问。然而,由于较新的集成使用新的Kafka消费者API而不是简单的API,因此在使用上存在显著的差异。
我已经在spark-streaming-kafka-0-10
中测试了组模式,它确实可以工作。
https://stackoverflow.com/questions/36508553
复制相似问题