前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Streaming 快速入门系列(3) | DStream中如何创建数据源

Spark Streaming 快速入门系列(3) | DStream中如何创建数据源

作者头像
不温卜火
发布2020-10-28 17:39:45
1K0
发布2020-10-28 17:39:45
举报
文章被收录于专栏:不温卜火

Spark Streaming 原生支持一些不同的数据源。

一. RDD 队列(测试用)

  • 1. 用法及说明

  测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。

  • 2. 案例实操 需求:循环创建几个 RDD,将 RDD 放入队列。通过 Spark Streaming创建 Dstream,计算 WordCount
代码语言:javascript
复制
package com.buwenbuhuo.spark.streaming.day01

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
 *
 * @author 不温卜火
 * @create 2020-08-07 13:08
 *  MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object WordCount1 {
  def main(args: Array[String]): Unit = {
    // 从RDD队列中读取数据,仅仅用于压力测试
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount1")
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(3))
    val rdds: mutable.Queue[RDD[Int]] = mutable.Queue[RDD[Int]]()
    val sourceStream: InputDStream[Int] = ssc.queueStream(rdds,false)
    val result: DStream[Int] = sourceStream.reduce(_+_)
    result.print()
    ssc.start()

    val sc: SparkContext = ssc.sparkContext
    while (true) {
      rdds.enqueue(sc.parallelize(1 to 100))
     Thread.sleep(10)
    }


    ssc.awaitTermination()
  }
}
  • 3. 运行结果

二. 自定义数据源

  • 1. 使用及说明

  其实就是自定义接收器

  需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。

  • 2. 需求:

  自定义数据源,实现监控某个端口号,获取该端口号内容。

  • 3. 源码
代码语言:javascript
复制
package com.buwenbuhuo.spark.streaming.day01


import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

/**
 *
 * @author 不温卜火
 * @create 2020-08-07 17:03
 *  MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object MyReceiverDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MyReceiverDemo")
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(3))

    val sourceStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop002",9999)

    sourceStream
        .flatMap(_.split(" "))
        .map((_,1))
        .reduceByKey(_ + _)
        .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
// 接收器从socket接受数据
class MyReceiver(host:String,port:Int) extends Receiver[String](storageLevel = StorageLevel.MEMORY_ONLY) {

  var socket:Socket = _
  var reader:BufferedReader = _
  override def onStart(): Unit = {
    runInThread{
      try {
        socket = new Socket(host, port)
        reader = new BufferedReader(new InputStreamReader(socket.getInputStream, "utf-8"))
        var line: String = reader.readLine()
        // 当对方发送一个流结束标志的时候,会受到null
        while (line != null && socket.isConnected) {
          store(line)
          line = reader.readLine()  // 如果流中没有数据,这将会一直阻塞
        }
      }catch {
        case e => e.printStackTrace()
      }finally {
        restart("重启服务器")
        // 自动立即调用onStop,然后再调用onStart
      }
    }
  }

  // 在一个子线程中去执行传入的代码
  def runInThread(op: => Unit) ={
      new Thread(){
        override def run():Unit = op
      }.start()
  }

  // 释放资源
  override def onStop(): Unit = {
    if(socket != null) socket.close()
    if(reader != null) reader.close()
  }
}
  • 4. 测试结果
代码语言:javascript
复制
nc -lk 9999

三. Kafka 数据源

1. 准备工作

  • 1. 用法及说明

  在工程中需要引入 Maven 依赖 spark-streaming-kafka_2.11来使用它。

  包内提供的 KafkaUtils 对象可以在 StreamingContext和JavaStreamingContext中以你的 Kafka 消息创建出 DStream。

  两个核心类:KafkaUtils、KafkaCluster

  • 2. 导入依赖
代码语言:javascript
复制
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
  • 3. 启动zookeeper和kafka集群
代码语言:javascript
复制
[bigdata@hadoop002 zookeeper-3.4.10]$ bin/start-allzk.sh 
[bigdata@hadoop002 kafka]$ start-kafkaall.sh 
  • 4. 开启一个生产者和消费端
代码语言:javascript
复制
// 生产者
[bigdata@hadoop002 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop002:9092 --topic first0810

// 消费者
[bigdata@hadoop002 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop002:9092 --topic first0810
  • 5. 通过IDEA接收数据
代码语言:javascript
复制
package com.buwenbuhuo.spark.streaming.day01.kafka
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 **
   @author 不温卜火
 * @create 2020-08-10 9:34
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object WorldCount1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WorldCount1")
    val ssc = new StreamingContext(conf, Seconds(3))

    val params: Map[String, String] = Map[String,String](
      "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
      "group.id" -> "0810"
    )

    val srouceStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      params,
      Set("first0810")
    )
    srouceStream.print
    ssc.start()
    ssc.awaitTermination()

  }
}
/*
kafkaUtils
 */

2. 正式运行

2.1 直接消费

没有checkpoint,数据有可能丢失

  • 1. 源码
