我想要记录从spark结构化数据流的传入流中读取到数据库的记录数量。我正在使用foreachbatch来转换传入的流批处理,并将其写入所需的位置。如果在特定的小时内没有记录,我想记录读取的0条记录。但是,当没有流时,foreach批处理不会执行。有人能帮我吗?我的代码如下: val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameter
我开始学习火花,并有一个困难的时间理解背后的合理性结构化流在星火。结构化流将到达的所有数据视为无界输入表,其中数据流中的每个新项都被视为表中的新行。val spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
val csvSchema = new StructType()string").add("price", "string").add(