Spark Streaming
I did not take knowledge-point notes for the Spark Streaming part; its usage is understood straight from the code. When I later organize the Storm notes I will compare Storm with Spark Streaming. If Spark Streaming is hard to grasp at this point, just follow the code and learn how to use it first, then come back with Storm to understand the real-time computing stack.
flume+SparkStreaming.conf
---Flume configuration for integrating Spark Streaming with Flume
#Push mode setup (its avro sink is commented out below; pull mode is used instead)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/lc/log/1.txt
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#a1.sinks.k1.type = avro
#a1.sinks.k1.channel = c1
#a1.sinks.k1.hostname = localhost
#a1.sinks.k1.port = 19999
#Pull mode is used here. Three jars have to be placed under flume/lib: spark-streaming-flume-sink_2.11-2.1.1.jar, scala-library-2.11.8.jar and commons-lang3-3.3.2.jar; note that the versions must match the ones your program uses.
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 19999
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
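For reference, the agent for this config can be started the same way as the Kafka example later in these notes; the file name below simply mirrors the heading above and your conf directory may differ:
#bin/flume-ng agent --conf conf --conf-file conf/flume+SparkStreaming.conf --name a1 -Dflume.root.logger=INFO,console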
FlumeSparkStreaming.scala ---Spark code
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming._
object FlumeSparkStreaming extends App {
  val spark = SparkSession.builder().master("local[*]").appName("Streaming").getOrCreate()
  val sc = spark.sparkContext
  sc.setLogLevel("WARN")
  // one micro-batch every 5 seconds
  val ssc = new StreamingContext(sc, Seconds(5))
  // pull mode: Spark polls the SparkSink that Flume exposes on localhost:19999
  val flumeStream = FlumeUtils.createPollingStream(ssc, "localhost", 19999, StorageLevel.MEMORY_ONLY)
  flumeStream.map(f => f.event.toString).print()
  ssc.start()
  ssc.awaitTermination()
}
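The config above also keeps the push-mode avro sink as comments. Purely as a sketch (not part of the original notes), the receiver side for push mode could look like the following, assuming the avro sink is re-enabled on localhost:19999; the object name FlumePushStreaming is hypothetical:
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming._
object FlumePushStreaming extends App {
  val spark = SparkSession.builder().master("local[*]").appName("PushStreaming").getOrCreate()
  val sc = spark.sparkContext
  sc.setLogLevel("WARN")
  val ssc = new StreamingContext(sc, Seconds(5))
  // push mode: Spark runs an avro receiver and Flume's avro sink sends events to it
  val flumeStream = FlumeUtils.createStream(ssc, "localhost", 19999, StorageLevel.MEMORY_ONLY)
  flumeStream.map(f => f.event.toString).print()
  ssc.start()
  ssc.awaitTermination()
}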
Flume+kafka+SparkStreaming.conf
---Flume configuration for integrating Spark Streaming with Flume and Kafka
#flume + kafka + Spark Streaming integration
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/sker/wc.txt
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test01
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Flume start command
#bin/flume-ng agent --conf conf --conf-file conf/Flume+kafka+SparkStreaming.conf --name a1 -Dflume.root.logger=INFO,console
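Before starting the Spark job it can be worth checking that events actually reach the topic; the standard Kafka console consumer (run from the Kafka installation directory) does that, using the topic name from the config above:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test01 --from-beginning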
FromKafkaStream.scala ---Spark code
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object FromKafkaStreamDemo extends App {
  val spark = SparkSession.builder().appName("aaaa").master("local[2]").getOrCreate()
  val sc = spark.sparkContext
  val ssc = new StreamingContext(sc, Seconds(5))
  ssc.checkpoint("cp")
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "use_a_separate_group_id_for_each_stream",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  val topics = Array("test01")
  // direct stream: each Kafka partition maps to one RDD partition
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )
  stream.map(record => (record.key(), record.value(), record.topic(), record.toString)).print()
  ssc.start()
  ssc.awaitTermination()
}
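Since enable.auto.commit is set to false above, offsets are not committed back to Kafka automatically. A minimal sketch of committing them per batch, using the HasOffsetRanges / CanCommitOffsets interfaces already covered by the kafka010 wildcard import, could replace the .print() call; the println step is just a placeholder for real processing:
stream.foreachRDD { rdd =>
  // capture the offset ranges of this batch before doing any work
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreach(record => println((record.key(), record.value(), record.topic())))
  // commit the offsets for this consumer group once the batch has been processed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}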
SparkSubmit
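Only as a sketch, the Kafka example above could be submitted roughly like this; the jar name is a placeholder and the --packages coordinate must match your Spark and Scala versions:
bin/spark-submit --class FromKafkaStreamDemo --master local[2] --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.1 your-streaming-app.jar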
All of the above are the author's personal notes; corrections are welcome if there are mistakes...
Follow the CSDN blog Zonzereal for more big data notes...