之前已经写过一篇文章,StreamingPro 支持Spark Structured Streaming,不过当时只是玩票性质的,因为对Spark 2.0+ 版本其实也只是尝试性质的,重点还是放在了spark 1.6 系列的。不过时间在推移,Spark 2.0+ 版本还是大势所趋。所以这一版对底层做了很大的重构,StreamingPro目前支持Flink,Spark 1.6+, Spark 2.0+ 三个引擎了。
下载streamingpro for spark 2.0的包,然后下载spark 2.1 的安装包。
你也可以在 streamingpro目录 找到spark 1.6+ 或者 flink的版本。最新的大体会按如下格式统一格式了:
streamingpro-spark-0.4.14-SNAPSHOT.jar 适配 spark 1.6+,scala 2.10
streamingpro-spark-2.0-0.4.14-SNAPSHOT.jar 适配 spark 2.0+,scala 2.11
streamingpro.flink-0.4.14-SNAPSHOT-online-1.2.0.jar 适配 flink 1.2.0, scala 2.10
写一个json文件ss.json,内容如下:
{
"scalamaptojson": {
"desc": "测试",
"strategy": "spark",
"algorithm": [],
"ref": [
],
"compositor": [
{
"name": "ss.sources",
"params": [
{
"format": "socket",
"outputTable": "test",
"port":"9999",
"host":"localhost",
"path": "-"
},
{
"format": "com.databricks.spark.csv",
"outputTable": "sample",
"header":"true",
"path": "/Users/allwefantasy/streamingpro/sample.csv"
}
]
},
{
"name": "ss.sql",
"params": [
{
"sql": "select city from test left join sample on test.value == sample.name",
"outputTableName": "test3"
}
]
},
{
"name": "ss.outputs",
"params": [
{
"mode": "append",
"format": "console",
"inputTableName": "test3",
"path": "-"
}
]
}
],
"configParams": {
}
}
}
大体是一个socket源,一个sample文件。socket源是流式的,sample文件则是批处理的。sample.csv内容如下:
id,name,city,age
1,david,shenzhen,31
2,eason,shenzhen,27
3,jarry,wuhan,35
然后你在终端执行 nc -lk 9999 就好了。
然后运行spark程序:
SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-spark-2.0-0.4.14-SNAPSHOT.jar \
-streaming.name test \
-streaming.platform spark_structrued_streaming \
-streaming.job.file.path file://$SHome/ss.json
在nc 那个终端输入比如eason ,然后回车,马上就可以看到spark终端接受到了数据。