首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何使用Spark流和Python使用Kafka的JSON记录?

如何使用Spark流和Python使用Kafka的JSON记录?
EN

Stack Overflow用户
提问于 2017-05-24 12:00:08
回答 1查看 3.4K关注 0票数 1

我创建了一个带有JSON格式记录的Kafka主题。

我可以使用kafka-console-consumer.sh使用这些JSON字符串

代码语言:javascript
运行
复制
./kafka-console-consumer.sh --new-consumer \
    --topic test \
    --from-beginning \
    --bootstrap-server host:9092 \
    --consumer.config /root/client.properties

我如何使用Python中的Spark流来实现这一点?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-05-24 12:21:32

为什么Python不是Scala?!然后,您的家庭练习将是将下面的代码重写为Python ;-)

来自先进源

从Spark2.1.1开始,在这些源中,Kafka、Kinesis和Flume都可以在Python中获得。

基本上,这一过程是:

使用spark-streaming-kafka-0-10_2.11库从卡夫卡主题中读取消息,如火花流+ Kafka集成指南(Kafka broker版本0.10.0或更高)使用KafkaUtils.createDirectStream描述的那样。

代码语言:javascript
运行
复制
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

使用map操作符将ConsumerRecords复制到值,这样您就不会遇到序列化问题。

代码语言:javascript
运行
复制
stream.map(record => (record.key, record.value))

如果你不发送密钥,只发送record.value就足够了。

代码语言:javascript
运行
复制
stream.map(record => record.value)

将字符串消息转换为JSON,一旦您有了这些值,您将使用json函数:

from_json(e: from_json,schema: StructType)将包含JSON字符串的列解析为具有指定模式的StructType。如果字符串不可解析,则返回null

守则如下:

代码语言:javascript
运行
复制
...foreach { rdd =>
  messagesRDD.toDF.
    withColumn("json", from_json('value, jsonSchema)).
    select("json.*").show(false)
}

完成了!

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44157828

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档