首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

连接后停止pyspark返回两个'on‘列

在使用 PySpark 进行数据处理时,如果你在连接数据后停止并返回结果,却看到了两个 'on' 列,这通常是因为在连接操作中使用了 on 关键字两次,或者是在连接条件中存在重复的列名。

基础概念

在 PySpark 中,DataFramejoin 方法用于根据指定的列或表达式将两个 DataFrame 连接起来。连接操作通常需要指定一个连接键(on),这个键在两个 DataFrame 中都存在,并且用于匹配行。

问题原因

出现两个 'on' 列的原因可能有以下几种:

  1. 重复使用 on 关键字:在连接操作中,可能不小心多次使用了 on 关键字。
  2. 连接条件中的列名重复:如果两个 DataFrame 中有相同名称的列,并且这些列被用作连接键,可能会导致混淆。

解决方法

  1. 检查连接语句:确保在连接操作中只使用了一次 on 关键字。
  2. 使用别名:如果两个 DataFrame 中有相同名称的列,可以在连接前为这些列指定不同的别名。

示例代码

假设我们有两个 DataFramedf1df2,它们都有一个名为 id 的列,我们想要根据这个列进行连接:

代码语言:txt
复制
from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()

# 示例 DataFrame
data1 = [(1, "Alice"), (2, "Bob")]
data2 = [(1, "Engineer"), (3, "Doctor")]

df1 = spark.createDataFrame(data1, ["id", "name"])
df2 = spark.createDataFrame(data2, ["id", "occupation"])

# 正确的连接方式
joined_df = df1.join(df2, on="id", how="inner")

# 显示结果
joined_df.show()

如果 df1df2 中有其他相同名称的列,可以使用别名来避免冲突:

代码语言:txt
复制
# 假设 df1 和 df2 都有一个名为 'info' 的列
df1 = df1.withColumnRenamed("info", "info1")
df2 = df2.withColumnRenamed("info", "info2")

# 使用别名进行连接
joined_df = df1.join(df2, on=["id"], how="inner")

参考链接

通过以上方法,你可以避免在连接操作中出现两个 'on' 列的问题。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

Pyspark学习笔记(五)RDD操作(四)_RDD连接集合操作

1.join-连接 对应于SQL中常见的JOIN操作 菜鸟教程网关于SQL连接总结性资料 Pyspark中的连接函数要求定义键,因为连接的过程是基于共同的字段(键)来组合两个RDD中的记录,因此需要操作键值对...fullOuterJoin(other, numPartitions) 官方文档:pyspark.RDD.fullOuterJoin 两个RDD中各自包含的key为基准,能找到共同的Key,则返回两个...(即不一定数要相同),并且union并不会过滤重复的条目。...2.2 intersection intersection(other) 官方文档:pyspark.RDD.intersection 返回两个RDD中共有的元素,要注意,和 join 其实并不一样,...join操作只是要求 key一样,而intersection 并不要求有key,是要求两边的条目必须是一模一样,即每个字段()上的数据都要求能保持一致,即【完全一样】的两行条目,才能返回

