编写App, 从 kafka 读取数据 新建一个Maven项目:spark-streaming-project 在依赖选择上spark-streaming-kafka此次选用0-10_2.11而非...测试是否能够从Kafka消费到数据 1....import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe import org.apache.spark.streaming.kafka010...从kafka消费数据(APP) package com.buwenbuhuo.streaming.project.app import com.buwenbuhuo.streaming.project.bean.AdsInfo...运行结果 同时运行MockRealtimeData(数据生产者)和AreaTopAPP(数据消费者) ? ? 本次的分享就到这里了
欢迎您关注《大数据成神之路》 下午的时候翻微信看到大家在讨论Spark消费Kafka的方式,官网中就有答案,只不过是英文的,当然很多博客也都做了介绍,正好我的收藏夹中有一篇文章供大家参考。...Spark Streaming 支持多种类型数据源 Spark Streaming 基础概念 DStream Discretized Stream 是 SS 的基础抽象,代表持续性的数据流和经过各种 Spark...Spark Streaming 读取 Kafka 数据 Spark Streaming 与 Kafka 集成接收数据的方式有两种: Receiver-based Approach Direct Approach...currentBuffer 填充的速度是可以被限制的,以秒为单位,配置参数为 spark.streaming.receiver.maxRate,是单个 Receiver 每秒钟允许添加的条数。...这个是 Spark 内存控制的第一步,填充 currentBuffer 是阻塞的,消费 Kafka 的线程直接做填充。
一、版本说明 Spark 针对 Kafka 的不同版本,提供了两套整合方案:spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10,其主要区别如下:...spark-streaming-kafka-0-8spark-streaming-kafka-0-10Kafka 版本0.8.2.1 or higher0.10.0 or higherAP 状态Deprecated...kafkaParams 封装了 Kafka 消费者的属性,这些属性和 Spark Streaming 无关,是 Kafka 原生 API 中就有定义的。...创建生产者 这里创建一个 Kafka 生产者,用于发送测试数据: bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic spark-streaming-topic...从控制台输出中可以看到数据流已经被成功接收,由于采用 kafka-console-producer.sh 发送的数据默认是没有 key 的,所以 key 值为 null。
/kafka-topics.sh --list --zookeeper hadoop:2181 4、通过控制台测试是否能正常生产与消费 ....:spark-streaming-kafka-0-8_2.11:2.2.0 \ /home/hadoop/lib/spark-1.0-SNAPSHOT.jar hadoop:2181 test kafka_streaming_topic...1 3、运行后看4040端口Spark Streaming的UI界面 可以知道UI页面中, Receiver是一直都在运作的, 而Direct方式没有此Jobs Approach 2: Direct...:spark-streaming-kafka-0-8_2.11:2.2.0 \ /home/hadoop/lib/spark-1.0-SNAPSHOT.jar hadoop:9092 kafka_streaming_topic...3、运行后看4040端口Spark Streaming的UI界面 可以知道UI页面中,Direct方式没有此Jobs
最近工作有点忙,所以更新文章频率低了点,在这里给大家说声抱歉,前面已经写过在spark streaming中管理offset,但当时只知道怎么用,并不是很了解为何要那样用,最近一段时间又抽空看了一个github...本篇我们先从理论的角度聊聊在Spark Streaming集成Kafka时的offset状态如何管理。...spark streaming 版本 2.1 kafka 版本0.9.0.0 在这之前,先重述下spark streaming里面管理偏移量的策略,默认的spark streaming它自带管理的offset...的方式是通过checkpoint来记录每个批次的状态持久化到HDFS中,如果机器发生故障,或者程序故障停止,下次启动时候,仍然可以从checkpoint的目录中读取故障时候rdd的状态,便能接着上次处理的数据继续处理...直接创建InputStream流,默认是从最新的偏移量消费,如果是第一次其实最新和最旧的偏移量时相等的都是0,然后在以后的每个批次中都会把最新的offset给存储到外部存储系统中,不断的做更新。
前面的文章已经介绍了在spark streaming集成kafka时,如何处理其偏移量的问题,由于spark streaming自带的checkpoint弊端非常明显,所以一些对数据一致性要求比较高的项目里面...在spark streaming1.3之后的版本支持direct kafka stream,这种策略更加完善,放弃了原来使用Kafka的高级API自动保存数据的偏移量,之后的版本采用Simple API...本篇文章,会再介绍下,如何手动管理kafka的offset,并给出具体的代码加以分析: 版本: apache spark streaming2.1 apache kafka 0.9.0.0 手动管理offset...(2)如果非第一次启动,zk里面已经存在偏移量,所以我们读取zk的偏移量,并把它传入到KafkaUtils中,从上次结束时的偏移量开始消费处理。...例子已经上传到github中,有兴趣的同学可以参考这个链接: https://github.com/qindongliang/streaming-offset-to-zk 后续文章会聊一下为了升级应用如何优雅的关闭的流程序
上篇文章,讨论了在spark streaming中管理消费kafka的偏移量的方式,本篇就接着聊聊上次说升级失败的案例。...事情发生一个月前,由于当时我们想提高spark streaming程序的并行处理性能,于是需要增加kafka分区个数,,这里需要说下,在新版本spark streaming和kafka的集成中,按照官网的建议...spark streaming的executors的数量要和kafka的partition的个数保持相等,这样每一个executor处理一个kafka partition的数据,效率是最高的。...接下来我们便增加了kafka分区的数量,同时修改了spark streaming的executors的个数和kafka的分区个数一一对应,然后就启动了流程序,结果出现了比较诡异的问题,表现如下: 造几条测试数据打入...kafka中,发现程序总是只能处理其中的一部分数据,而每次总有一些数据丢失。
Kafka与Spark Streaming整合 概述 Spark Streaming是一个可扩展,高吞吐,容错能力强的实时流式处理处理系统。...简单来说Spark Streaming中的数据量就是DStream,然后每个时间片的数据就是RDD。...Kafka与Spark Streaming整合 整合方式 Kafka与Spark Streaming整合,首先需要从Kafka读取数据过来,读取数据有两种方式 方法一:Receiver-based...可以通过开启Write Ahead Logs来保证数据的可靠性(Spark 1.2后开始支持),这种方式和大多数存储系统的Write Ahead Logs类似,Spark会把接收到的消息及kafka消息偏移存放到分布式文件系统中...整合示例 下面使用一个示例,展示如何整合Kafka和Spark Streaming,这个例子中,使用一个生产者不断往Kafka随机发送数字,然后通过Spark Streaming统计时间片段内数字之和。
=org.apache.flume.sink.kafka.KafkaSink agent1.sinks.kafka-sink.topic = flume-kafka-streaming-topic agent1...topic flume-kafka-streaming-topic Logger-->Flume-->Kafka-->Spark Streaming 1/Java代码: object FlumeKafkaReceiverWordCount...ssc.start() ssc.awaitTermination() } } 2/启动上面的程序,即可在Console窗口实时看到单词基数 3/注意: 在本地进行测试, 在IDEA中运行...LoggerGenerator, 然后使用Flume、Kafka以及Spark Streaming进行处理操作。...在生产环境上, 1.打包jar,执行LoggerGenerator类 2.Flume、Kafka和本地测试步骤是一样的 3.Spark Streaming的代码也是需要打成jar包,然后使用spark-submit
上一篇文章我们使用Spark对MySQL进行读写,实际上Spark在工作中更多的是充当实时流计算框架 引入依赖 org.apache.spark... org.apache.spark spark-streaming-kafka...; import org.apache.spark.streaming.kafka010.ConsumerStrategies; import org.apache.spark.streaming.kafka010....KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies; import java.util.*; /**...消息生产可以参考文章中的中间件:kafka入门 执行上面程序,启动kafka,在kafka文件的bin目录执行下面命令 echo '00000,{"name":"Steve", "title":"Captain
下面这段code用于在Spark Streaming job中读取Kafka的message: .........以上代码虽然可以正常运行,不过却出现了一个问题:当message size非常大(比如10MB/message)的时候,spark端的处理速度非常缓慢,在3brokers的Kafka + 32 nodes...可是在向新生成的topic中publishmessage之后却发现,并不是所有partition中都有数据。显然publish到Kafka中的数据没有平均分布。...在Kafka0.8.1.1(我们采用的Kafka版本)中,其代码如下: package kafka.producer import kafka.utils._ class DefaultPartitioner...因此所有的数据都进入到了一个partition当中。
本文原文(点击下面阅读原文即可进入) https://blog.csdn.net/xianpanjia4616/article/details/81432869 在实际的项目中,有时候我们需要把一些数据实时的写回到...kafka中去,一般的话我们是这样写的,如下: kafkaStreams.foreachRDD(rdd => { if (!..."_error", pair.value())) }) }) } }) 但是这种写法有很严重的缺点,对于每个rdd的每一个partition的数据...1、首先,我们需要将KafkaProducer利用lazy val的方式进行包装如下: package kafka import java.util.concurrent.Future import...kafka中了 kafkaStreams.foreachRDD(rdd => { if (!
场景模拟 我试图覆盖工程上最为常用的一个场景: 1)首先,向Kafka里实时的写入订单数据,JSON格式,包含订单ID-订单类型-订单收益 2)然后,spark-streaming每十秒实时去消费kafka...中的订单数据,并以订单类型分组统计收益 3)最后,spark-streaming统计结果实时的存入本地MySQL。...pykafka,pip install pykafka java:spark,spark-streaming 下面开始 1、数据写入kafka kafka写入 我们使用pykafka模拟数据实时写入,代码如下...kafka刚才写入的数据 python kafka_consumer.py 2、spark-streaming 1)先解决依赖 其中比较核心的是spark-streaming和kafka集成包spark-streaming-kafka...中查看结果,每隔10秒会聚合出type=1-5的5条数据。
输入的数据源是可靠的 Spark Streaming实时处理数据零丢失,需要类似Kafka的数据源: 支持在一定时间范围内重新消费; 支持高可用消费; 支持消费确认机制; 具有这些特征的数据源,可以使得消费程序准确控制消费位置...以下场景任然比较糟糕: 1)接收器接收到输入数据,并把它存储到WAL中; 2)接收器在更新Zookeeper中Kafka的偏移量之前突然挂掉了; 3)Spark Streaming假设输入数据已成功收到...; 6)一旦从WAL中读取所有的数据之后,接收器开始从Kafka中消费数据。...比如当从Kafka中读取数据,你需要在Kafka的brokers中保存一份数据,而且你还得在Spark Streaming中保存一份。 5....Spark driver只需要简单地计算下一个batch需要处理Kafka中偏移量的范围,然后命令Spark Exectuor直接从Kafka相应Topic的分区中消费数据。
使用Spark Streaming对接kafka 使用Spark Streaming对接kafka之后进行计算 在mysql中创建一个数据库rng_comment 在数据库rng_comment...Streaming对接kafka之后进行计算 下面的代码完成了: 查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中 查询出评论赞的个数在10个以上的数据,并写入到...mysql数据库中的like_status表中 ---- object test03_calculate { /* 将数据从kafka集群中读取,并将数据做进一步的处理过后,写入到mysql...,即如果有偏移量从偏移量位置开始消费,没有偏移量从新来的数据开始消费 "auto.offset.reset" -> "earliest", //false表示关闭自动提交.由spark...,即如果有偏移量从偏移量位置开始消费,没有偏移量从新来的数据开始消费 "auto.offset.reset" -> "earliest", //false表示关闭自动提交.由spark
、图计算等自框架和Spark Streaming 综合起来使用 粗粒度 Spark Streaming接收到实时数据流,把数据按照指定的时间段切成一片片小的数据块,然后把小的数据块传给Spark Engine...细粒度 数据源 kafka提供了两种数据源。 基础数据源,可以直接通过streamingContext API实现。...# 基础数据源 使用官方的案例 /spark/examples/src/main/python/streaming nc -lk 6789 处理socket数据 示例代码如下: 读取socket中的数据进行流处理...RDD所 Input DStreams and Receivers # 高级数据源 # Spark Streaming 和 kafka 整合 两种模式 receiver 模式 from pyspark.streaming.kafka...--jars spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar test_spark_stream.py 需要下载相应的jar包.下载地址如下,搜索
在这里我们解释如何配置 Spark Streaming 以接收来自 Kafka 的数据。...与所有接收方一样,通过 Receiver 从 Kafka 接收的数据存储在 Spark executors 中,然后由 Spark Streaming 启动的作业处理数据。...为确保零数据丢失,你不得不另外启用 Spark Streaming 中的 Write Ahead Logs (在 Spark 1.2 中引入),同时将所有收到的 Kafka 数据保存在分布式文件系统(例如...当处理数据的作业启动后,Kafka 的简单消费者API用于从 Kafka 中读取定义的偏移量范围(类似于从文件系统读取文件)。...这是传统的从 Kafka 上消费数据的方式。
集群,由于官方的spark-streaming-kafka包和现有公司的kafka集群权限系统无法对接,需要研究下spark-streaming-kafka包原有代码以便改造,本文研究的代码版本为spark...官方给出的JavaKafkaWordCount以及KafkaWordCount代码里产生kafka-streaming消费流数据的调用代码分别如下 JavaPairReceiverInputDStream...的high level api产生数据流,产生的每个线程的数据流都被放到一个线程池由单独的线程来消费 val topicMessageStreams = consumerConnector.createMessageStreams...sparkconf属性 spark.streaming.receiver.writeAheadLog.enable为真(默认值是假) 这个receiver会把收到的kafka数据首先存储到日志上,然后才会向...参考文章 Spark Streaming容错的改进和零数据丢失
这篇博客将会记录Structured Streaming + Kafka的一些基本使用(Java 版) spark 2.3.0 1....数据源 对于Kafka数据源我们需要在Maven/SBT项目中引入: groupId = org.apache.spark artifactId = spark-sql-kafka-0-10_2.11...version = 2.3.2 首先我们需要创建SparkSession及开始接收数据,这里以Kafka数据为例 SparkSession spark = SparkSession .builder...这里我们不需要自己设置group.id参数, Kafka Source 会将自动为每个查询创建一个唯一的 group id Kafka源数据中的schema如下: Column Type...如果由于数据丢失而不能从提供的偏移量中读取任何数据,批处理查询总是会失败。