需求:使用maven或者sbt打包编译出来独立应用的形式运行。从一台服务器的7777端口接受一个以换行符分隔的多行文本,要从中筛选出包含单词error的行,并打印出来。
//Maven 索引
groupID = org.apache.spark
artifactID = spark-steaming_2.10
version = 1.2.0
//Scala流计算import声明
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
//用Scala进行流式筛选,打印包含“error”的行
//从SparkConf创建StreamingContext并指定1秒钟的处理
val ssc = new SteamingContext(conf,Seconds(1))
//连接到本地机器7777端口上后,使用收到的数据创建DStream
val lines = ssc.socketTextStream("localhost",7777)
//从DStream中筛选出包含字符串“error”的行
val errorLines = lines.filter(_.contains("error"))
// 打印拥有“error”的行
errorLines.print()
//用Scala进行流式筛选,打印出包含“error”的行
ssc.start()
//等待作业完成
ssc.awaitTermination()
注意:一个Streaming context只能执行一次,所以只有在配置好所有DStream以及所需要的输出操作之后才启动。
最后:在Linux/Mac操作系统上运行流计算应用并提供数据
$spark-submit --class com.oreilly.learningsparkexamples.scala.streamingLogInput \
$ASSEMBLY_JAR local[4]
$ nc localhost 7777 # 使你可以键入输入的行来发送给服务器
Windows nc命令对应ncat
5.1无状态转化操作中,每个批次的处理不依赖于之前批次的数据。
例如map()、filter()、reduceByKey()等。
5.2有状态转化操作中,需要使用之前批次的数据或者中间结果来计算当前批次的数据。
有状态转化操作包括基于滑动窗口的转化操作和追踪状态变化的转化操作。
输出操作指定了对数据经转化操作得到的数据所要执行的操作(例如把结果输出推入外部数据库或输出到屏幕上)。