1.3K20
  • PySpark SQL——SQL和pd.DataFrame的结合体

    中最为常用的功能之一,用法与SQL中的select关键字类似,可用于提取其中一或多,也可经过简单变换提取。.../unionAll:表拼接 功能分别等同于SQL中union和union all,其中前者是去重拼接,而后者则直接拼接,所以速度更快 limit:限制返回记录数 与SQL中limit关键字功能一致 另外...fill:广义填充 drop:删除指定 最后,再介绍DataFrame的几个通用的常规方法: withColumn:在创建新或修改已有时较为常用,接收两个参数,其中第一个参数为函数执行的列名...(若当前已有则执行修改,否则创建新),第二个参数则为该取值,可以是常数也可以是根据已有进行某种运算得到,返回值是一个调整了相应列的新DataFrame # 根据age创建一个名为ageNew的新...并返回新的DataFrame(包括原有其他),适用于仅创建或修改单列;而select准确的讲是筛选新,仅仅是在筛选过程中可以通过添加运算或表达式实现创建多个新返回一个筛选新的DataFrame

    10K20

    Spark Extracting,transforming,selecting features

    使其用于一致的标准差或者均值为0; 注意:如果一个特征的标准差是0,那么该特征处理返回的就是默认值0; from pyspark.ml.feature import StandardScaler dataFrame...(数值型做乘法、类别型做二分); .除了目标的所有; 假设a和b是两个,我们可以使用下述简单公式来演示RFormula的功能: y ~ a + b:表示模型 y~w0 + w1*a + w2*b,...近似相似连接使用两个数据集,返回近似的距离小于用户定义的阈值的行对(row,row),近似相似连接支持连接两个不同的数据集,也支持数据集与自身的连接,自身连接会生成一些重复对; 近似相似连接允许转换和未转换的数据集作为输入...,如果输入是未转换的,它将被自动转换,这种情况下,哈希signature作为outputCol被创建; 在连接的数据集中,原始数据集可以在datasetA和datasetB中被查询,一个距离会增加到输出数据集中...,它包含每一对的真实距离; 近似最近邻搜索 近似最近邻搜索使用数据集(特征向量集合)和目标行(一个特征向量),它近似的返回指定数量的与目标行最接近的行; 近似最近邻搜索同样支持转换和未转换的数据集作为输入

    21.8K41

    别说你会用Pandas

    两个库使用场景有些不同,Numpy擅长于数值计算,因为它基于数组来运算的,数组在内存中的布局非常紧凑,所以计算能力强。但Numpy不适合做数据处理和探索,缺少一些现成的数据处理函数。...而Pandas的特点就是很适合做数据处理,比如读写、转换、连接、去重、分组聚合、时间序列、可视化等等,但Pandas的特点是效率略低,不擅长数值计算。...data.csv", header=True, inferSchema=True) # 显示数据集的前几行 df.show(5) # 对数据进行一些转换 # 例如,我们可以选择某些,...", df["salary"] * 1.1) # 显示转换的数据集的前几行 df_transformed.show(5) # 将结果保存到新的 CSV 文件中 # 注意:Spark...你可能需要手动处理这个问题 df_transformed.write.csv("path_to_save_transformed_csv/transformed_data", header=True) # 停止

    12110

    PySpark基础

    执行环境入口对象SparkContext是PySpark的入口点,负责与 Spark 集群的连接,并提供了创建 RDD(弹性分布式数据集)的接口。...要使用 PySpark 库完成数据处理,首先需要构建一个执行环境的入口对象,该对象是 SparkContext 类的实例。创建 SparkContext 对象,便可开始进行数据处理和分析。...# 导包# SparkConf:用于配置Spark应用的参数# SparkContext:用于连接到Spark集群的入口点,负责协调整个Spark应用的运行from pyspark import SparkConf...的运行版本print(sc.version)# 停止SparkContext对象的运行(停止PySpark程序)sc.stop()SparkConf 类的常用方法:方法...SparkContext对象的运行(停止PySpark程序)sc.stop()输出结果:'Hello python!'

    7522

    PySpark UD(A)F 的高效使用

    两个主题都超出了本文的范围,但如果考虑将PySpark作为更大数据集的panda和scikit-learn的替代方案,那么应该考虑到这两个主题。...在UDF中,将这些转换回它们的原始类型,并进行实际工作。如果想返回具有复杂类型的,只需反过来做所有事情。...这意味着在UDF中将这些转换为JSON,返回Pandas数据帧,并最终将Spark数据帧中的相应列从JSON转换为复杂类型 [2enpwvagkq.png] 5.实现 将实现分为三种不同的功能: 1)...除了转换的数据帧外,它还返回一个带有列名及其转换的原始数据类型的字典。 complex_dtypes_from_json使用该信息将这些精确地转换回它们的原始类型。...但首先,使用 complex_dtypes_to_json 来获取转换的 Spark 数据帧 df_json 和转换 ct_cols。

    19.6K31

    【Python】PySpark 数据计算 ③ ( RDD#reduceByKey 函数概念 | RDD#reduceByKey 方法工作流程 | RDD#reduceByKey 语法 | 代码示例 )

    首先将 A 和 B 进行聚合 得到 X , 然后将 X 与 C 进行聚合得到新的值 Y ; 具体操作方法是 : 先将相同 键 key 对应的 值 value 列表中的元素进行 reduce 操作 , 返回一个减少的值...V 类型的参数 , 参数类型要相同 , 返回一个 V 类型的返回值 , 传入的两个参数和返回值都是 V 类型的 ; 使用 reduceByKey 方法 , 需要保证函数的 可结合性 ( associativity...) : 将两个具有 相同 参数类型 和 返回类型 的方法结合在一起 , 不会改变它们的行为的性质 ; 两个方法结合使用的结果与执行顺序无关 ; 可重入性 ( commutativity ) : 在多任务环境下...Key 下的 Value 相加 rdd2 = rdd.reduceByKey(lambda a, b: a + b) # 打印新的 RDD 中的内容 print(rdd2.collect()) # 停止..., 也就是统计 键 Key 的个数 rdd4 = rdd3.reduceByKey(lambda a, b: a + b) print("最终统计单词 : ", rdd4.collect()) # 停止

    60620

    PySpark︱DataFrame操作指南:增删改查合并统计与数据处理

    -------- pandas-spark.dataframe互转 转化为RDD -------- 8、SQL操作 -------- -------- 9、读写csv -------- 延伸一:去除两个表重复的内容...— 2.2 新增数据 withColumn— withColumn是通过添加或替换与现有列有相同的名字的返回一个新的DataFrame result3.withColumn('label', 0)...joinDF2 , joinDF1("id" ) === joinDF2( "t1_id")) 跟pandas 里面的left_on,right_on — 3.2 求并集、交集 — 来看一个例子,先构造两个...functions df.groupBy(“A”).agg(functions.avg(“B”), functions.min(“B”), functions.max(“B”)).show() 整合GroupedData...类型可用的方法(均返回DataFrame类型): avg(*cols) —— 计算每组中一或多的平均值 count() —— 计算每组中一共有多少行,返回DataFrame

    30.4K10

    【Python】PySpark 数据处理 ② ( 安装 PySpark | PySpark 数据处理步骤 | 构建 PySpark 执行环境入口对象 )

    ', ConnectionResetError(10054, '远程主机强迫关闭了一个现有的连接。'...执行环境入口对象 执行 数据读取操作 , 读取得到 RDD 类实例对象 ; 然后 , 进行 数据处理计算 , 对 RDD 类实例对象 成员方法进行各种计算处理 ; 最后 , 输出 处理的结果 ,...中 , 进行数据处理 ; 数据处理完毕 , 存储到 内存 / 磁盘 / 数据库 中 ; 三、构建 PySpark 执行环境入口对象 如果想要使用 PySpark 进行数据处理 , 必须构建一个 PySpark...数据处理 任务 , 调用 SparkContext#stop 方法 , 停止 Spark 程序 ; # 停止 PySpark 程序 sparkContext.stop() 四、代码示例 代码示例 :...# 停止 PySpark 程序 sparkContext.stop() 执行结果 : Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts

    46621

    【Python】PySpark 数据计算 ④ ( RDD#filter 方法 - 过滤 RDD 中的元素 | RDD#distinct 方法 - 对 RDD 中的元素去重 )

    True , 则保留元素 ; func 函数返回 False , 则删除元素 ; new_rdd 是过滤的 RDD 对象 ; 2、RDD#filter 函数语法 RDD#filter 方法 语法 :..., 该布尔值的作用是表示该元素是否应该保留在新的 RDD 中 ; 返回 True 保留元素 ; 返回 False 删除元素 ; 3、代码示例 - RDD#filter 方法示例 下面代码中的核心代码是...True , 保留元素 ; 如果是 奇数 返回 False , 删除元素 ; 代码示例 : """ PySpark 数据处理 """ # 导入 PySpark 相关包 from pyspark import...print(even_numbers.collect()) # 停止 PySpark 程序 sc.stop() 执行结果 : Y:\002_WorkSpace\PycharmProjects\pythonProject...print(distinct_numbers.collect()) # 停止 PySpark 程序 sc.stop() 执行结果 : Y:\002_WorkSpace\PycharmProjects

    43610

    Pyspark处理数据中带有分隔符的数据集

    本篇文章目标是处理在数据集中存在分隔符或分隔符的特殊场景。对于Pyspark开发人员来说,处理这种类型的数据集有时是一件令人头疼的事情,但无论如何都必须处理它。...spark=SparkSession.builder.appName(‘delimit’).getOrCreate() 上面的命令帮助我们连接到spark环境,并让我们使用spark.read.csv...从文件中读取数据并将数据放入内存我们发现,最后一数据在哪里,年龄必须有一个整数数据类型,但是我们看到了一些其他的东西。这不是我们所期望的。一团糟,完全不匹配,不是吗?...接下来,连接“fname”和“lname”: from pyspark.sql.functions import concat, col, lit df1=df_new.withColumn(‘fullname...要验证数据转换,我们将把转换的数据集写入CSV文件,然后使用read. CSV()方法读取它。

    4K30

    【Python】PySpark 数据计算 ① ( RDD#map 方法 | RDD#map 语法 | 传入普通函数 | 传入 lambda 匿名函数 | 链式调用 )

    , 该 被应用的函数 , 可以将每个元素转换为另一种类型 , 也可以针对 RDD 数据的 原始元素进行 指定操作 ; 计算完毕 , 会返回一个新的 RDD 对象 ; 2、RDD#map 语法 map...element * 10 # 应用 map 操作,将每个元素乘以 10 rdd2 = rdd.map(func) # 打印新的 RDD 中的内容 print(rdd2.collect()) # 停止...操作,将每个元素乘以 10 rdd2 = rdd.map(lambda element: element * 10) # 打印新的 RDD 中的内容 print(rdd2.collect()) # 停止...finished with exit code 0 6、代码示例 - RDD#map 数值计算 ( 链式调用 ) 在下面的代码中 , 先对 RDD 对象中的每个元素数据都乘以 10 , 然后再对计算的数据每个元素加上...element: element + 5)\ .map(lambda element: element / 2) # 打印新的 RDD 中的内容 print(rdd2.collect()) # 停止

    60510

    使用CDSW和运营数据库构建ML应用3:生产ML模型

    在HBase和HDFS中训练数据 这是训练数据的基本概述: 如您所见,共有7,其中5是传感器读数(温度,湿度比,湿度,CO2,光)。...合并两组训练数据,应用程序将通过PySpark加载整个训练表并将其传递给模型。 建立模型 现在我们有了所有训练数据,我们将建立并使用PySpark ML模型。...完成此操作,我们将使用HBase的训练数据对模型进行拟合。...该代码段最终为我返回了一个ML模型,其中给了我5组传感器输入,它将返回一个二进制数预测,其中1代表“已占用”,0代表“未占用” 创建和存储批次分数表 现在已经创建了一个简单的模型,我们需要对该模型进行评分...此Web应用程序基本上有两个目标。首先,通过实时流数据显示房间是否被占用。其次,添加一个功能,当用户确认占用预测正确时,将其添加到训练数据中。

    2.8K10

    Pyspark学习笔记(五)RDD的操作

    提示:写完文章,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言 一、PySpark RDD 转换操作 1.窄操作 2.宽操作 3.常见的转换操作表 二、pyspark 行动操作 三、...;带有参数numPartitions,默认值为None,可以对去重的数据重新分区 groupBy() 对元素进行分组。...otherRDD>) 执行的是内连接操作 leftOuterJoin() 返回左RDD中包含的所有元素或记录。...如果右RDD中的键在左RDD中存在,那么左RDD中匹配的记录会和右RDD记录一起返回。 fullOuterJoin() 无论是否有匹配的键,都会返回两个RDD中的所有元素。...intersection() 返回两个RDD中的共有元素,即两个集合相交的部分.返回的元素或者记录必须在两个集合中是一模一样的,即对于键值对RDD来说,键和值都要一样才行。

    4.3K20

    PySpark 通过Arrow加速

    第三个点是,Socket协议通讯其实还是很快的,而且不跨网络,只要能克服前面两个问题,那么性能就会得到很大的提升。...向量化指的是,首先Arrow是将数据按block进行传输的,其次是可以对立面的数据按进行处理的。这样就极大的加快了处理速度。...分组聚合使用Pandas处理 另外值得一提的是,PySpark是不支持自定义聚合函数的,现在如果是数据处理,可以把group by的小集合发给pandas处理,pandas再返回,比如 def trick7...()) / v.std())[["id"]] df.groupby("id").apply(normalize).show() 这里是id进行gourp by ,这样就得到一张id都是...1的小表,接着呢把这个小表转化为pandas dataframe处理,处理完成,还是返回一张小表,表结构则在注解里定义,比如只返回id字段,id字段是long类型。

    1.9K20

    Python大数据之PySpark(三)使用Python语言开发Spark程序代码

    _3.1.2\PySpark-SparkBase_3.1.2\data\output\wordsAdd") # 7-停止SparkContext sc.stop()#Shut down the...结果: [掌握-扩展阅读]远程PySpark环境配置 需求:需要将PyCharm连接服务器,同步本地写的代码到服务器上,使用服务器上的Python解析器执行 步骤: 1-准备PyCharm..._3.1.2\PySpark-SparkBase_3.1.2\data\output\wordsAdd") # 7-停止SparkContext sc.stop() # Shut down the..._3.1.2\PySpark-SparkBase_3.1.2\data\output\wordsAdd") > ># 7-停止SparkContext > >sc.stop() # Shut down...# 2)数据集,操作,返回值都放到了一起。 # 3)你在读代码的时候,没有了循环体,于是就可以少了些临时变量,以及变量倒来倒去逻辑。 # 4)你的代码变成了在描述你要干什么,而不是怎么去干。

    50420
    领券