Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Kafka与Spark Streaming整合

Kafka与Spark Streaming整合

原创
作者头像
haimingli
发布于 2021-01-12 16:29:41
发布于 2021-01-12 16:29:41
5690
举报
文章被收录于专栏:kafka消息队列kafka消息队列

Kafka与Spark Streaming整合

概述

Spark Streaming是一个可扩展,高吞吐,容错能力强的实时流式处理处理系统。一般的系统架构图是,数据从一个源点,经过Sparing Streaming处理,最后汇聚到一个系统。Spark Streaming的数据来源可以非常丰富,比如Kafka, Flume, Twitter, ZeroMQ, Kinesis 或者是任何的TCP sockets程序。对于数据的处理,Spark Streaming提供了非常丰富的高级api,例如map,redue,joini和窗口函数等等。数据处理完成后,可以存储到其他地方,比如文件系统对象存储数据库。典型的数据处理流程图:

概念简介

RDD:Resilient Distributed Datasets,弹性分部署数据集,支持两种操作:转换(transformation)从现有的数据集创建一个新的数据集;动作(actions)在数据集上运行计算后,返回一个值给驱动程序。在这里简单理解为某个时间片的数据集合即可。

DStream:和RDD概念有点类似,是RDD的集合,代表着整个数据流。简单来说Spark Streaming中的数据量就是DStream,然后每个时间片的数据就是RDD。

Kafka与Spark Streaming整合

整合方式

Kafka与Spark Streaming整合,首先需要从Kafka读取数据过来,读取数据有两种方式

  • 方法一:Receiver-based 这种方式使用一个Receiver接收Kafka的消息,如果使用默认的配置,存在丢数据的风险,因为这种方式会把从kafka接收到的消息存放到Spark的exectors,然后再启动streaming作业区处理,如果exectors挂了,那么消息可能就丢失了。可以通过开启Write Ahead Logs来保证数据的可靠性(Spark 1.2后开始支持),这种方式和大多数存储系统的Write Ahead Logs类似,Spark会把接收到的消息及kafka消息偏移存放到分布式文件系统中(例如HDFS),如果运行期间出现了故障,那么这些信息会被用于故障恢复。
  • 方法二:Direc 这种方式是Spark 1.3引入的,Spark会创建和Kafka partition一一对应的的RDD分区,然后周期性的去轮询获取分区信息,这种方式和Receier-based不一样的是,它不需要Write Ahead Logs,而是通过check point的机制记录kafka的offset,通过check point机制,保证Kafka中的消息不会被遗漏。这种方式相对于第一种方式有多种优点,一是天然支持并发,建了了和Kafka的partition分区对应的RDD分区,第二点是更高效,不需要write ahead logs,减少了写磁盘次数,第三种优点是可以支持exactly-once语义,通过checkpoint机制记录了kafka的offset,而不是通过zk或者kafka来记录offset能避免分布式系统中数据不一致的问题,从而能支持exactly-once语义,当然这里不是说这样就完全是exactly-once了,还需要消费端配合做消息幂等或事物处理。这种模式是较新的模式,推荐使用该模式,第一种方式已经逐步被淘汰。

整合示例

下面使用一个示例,展示如何整合Kafka和Spark Streaming,这个例子中,使用一个生产者不断往Kafka随机发送数字,然后通过Spark Streaming统计时间片段内数字之和。

1.往kafka随机发送数字代码:

代码语言:txt
AI代码解释
复制
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "spark-producer-demo-client");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String,String> producer = new KafkaProducer<>(properties);

        Random random = new Random();
        while (true) {
            int value = random.nextInt(10);
            ProducerRecord<String, String> message =
                    new ProducerRecord<>(topic, value+"");
            producer.send(message, (recordMetadata, e) -> {
                if (recordMetadata != null) {
                    System.out.println(recordMetadata.topic() + "-" + recordMetadata.partition() + ":" +
                            recordMetadata.offset());
                }
            });
            TimeUnit.SECONDS.sleep(1);

2.提交到spark集群代码,用于统计2秒时间间隔的数字之和,这里我们使用的是Direct模式:

代码语言:txt
AI代码解释
复制
 def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]")
      .setAppName("StreamingWithKafka")
    val ssc = new StreamingContext(sparkConf, Seconds(2)) // 1
    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
        classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
        classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false:java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, // 2
      Subscribe[String, String](List(topic), kafkaParams))
    val value = stream.map(record => {
      val intVal = Integer.valueOf(record.value())
      println(intVal)
      intVal
    }).reduce(_+_)

    value.print()

    ssc.start
    ssc.awaitTermination
  }

