,可以通过以下步骤来实现:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import StringType
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([(1, '2022-01-01'), (2, '2022-02-01')], ['id', 'date'])
df2 = spark.createDataFrame([(3, '2022-03-01'), (4, '2022-04-01')], ['id', 'date'])
union_df = df1.union(df2)
latest_date_df = union_df.groupBy('id').agg({'date': 'max'}).withColumnRenamed('max(date)', 'latest_date')
final_df = union_df.join(latest_date_df, on='id', how='left')
final_df = final_df.withColumn('is_latest_date', when(col('date') == col('latest_date'), 'Yes').otherwise('No'))
完整代码示例如下:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import StringType
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([(1, '2022-01-01'), (2, '2022-02-01')], ['id', 'date'])
df2 = spark.createDataFrame([(3, '2022-03-01'), (4, '2022-04-01')], ['id', 'date'])
union_df = df1.union(df2)
latest_date_df = union_df.groupBy('id').agg({'date': 'max'}).withColumnRenamed('max(date)', 'latest_date')
final_df = union_df.join(latest_date_df, on='id', how='left')
final_df = final_df.withColumn('is_latest_date', when(col('date') == col('latest_date'), 'Yes').otherwise('No'))
final_df.show()
这段代码的功能是将两个Spark数据帧(df1和df2)合并为一个数据帧,然后为每个id找到最新的日期,并在原始数据帧中添加一个新列,用于标识是否是最新日期。结果将打印出来。
腾讯云相关产品和产品介绍链接地址可以参考腾讯云官方文档和网站,以获取最新的信息。
云+社区开发者大会 武汉站
云+社区技术沙龙[第26期]
云+未来峰会
云+社区技术沙龙[第25期]
T-Day
Elastic 中国开发者大会
云+社区开发者大会(北京站)
云+社区技术沙龙第33期
云+社区技术沙龙[第28期]
DB TALK 技术分享会
领取专属 10元无门槛券
手把手带您无忧上云