前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >pyspark streaming简介 和 消费 kafka示例

pyspark streaming简介 和 消费 kafka示例

作者头像
编程黑洞
发布2023-03-06 19:30:08
9970
发布2023-03-06 19:30:08
举报
文章被收录于专栏:编程黑洞

# 简介

并不是真正的实时处理框架,只是按照时间进行微批处理进行,时间可以设置的尽可能的小。

将不同的额数据源的数据经过SparkStreaming 处理之后将结果输出到外部文件系统

  • 特点

低延时 能从错误中搞笑的恢复: fault-tolerant 能够运行在成百上千的节点 能够将批处理、机器学习、图计算等自框架和Spark Streaming 综合起来使用

  • 粗粒度

Spark Streaming接收到实时数据流,把数据按照指定的时间段切成一片片小的数据块,然后把小的数据块传给Spark Engine处理。

  • 细粒度
  • 数据源 kafka提供了两种数据源。
  1. 基础数据源,可以直接通过streamingContext API实现。如文件系统socket连接
  2. 高级的数据源,如Kafka, Flume, Kinesis等等. 可以通过额外的类库去实现。

# 基础数据源

  1. 使用官方的案例

/spark/examples/src/main/python/streaming

nc -lk 6789

  1. 处理socket数据

示例代码如下: 读取socket中的数据进行流处理

代码语言:javascript
复制
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# local 必须设为2
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

lines = ssc.socketTextStream("localhost", 9999)

words = lines.flatMap(lambda line: line.split(" "))

pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts.pprint()

ssc.start()
ssc.awaitTermination()

测试

nc -lk 9999

  1. 处理文件系统数据

文件系统(fileStream(that is, HDFSM S3, NFS))暂不支持python,python仅支持文本文件(textFileStream)

示例如下,但未成功,找不到该文件。

代码语言:javascript
复制
lines = ssc.textFileStream("hdfs://txz-data0:9820/user/jim/workflow/crash/python/crash_2_hdfs.py")
  • streaming context
  • DStreams

持续化的数据流 对DStream操作算子, 比如map/flatMap,其实底层会被翻译为对DStream中的每个RDD都做相同的操作,因为一个DStream是由不同批次的RDD所

  • Input DStreams and Receivers

# 高级数据源

# Spark Streaming 和 kafka 整合

两种模式

  • receiver 模式
代码语言:javascript
复制
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWordCount")
sc.setLogLevel("OFF")
ssc = StreamingContext(sc, 1)

# 创建Kafka streaming
line = KafkaUtils.createStream(ssc, "192.168.0.208:2181", 'test', {"jim_test": 1})

# 分词
words = line.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()

ssc.start()
ssc.awaitTermination()
  • no receiver

根据上面的代码替换掉createStream即可。

代码语言:javascript
复制
line = KafkaUtils.createDirectStream(ssc, ["jim_test"], {"metadata.broker.list": "192.168.0.208:9092"})

运行:

spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar test_spark_stream.py

需要下载相应的jar包.下载地址如下,搜索。 https://search.maven.org

jar版本会在运行程序时报错提醒。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-08-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • # 简介
  • # 基础数据源
  • # 高级数据源
    • # Spark Streaming 和 kafka 整合
    相关产品与服务
    灰盒安全测试
    腾讯知识图谱(Tencent Knowledge Graph,TKG)是一个集成图数据库、图计算引擎和图可视化分析的一站式平台。支持抽取和融合异构数据,支持千亿级节点关系的存储和计算,支持规则匹配、机器学习、图嵌入等图数据挖掘算法,拥有丰富的图数据渲染和展现的可视化方案。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档