上面代码中1处的代码表示聚合处理的时间分片为2秒,计算两秒内的随机数之和。2处的代码用于指定spark执行器上面的kafka consumer分区分配策略,一共有三种类型,PreferConsistent是最常用的,表示订阅主题的分区均匀分配到执行器上面,然后还有PreferBrokers,这种机制是优先分配到和broker相同机器的执行器上,还有一种是PreferFixed,这种是手动配置,用的比较少。上面程序每次计算2秒时间间隔内的数字之和,输入会类似如下:

代码语言:txt
AI代码解释
复制
3
4
...

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
100天机器学习实践之第1天
练习中,这两个重要的库每次都要导入。Numpy包含数学函数,Pandas用于导入和管理数据集。
fanzhh
2019/08/20
7440
机器学习第3天:多元线性回归
简单线性回归:影响Y的因素唯一,只有一个。 多元线性回归:影响Y的因数不唯一,有多个。
K同学啊
2019/01/22
8280
特征工程系列:特征预处理(下)
关于作者:JunLiang,一个热爱挖掘的数据从业者,勤学好问、动手达人,期待与大家一起交流探讨机器学习相关内容~
木东居士
2019/08/16
2.6K0
特征工程系列:特征预处理(下)
6个步骤搞定金融数据挖掘预处理
数据预处理没有标准的流程,通常针对不同的任务和数据集属性的不同而不同。下面就一起看下常用六大步完成数据预处理。其中数据获取可以参考金融数据准备。
数据STUDIO
2021/06/24
1.6K0
Kaggle知识点:类别特征处理
类别型特征(categorical feature)主要是指职业,血型等在有限类别内取值的特征。它的原始输入通常是字符串形式,大多数算法模型不接受数值型特征的输入,针对数值型的类别特征会被当成数值型特征,从而造成训练的模型产生错误。
Coggle数据科学
2022/05/05
1.7K0
Kaggle知识点:类别特征处理
机器学习第1天:数据预处理
strategy取值支持三种,mean(均值),median(中位数),most_frequent(众数),默认mean,axis=0表示按列进行
K同学啊
2019/01/22
9010
LabelEncoder(标签编码)与One—Hot(独热编码)
在做Kaggle项目的时候,碰到的问题,通常拿到一个比赛项目,将特征分为数字型特征和文字性特征,分别进行处理,而对于文字型特征如何处理,这时就需要用LabelEncoder(标签编码)和One—Hot(独热编码)将其转换为相应的数字型特征,再进行相应的处理。 首先了解机器学习中的特征类别:连续型特征和离散型特征
用户5745385
2019/07/04
10.1K0
LabelEncoder(标签编码)与One—Hot(独热编码)
6个步骤教你金融数据挖掘预处理
数据预处理没有标准的流程,通常针对不同的任务和数据集属性的不同而不同。下面就一起看下常用六大步完成数据预处理。
陈晨135
2021/12/31
5790
6个步骤教你金融数据挖掘预处理
机器学习100天|Day1数据预处理
http://scikit-learn.org/stable/modules/preprocessing.html#preprocessing
Ai学习的老章
2019/04/08
5240
机器学习100天|Day1数据预处理
机器学习之sklearn基础教程
在使用sklearn进行机器学习之前,需要对数据进行预处理。sklearn提供了一系列的数据预处理工具,如StandardScaler用于特征缩放,OneHotEncoder用于处理类别特征等。
公众号:码到三十五
2024/06/03
3740
机器学习之sklearn基础教程
从零到一构建AI项目实战教程第四篇:数据分割与标签编码的深入实践
在人工智能项目中,数据预处理是至关重要的一步,其中数据分割与标签编码更是不可或缺。合理的数据分割能够确保模型在训练过程中不会过拟合,同时验证集和测试集能够帮助我们评估模型的性能。而标签编码则是将分类数据转换为模型能够理解的数值形式。本文将详细介绍训练集、验证集、测试集的划分方法,以及One-Hot Encoding和Label Encoding两种常见的标签编码方式。
china马斯克
2025/01/14
2290
机器学习归一化特征编码
因为对于大多数的机器学习算法和优化算法来说,将特征值缩放到相同区间可以使得获取性能更好的模型。就梯度下降算法而言,例如有两个不同的特征,第一个特征的取值范围为1——10,第二个特征的取值范围为1——10000。在梯度下降算法中,代价函数为最小平方误差函数,所以在使用梯度下降算法的时候,算法会明显的偏向于第二个特征,因为它的取值范围更大。在比如,k近邻算法,它使用的是欧式距离,也会导致其偏向于第二个特征。对于决策树和随机森林以及XGboost算法而言,特征缩放对于它们没有什么影响。
@小森
2024/06/15
1650
机器学习归一化特征编码
如何在 Python 中将分类特征转换为数字特征?
在机器学习中,数据有不同的类型,包括数字、分类和文本数据。分类要素是采用一组有限值(如颜色、性别或国家/地区)的特征。但是,大多数机器学习算法都需要数字特征作为输入,这意味着我们需要在训练模型之前将分类特征转换为数字特征。
很酷的站长
2023/08/11
1.3K0
如何在 Python 中将分类特征转换为数字特征?
【Python】机器学习之数据清洗
数据清洗,是数据分析的星光耀眼的序幕,因为原始数据集可能蕴含各种幽灵,而这些隐患将影响最终分析和建模的辉煌表演。通过巧妙的数据清洗,数据的可靠性得以提升,为分析和模型的绚丽演绎打下坚实基石。
SarPro
2024/02/20
3700
【Python】机器学习之数据清洗
数据清洗&amp;预处理入门完整指南
数据预处理是建立机器学习模型的第一步(也很可能是最重要的一步),对最终结果有决定性的作用:如果你的数据集没有完成数据清洗和预处理,那么你的模型很可能也不会有效——就是这么简单。
机器之心
2019/03/08
1.1K0
特征工程中的缩放和编码的方法总结
数据预处理是机器学习生命周期的非常重要的一个部分。特征工程又是数据预处理的一个重要组成, 最常见的特征工程有以下一些方法:
deephub
2022/11/11
1.2K0
特征工程中的缩放和编码的方法总结
【Kaggle】Intermediate Machine Learning(缺失值+文字特征处理)
1. Introduction 按照教程给的7个特征,给定5种参数下的随机森林模型,选出mae误差最小的,进行提交 import pandas as pd from sklearn.model_selection import train_test_split # Read the data X_full = pd.read_csv('../input/train.csv', index_col='Id') X_test_full = pd.read_csv('../input/test.csv', in
Michael阿明
2020/07/13
6120
100天搞定机器学习|Day3多元线性回归
第二天100天搞定机器学习|Day2简单线性回归分析,我们学习了简单线性回归分析,这个模型非常简单,很容易理解。实现方式是sklearn中的LinearRegression,我们也学习了LinearRegression的四个参数,fit_intercept、normalize、copy_X、n_jobs。然后介绍了LinearRegression的几个用法,fit(X,y)、predict(X)、score(X,y)。最后学习了matplotlib.pyplot将训练集结果和测试集结果可视化。
Ai学习的老章
2019/04/08
6480
100天搞定机器学习|Day3多元线性回归
11个常见的分类特征的编码技术
器学习算法只接受数值输入,所以如果我们遇到分类特征的时候都会对分类特征进行编码,本文总结了常见的11个分类变量编码方法。
deephub
2023/01/18
1.2K0
Python中应用决策树算法预测客户等级
机器学习越来越多地在企业应用,本文跟大家分享一个采用python,应用决策树算法对跨国食品超市顾客等级进行预测的具体案例。
阿黎逸阳
2021/01/11
1.5K0
Python中应用决策树算法预测客户等级
推荐阅读
相关推荐
100天机器学习实践之第1天
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档