I'm using pyspark, loading a large csv file into a dataframe with spark-csv, and as a pre-processing step I need to apply a variety of operations to the data available in one of the columns (that contains a json string). That will return X values, each of which needs to be stored in its own column.
That functionality will be implemented in a UDF. However, I am not sure how to return a list of values from that UDF and feed these into individual columns. Below is a simple example:
(...)

from pyspark.sql.functions import udf

def udf_test(n):
    return [n / 2, n % 2]

test_udf = udf(udf_test)
df.select('amount', 'trans_date').withColumn("test", test_udf("amount")).show(5)
That produces the following:
+------+----------+--------------------+
|amount|trans_date|                test|
+------+----------+--------------------+
|  28.0|2016-02-07|         [14.0, 0.0]|
| 31.01|2016-02-07|[15.5050001144409...|
| 13.41|2016-02-04|[6.70499992370605...|
| 307.7|2015-02-17|[153.850006103515...|
| 22.09|2016-02-05|[11.0450000762939...|
+------+----------+--------------------+
only showing top 5 rows
What would be the best way to store the two (in this example) values returned by the udf in separate columns? Right now they are being typed as strings:
df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema()

root
 |-- amount: float (nullable = true)
 |-- trans_date: string (nullable = true)
 |-- test: string (nullable = true)
Struct method

I returned a Tuple2 from the UDF for testing purposes (higher-order tuples can be used when more columns are required), and it is treated as a struct column. You can then use .* to select all of the struct's elements as separate columns and finally rename them, as sketched below.
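A minimal PySpark sketch of that idea, assuming the df and amount column from the question (the field names half and mod are made up here):

from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, FloatType

# Declaring a struct returnType makes Spark treat the returned
# tuple as a struct column rather than a string.
pair_schema = StructType([
    StructField("half", FloatType(), True),
    StructField("mod", FloatType(), True)
])

pair_udf = udf(lambda n: (n / 2, n % 2) if n is not None else (None, None), pair_schema)

# "test.*" expands the struct fields into separate top-level
# columns, which can then be renamed.
(df.withColumn("test", pair_udf("amount"))
   .select("amount", "test.*")
   .withColumnRenamed("half", "amount_half")
   .withColumnRenamed("mod", "amount_mod")
   .show())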
It is well known that using UDFs (User Defined Functions) in Apache Spark, especially through the Python API, can compromise application performance. For this reason, at Damavis we try to avoid them as much as possible in favour of native functions or SQL.
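For the divide-and-modulo example from the question this is easy; a sketch using only native column expressions:

from pyspark.sql.functions import col

# Native expressions are evaluated inside the JVM, so rows are
# never serialized out to a Python worker.
df = df.withColumn("half", col("amount") / 2) \
       .withColumn("mod", col("amount") % 2)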
A UDF can take many parameters, i.e. many columns, but it must return one result, i.e. one column: to consume more columns, just add parameters to the Python function and pass the corresponding column names when calling the UDF.
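A hedged illustration of that many-columns-in, one-column-out rule (the column names x and y are hypothetical):

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Two input columns, one output column; guard against a zero
# or null divisor.
ratio_udf = udf(lambda a, b: a / b if b else None, FloatType())
df = df.withColumn("ratio", ratio_udf("x", "y"))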
It is not possible to create multiple top-level columns from a single UDF call, but you can create a new struct. It requires a UDF with a specified returnType:
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, FloatType

schema = StructType([
    StructField("foo", FloatType(), False),
    StructField("bar", FloatType(), False)
])

def udf_test(n):
    return (n / 2, n % 2) if n and n != 0.0 else (float('nan'), float('nan'))

test_udf = udf(udf_test, schema)
df = sc.parallelize([(1, 2.0), (2, 3.0)]).toDF(["x", "y"])

foobars = df.select(test_udf("y").alias("foobar"))
foobars.printSchema()
## root
## |-- foobar: struct (nullable = true)
## |    |-- foo: float (nullable = false)
## |    |-- bar: float (nullable = false)
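As a side note, on Spark 2.3+ the returnType can also be given as a DDL-formatted string, which should be equivalent to the StructType above (assuming that Spark version):

test_udf = udf(udf_test, "foo: float, bar: float")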
You can further flatten the schema with a simple select:
foobars.select("foobar.foo", "foobar.bar").show()
## +---+---+
## |foo|bar|
## +---+---+
## |1.0|0.0|
## |1.5|1.0|
## +---+---+
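Equivalently, the .* shorthand mentioned in the struct method above expands every field of the struct at once:

foobars.select("foobar.*").show()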
See also Derive multiple columns from a single column in a Spark DataFrame
You can use flatMap to get the desired dataframe in one go:
df = df.withColumn('udf_results', test_udf('amount'))
df4 = df.select('udf_results').rdd.flatMap(lambda x: x).toDF(schema=your_new_schema)
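For that snippet to run, test_udf must return an array and your_new_schema must have one field per array element; a minimal sketch with hypothetical field names:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType, StructType, StructField

# UDF returning an array of two floats per row.
test_udf = udf(lambda n: [n / 2, n % 2], ArrayType(FloatType()))

# One schema field per element of the returned array.
your_new_schema = StructType([
    StructField("half", FloatType(), True),
    StructField("mod", FloatType(), True)
])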