这篇博客将会记录Structured Streaming + Kafka的一些基本使用(Java 版)
spark 2.3.0
Structured Streaming (结构化流)是一种基于 Spark SQL 引擎构建的可扩展且容错的 stream processing engine (流处理引擎)。可以使用Dataset/DataFrame API 来表示 streaming aggregations (流聚合), event-time windows (事件时间窗口), stream-to-batch joins (流到批处理连接) 等。
Dataset/DataFrame在同一个 optimized Spark SQL engine (优化的 Spark SQL 引擎)上执行计算后,系统通过 checkpointing (检查点) 和 Write Ahead Logs (预写日志)来确保 end-to-end exactly-once (端到端的完全一次性) 容错保证。
简而言之,Structured Streaming 提供快速,可扩展,容错,end-to-end exactly-once stream processing (端到端的完全一次性流处理),且无需用户理解 streaming 。
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.11
version = 2.3.2
SparkSession spark = SparkSession
Dataset<Row> df = spark.readStream()
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic.*")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
这里我们不需要自己设置group.id参数, Kafka Source 会将自动为每个查询创建一个唯一的 group id
Column | Type |
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | long |
timestampType | int |
对于批处理和流查询,须为 Kafka source 设置以下选项。
Option | value | meaning |
assign | json string {"topicA":0,1,"topicB":2,4} | 指定 TopicPartitions 来消费。针对 Kafka Source 只能指定 "assign", "subscribe" 或 "subscribePattern" 其中的一个选项。 |
subscribe | 逗号分隔的 topics 列表 | 要订阅的 topic 列表。针对 Kafka Source 只能指定 "assign", "subscribe" 或 "subscribePattern" 其中的一个选项 |
subscribePattern | Java regex string | 用于订阅 topic(s) 的 pattern(模式)。针对 Kafka Source 只能指定 "assign", "subscribe" 或 "subscribePattern" 其中的一个选项。 |
kafka.bootstrap.servers | 逗号分隔的 host:port 列表 | Kafka 中的 "bootstrap.servers" 配置。 |
Option | value | default | query type | meaning |
startingOffsets | "earliest", "latest" (streaming only), or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ | "latest" 用于 streaming, "earliest" 用于 batch(批处理) | streaming 和 batch | 当一个查询开始的时候, 或者从最早的偏移量:"earliest",或者从最新的偏移量:"latest",或JSON字符串指定为每个topicpartition起始偏移。在json中,-2作为偏移量可以用来表示最早的,-1到最新的。注意:对于批处理查询,不允许使用最新的查询(隐式或在json中使用-1)。对于流查询,这只适用于启动一个新查询时,并且恢复总是从查询的位置开始,在查询期间新发现的分区将会尽早开始。 |
endingOffsets | latest or json string {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} | latest | batch query | 当一个批处理查询结束时,或者从最新的偏移量:"latest", 或者为每个topic分区指定一个结束偏移的json字符串。在json中,-1作为偏移量可以用于引用最新的,而-2(最早)是不允许的偏移量。 |
failOnDataLoss | true or false | true | streaming query | 当数据丢失的时候,这是一个失败的查询。(如:主题被删除,或偏移量超出范围。)这可能是一个错误的警报。当它不像你预期的那样工作时,你可以禁用它。如果由于数据丢失而不能从提供的偏移量中读取任何数据,批处理查询总是会失败。 |
kafkaConsumer.pollTimeoutMs | long | 512 | streaming and batch | 在执行器中从卡夫卡轮询执行数据,以毫秒为超时间隔单位。 |
fetchOffset.numRetries | int | 3 | streaming and batch | 放弃获取卡夫卡偏移值之前重试的次数。 |
fetchOffset.retryIntervalMs | long | 10 | streaming and batch | 在重新尝试取回Kafka偏移量之前等待毫秒值。 |
maxOffsetsPerTrigger | long | none | streaming and batch | 对每个触发器间隔处理的偏移量的最大数量的速率限制。偏移量的指定总数将按比例在不同卷的topic分区上进行分割。 |
Dataset<Row> tboxDataSet = rawDataset
.where("topic = my_topic")
.select(functions.from_json(functions.col("value").cast("string"), tboxScheme).alias("parsed_value"))
Dataset<Row> windowtboxDataSet = tboxDataSet
.withWatermark("timestamp", "5 seconds")
.groupBy(functions.window(functions.col("timestamp"), "10 minutes", "5 minutes"),
ataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
.withWatermark("timestamp", "10 minutes")
functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
时,会将 12:22
归入两个窗口 12:15-12:25
,对于记录 12:24|dog owl
同理产生两条记录:12:15-12:25|dog owl
、12:20-12:30|dog owl
, word
, count
三列的状态集我们继续来看前面 window() + groupBy().count() 的例子,现在我们考虑将结果输出,即考虑 OutputModes:
Complete 的输出是和 State 是完全一致的:
Append 的语义将保证,一旦输出了某条 key,未来就不会再输出同一个 key。
所以,在上图 12:10
这个批次直接输出 12:00-12:10|cat|1
, 12:05-12:15|cat|1
将是错误的,因为在 12:20
将结果更新为了 12:00-12:10|cat|2
,但是 Append 模式下却不会再次输出 12:00-12:10|cat|2
,因为前面输出过了同一条 key 12:00-12:10|cat
为了解决这个问题,在 Append 模式下,Structured Streaming 需要知道,某一条 key 的结果什么时候不会再更新了。当确认结果不会再更新的时候(下一篇文章专门详解依靠 watermark 确认结果不再更新),就可以将结果进行输出。
如上图所示,如果我们确定 12:30
这个批次以后不会再有对 12:00-12:10
这个 window 的更新,那么我们就可以把 12:00-12:10
的结果在 12:30
这个批次输出,并且也会保证后面的批次不会再输出 12:00-12:10
的 window 的结果,维护了 Append 模式的语义。
Update 模式已在 Spark 2.1.1 及以后版本获得正式支持。
如上图所示,在 Update 模式中,只有本执行批次 State 中被更新了的条目会被输出:
这个 window 不会再被更新,因而将其从 State 中去除,但没有因此产生输出。对上面这个例子泛化一点,是:
时,Structured Streaming 将依靠 watermark 机制来限制状态存储的无限增长、并(对 Append 模式)尽早输出不再变更的结果。
换一个角度,如果既不是 Append 也不是 Update 模式,或者是 Append 或 Update 模式、但不需状态做跨执行批次的聚合时,则不需要启用 watermark 机制。
具体的,我们启用 watermark 机制的方式是:
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words
.withWatermark("timestamp", "10 minutes") // 注意这里的 watermark 设置!
window($"timestamp", "10 minutes", "5 minutes"),
这样即告诉 Structured Streaming,以 timestamp 列的最大值为锚点,往前推 10min 以前的数据不会再收到。这个值 —— 当前的最大 timestamp 再减掉 10min —— 这个随着 timestamp 不断更新的 Long 值,就是 watermark。
这个批次结束后,锚点变成了 12:20|dog owl
这条记录的 event time 12:20
,watermark 变成了 12:20 - 10min = 12:10
批次结束时,即知道 event time 12:10
以前的数据不再收到了,因而 window 12:00-12:10
的结果也不会再被更新,即可以安全地输出结果 12:00-12:10|cat|2
输出以后,State 中也不再保存 window 12:00-12:10
的相关信息 —— 也即 State Store 中的此条状态得到了清理。定义完 final result DataFrame/Dataset ,剩下的就是开始 streaming computation 。 为此,我们须使用 DataStreamWriter 通过 Dataset.writeStream() 返回。
.format("parquet") // can be "orc", "json", "csv", etc.
.option("path", "path/to/destination/dir")
Column | Type |
key (optional) | string or binary |
value (required) | string or binary |
topic (*optional) | string |
某些 sinks 是不容错的,因为它们不能保证输出的持久性并且仅用于调试目的。参见前面的部分 容错语义 。以下是 Spark 中所有接收器的详细信息。
Sink (接收器) | Supported Output Modes (支持的输出模式) | Options (选项) | Fault-tolerant (容错) | Notes (说明) |
File Sink (文件接收器) | Append (附加) | Yes | 支持对 partitioned tables (分区表)的写入。按时间 Partitioning (划分)可能是有用的。 | |
Foreach Sink | Append, Update, Compelete (附加,更新,完全) | None | 取决于 ForeachWriter 的实现。 | 更多详细信息在 下一节 |
Console Sink (控制台接收器) | Append, Update, Complete (附加,更新,完全) |
| No | |
Memory Sink (内存接收器) | Append, Complete (附加,完全) | None | 否。但是在 Complete Mode 模式下,重新启动的查询将重新创建完整的表。 | Table name is the query name.(表名是查询的名称) |
foreach 操作允许在输出数据上计算 arbitrary operations 。从 Spark 2.1 开始,这只适用于 Scala 和 Java 。为了使用这个,你必须实现接口 ForeachWriter 其具有在 trigger (触发器)之后生成 sequence of rows generated as output (作为输出的行的序列)时被调用的方法。
// storage result into mongodb
.queryName("mongodb" + collectionName)
.foreach(new ForeachWriter<Row>() {
Map<String, String> writeOverrides = new HashMap<String, String>() {{
put("uri", MongoDbConfig.MONGO_DB_URI);
put("database", MongoDbConfig.MONGO_MOFANG_TSP_DATA_DB);
put("collection", collectionName);
WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);
MongoConnector mongoConnector = null;
ArrayList<Row> list = null;
public void process(Row value) {
public void close(Throwable errorOrNull) {
if (!list.isEmpty()) {
mongoConnector.withCollectionDo(writeConfig, Document.class, (MongoCollection<Document> mongoCollection) -> {
for (Row row : list) {
Map<String, Object> map = new HashMap<>();
String[] fieldNames = row.schema().fieldNames();
for (String s : fieldNames) {
map.put(s, row.getAs(s));
Document document = new Document(map);
return null;
public boolean open(long partitionId, long version) {
mongoConnector = MongoConnector.apply(writeConfig.asOptions());
list = new ArrayList<>();
return true;
StreamingQuery query = df.writeStream().format("console").start(); // get the query object
query.id(); // get the unique identifier of the running query that persists across restarts from checkpoint data
query.runId(); // get the unique id of this run of the query, which will be generated at every start/restart
query.name(); // get the name of the auto-generated or user-specified name
query.explain(); // print detailed explanations of the query
query.stop(); // stop the query
query.awaitTermination(); // block until query is terminated, with stop() or with error
query.exception(); // the exception if the query has been terminated with error
query.recentProgress(); // an array of the most recent progress updates for this query
query.lastProgress(); // the most recent progress update of this streaming query
SparkSession spark = ...
spark.streams().active(); // get the list of currently active streaming queries
spark.streams().get(id); // get a query object by its unique id
spark.streams().awaitAnyTermination(); // block until any one of them terminates
如有侵权,请联系 cloudcommunity@tencent.com 删除。
