I'm new to PySpark, and I'm trying to create a simple UDF that takes two input columns, checks whether the second column is empty, and if so splits the first column into two values and overwrites the original columns. This is what I did:
def split(x, y):
    if x == "EXDRA" and y == "":
        return ("EXT", "DCHA")
    if x == "EXIZQ" and y == "":
        return ("EXT", "IZDA")

udf_split = udf(split, ArrayType())

df = df \
    .withColumn("x", udf_split(df['x'], df['y'])[1]) \
    .withColumn("y", udf_split(df['x'], df['y'])[0])
But when I run this code, I get the following error:
File "<stdin>", line 1, in <module>
TypeError: __init__() takes at least 2 arguments (1 given)
What am I doing wrong?
Thanks, Álvaro
Posted on 2017-07-11 09:17:03
I'm not sure exactly what you're trying to do, but from what I understand, I would do it like this:
from pyspark.sql.types import *
from pyspark.sql.functions import udf, col

def split(x, y):
    if x == "EXDRA" and y == "":
        return ("EXT", "DCHA")
    if x == "EXIZQ" and y == "":
        return ("EXT", "IZDA")

schema = StructType([
    StructField("x1", StringType(), False),
    StructField("y1", StringType(), False)
])
udf_split = udf(split, schema)

df = spark.createDataFrame([("EXDRA", ""), ("EXIZQ", ""), ("", "foo")], ("x", "y"))
df.show()
# +-----+---+
# | x| y|
# +-----+---+
# |EXDRA| |
# |EXIZQ| |
# | |foo|
# +-----+---+
df = df \
    .withColumn("split", udf_split(df['x'], df['y'])) \
    .withColumn("x", col("split.x1")) \
    .withColumn("y", col("split.y1"))
df.printSchema()
# root
# |-- x: string (nullable = true)
# |-- y: string (nullable = true)
# |-- split: struct (nullable = true)
# | |-- x1: string (nullable = false)
# | |-- y1: string (nullable = false)
df.show()
# +----+----+----------+
# | x| y| split|
# +----+----+----------+
# | EXT|DCHA|[EXT,DCHA]|
# | EXT|IZDA|[EXT,IZDA]|
# |null|null| null|
# +----+----+----------+
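As a side note, `split` returns `None` whenever neither condition matches, which is why the third row comes out as nulls in the output above. A minimal sketch of a version with a fallback branch (echoing the inputs through unchanged is my assumption here, not something the original poster asked for):

```python
def split(x, y):
    # Same matching logic as in the answer above.
    if x == "EXDRA" and y == "":
        return ("EXT", "DCHA")
    if x == "EXIZQ" and y == "":
        return ("EXT", "IZDA")
    # Fallback (assumption): pass the inputs through unchanged
    # instead of implicitly returning None, so no row becomes null.
    return (x, y)

print(split("EXDRA", ""))  # ('EXT', 'DCHA')
print(split("", "foo"))    # ('', 'foo')
```

Registered with the same `schema` as above, this would fill `split` for every row instead of producing nulls for unmatched ones.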
Posted on 2019-10-16 21:09:56
That `TypeError` comes from calling `ArrayType()` without an element type; I think you have to define your udf as:
udf_split = udf(split, ArrayType(StringType()))
https://stackoverflow.com/questions/45029113