You can read from a Kafka topic with PySpark Structured Streaming and write the results to CSV as follows:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("KafkaToCSV").getOrCreate()

kafka_topic = "your_kafka_topic"
kafka_bootstrap_servers = "your_kafka_bootstrap_servers"

# Read the topic as a streaming source. Structured Streaming tracks Kafka
# offsets itself via the checkpoint, so a consumer group.id is normally
# not set here (Spark 3.0+ accepts a "kafka.group.id" option if required).
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "latest") \
    .load()

# Schema of the JSON payload carried in the Kafka message value;
# adjust the fields to match your actual messages.
schema = StructType([
    StructField("key", StringType(), True),
    StructField("value", StringType(), True),
])

# Kafka delivers key/value as binary, so cast them to strings first,
# then parse the JSON value and flatten it into top-level columns.
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .select(from_json("value", schema).alias("data")) \
    .select("data.*")

output_path = "your_output_path"

# Write each micro-batch as CSV files; the checkpoint location is required
# so the query can recover its progress after a restart.
query = df.writeStream.format("csv") \
    .option("path", output_path) \
    .option("checkpointLocation", "your_checkpoint_location") \
    .start()

query.awaitTermination()
```
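Running this code requires the Kafka connector for Structured Streaming on the classpath. As a minimal sketch, one way is to declare the package when building the session; the artifact coordinates below are an assumption and must match your Spark and Scala versions:

```python
from pyspark.sql import SparkSession

# Assumes Spark 3.5.x built against Scala 2.12; adjust the coordinates
# to your cluster before relying on this.
spark = (
    SparkSession.builder
    .appName("KafkaToCSV")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
    )
    .getOrCreate()
)
```

Equivalently, the same package can be supplied on the command line via spark-submit's --packages flag.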
In the code above, replace the following placeholders with your own values:
- your_kafka_topic: the Kafka topic to subscribe to
- your_kafka_bootstrap_servers: the Kafka broker address list, e.g. host1:9092,host2:9092
- your_output_path: the directory where the CSV files will be written
- your_checkpoint_location: a directory for the streaming query's checkpoint data
With this in place, PySpark continuously reads data from the Kafka topic and writes the results as CSV part files under the specified output directory.
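If the CSV output stays empty, a common first check is to point the same stream at the console sink, which prints each micro-batch to stdout instead of writing files. A throwaway debug sketch, assuming df is the parsed DataFrame from the code above:

```python
# Print each micro-batch to stdout for inspection; no checkpoint
# is needed for this temporary debug query.
debug_query = df.writeStream \
    .format("console") \
    .option("truncate", "false") \
    .start()

debug_query.awaitTermination(30)  # run for ~30 seconds
debug_query.stop()
```

Keep in mind that startingOffsets is set to "latest", so only messages produced after the query starts will appear.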