我创建了一个带有JSON格式记录的Kafka主题。
我可以使用kafka-console-consumer.sh使用这些JSON字符串
./kafka-console-consumer.sh --new-consumer \
--topic test \
--from-beginning \
--bootstrap-server host:9092 \
--consumer.config /root/client.properties我如何使用Python中的Spark流来实现这一点?
发布于 2017-05-24 12:21:32
为什么Python不是Scala?!然后,您的家庭练习将是将下面的代码重写为Python ;-)
来自先进源
从Spark2.1.1开始,在这些源中,Kafka、Kinesis和Flume都可以在Python中获得。
基本上,这一过程是:
使用spark-streaming-kafka-0-10_2.11库从卡夫卡主题中读取消息,如火花流+ Kafka集成指南(Kafka broker版本0.10.0或更高)使用KafkaUtils.createDirectStream描述的那样。
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)使用map操作符将ConsumerRecords复制到值,这样您就不会遇到序列化问题。
stream.map(record => (record.key, record.value))如果你不发送密钥,只发送record.value就足够了。
stream.map(record => record.value)将字符串消息转换为JSON,一旦您有了这些值,您将使用json函数:
from_json(e: from_json,schema: StructType)将包含JSON字符串的列解析为具有指定模式的
StructType。如果字符串不可解析,则返回null。
守则如下:
...foreach { rdd =>
messagesRDD.toDF.
withColumn("json", from_json('value, jsonSchema)).
select("json.*").show(false)
}完成了!
https://stackoverflow.com/questions/44157828
复制相似问题