代码语言:javascript
复制
package com.buwenbuhuo.spark.streaming.day01.kafka
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 **
   @author 不温卜火
 * @create 2020-08-10 9:34
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object WorldCount1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WorldCount1")
    val ssc = new StreamingContext(conf, Seconds(3))

    val params: Map[String, String] = Map[String,String](
      "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
      "group.id" -> "0810"
    )

    KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc,
          params,
          Set("first0810")
    ).flatMap {
      case (_, v) =>
        v.split("\\W+")
    }.map((_,1))
        .reduceByKey(_ + _)
        .print()

    ssc.start()
    ssc.awaitTermination()

  }
}
/*
kafkaUtils
 */
  • 2. 运行结果

2.2 with checkpoint(解决数据丢失问题)

缺点: 小文件过多

  • 1. 源码
代码语言:javascript
复制
package com.buwenbuhuo.spark.streaming.day01.kafka

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 **
 *
 * @author 不温卜火
 * @create 2020-08-10 11:24
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object WordCount2 {
  def creatSSC(): StreamingContext ={
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WorldCount2")
    val ssc = new StreamingContext(conf, Seconds(3))
    // 把offset的跟踪在checkpoint中
    ssc.checkpoint("ck1")

    val params: Map[String, String] = Map[String,String](
      "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
      "group.id" -> "0810"
    )

    KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc,
        params,
        Set("first0810")
      ).flatMap {
      case (_, v) =>
        v.split("\\W+")
    }.map((_,1))
      .reduceByKey(_ + _)
      .print()

    // 返回一个ssc
    ssc
  }
  def main(args: Array[String]): Unit = {
    /*
    从ckeckpoint中恢复一个StreamingContext,
    如果ckeckpoint不存在,则调用后面的函数去创建一个StreamingContext
     */
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("ck1",creatSSC)


    ssc.start()
    ssc.awaitTermination()

  }
}
  • 2. 运行结果

2.3 with checkpoint的改良版本(常用)

  • 1. 源码
代码语言:javascript
复制
package com.buwenbuhuo.spark.streaming.day01.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.Err
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 **
 *
 * @author 不温卜火
 * @create 2020-08-10 12:29
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object WorldCount3 {
  val groupId = "0810"
  val params: Map[String, String] = Map[String, String](
    "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
    "group.id" -> groupId)
  val topics: Set[String] = Set("first0810")
  // KafkaUtils   KafkaCluster
  val cluster: KafkaCluster = new KafkaCluster(params)
  /*
  读取开始的offsets
   */
  def readOffsets() = {
    var resultMap = Map[TopicAndPartition, Long]()
    // 1.获取这些topic的所有分区
    val topicAndPartitionSetEither: Either[Err, Set[TopicAndPartition]] = cluster.getPartitions(topics)
    topicAndPartitionSetEither match {
      // 2. 获取topic和分区的信息
      case Right(topicAndPartitionSet: Set[TopicAndPartition]) =>
        // 3. 获取到分区信息和他的offset
        val topicAndPartitionToLongEither: Either[Err, Map[TopicAndPartition, Long]] =
          cluster.getConsumerOffsets(groupId, topicAndPartitionSet)
        topicAndPartitionToLongEither match {
            // 没有每个topic的每个分区都已经存储过偏移量,表示曾经消费过,而且也维护过这个偏移量
          case Right(map) =>
              resultMap ++= map
            // 表示这个topic的这个分区是第一次消费
          case _ =>
            topicAndPartitionSet.foreach(topicAndPartition => {
              resultMap += topicAndPartition -> 0L
            })
        }

      case  _=>  // 表示不存在任何topic
    }

    resultMap
  }
  def saveOffsets(stream: InputDStream[String]): Unit ={
    // 保存offset一定从kafka消费到的直接的那个Stream保存
    // 每个批次执行一次传递过去的函数
    stream.foreachRDD(rdd =>{
      var map: Map[TopicAndPartition, Long] = Map[TopicAndPartition,Long]()
      // 如果这个rdd是直接来自与Kafka,则可以强转成 HasOffsetRanges
      val hasOffsetRanges: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges]
      // 所有的分区的偏移量
      val ranges: Array[OffsetRange] = hasOffsetRanges.offsetRanges
      ranges.foreach(OffsetRange => {
        val key: TopicAndPartition = OffsetRange.topicAndPartition()
        val value: Long = OffsetRange.untilOffset
        map += key -> value
      })
      cluster.setConsumerOffsets(groupId,map)
    })
  }

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WorldCount3")
    val ssc = new StreamingContext(conf, Seconds(3))


    val sourceStream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc,
      params,
      readOffsets(),
      (handler: MessageAndMetadata[String, String]) => handler.message()
    )
    sourceStream
        .flatMap(_.split("\\W+"))
        .map((_,1))
        .reduceByKey(_+_)
        .print(1000)  //  如果不写数字具体为10行


    saveOffsets(sourceStream)
    ssc.start()
    ssc.awaitTermination()

  }
}
  • 运行结果

  本次的分享就到这里了

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/08/18 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一. RDD 队列(测试用)
  • 二. 自定义数据源
  • 三. Kafka 数据源
    • 1. 准备工作
      • 2. 正式运行
        • 2.1 直接消费
        • 2.2 with checkpoint(解决数据丢失问题)
        • 2.3 with checkpoint的改良版本(常用)
    相关产品与服务
    云服务器
    云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档