I have:

key  value
a    [1,2,3]
b    [2,3,4]

I want:

key  value1  value2  value3
a    1       2       3
b    2       3       4
It seems that in Scala I can write df.select($"value._1", $"value._2", $"value._3"), but that is not possible in Python. Is there a good way to do this?
If the data arrives as a delimiter-separated string rather than an array, pyspark.sql.functions provides a split() function that converts such a StringType column into an ArrayType column by splitting on a delimiter such as a space, comma, or pipe; its optional limit parameter defaults to -1. The resulting array column can then be expanded into multiple columns with select() or withColumn(). The pandas equivalents are str.split() with expand=True, or str.extract() with multiple capturing groups.
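For completeness, a minimal sketch of that split() pattern (hypothetical comma-separated data, assuming an existing SparkSession named spark; not taken from the answers below):

from pyspark.sql import functions as F

# Hypothetical comma-separated data.
df_str = spark.createDataFrame([("a", "1,2,3"), ("b", "2,3,4")], ["key", "value"])

# split() turns the StringType column into an ArrayType column...
df_arr = df_str.withColumn("value_arr", F.split("value", ","))

# ...which can then be expanded element by element (the values stay strings unless cast).
df_arr.select("key", *[df_arr["value_arr"][i].alias("value%d" % (i + 1)) for i in range(3)]).show()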
To split a column of doubles stored in DenseVector format, one first has to construct a UDF that converts the DenseVector to an array (Python list):
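The code for that UDF is not included above; a minimal sketch of such a conversion (assuming Spark ML's DenseVector and a SparkSession named spark) could look like:

from pyspark.ml.linalg import Vectors
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType

# UDF that converts a DenseVector into a plain Python list of doubles.
to_array = F.udf(lambda v: v.toArray().tolist(), ArrayType(DoubleType()))

# Hypothetical DataFrame with a vector-typed column.
df = spark.createDataFrame([("a", Vectors.dense([1.0, 2.0, 3.0])),
                            ("b", Vectors.dense([2.0, 3.0, 4.0]))], ["key", "value"])

df = df.withColumn("value_arr", to_array("value"))
df.select("key", *[df["value_arr"][i].alias("value%d" % (i + 1)) for i in range(3)]).show()

On Spark 3.0+, pyspark.ml.functions.vector_to_array offers the same conversion without a Python UDF.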
It depends on the type of your "list":
If it is of type ArrayType():
df = hc.createDataFrame(sc.parallelize([['a', [1,2,3]], ['b', [2,3,4]]]), ["key", "value"])
df.printSchema()
df.show()

root
 |-- key: string (nullable = true)
 |-- value: array (nullable = true)
 |    |-- element: long (containsNull = true)

+---+-------+
|key|  value|
+---+-------+
|  a|[1,2,3]|
|  b|[2,3,4]|
+---+-------+

you can access the values like you would with Python, using []:

df.select("key", df.value[0], df.value[1], df.value[2]).show()

+---+--------+--------+--------+
|key|value[0]|value[1]|value[2]|
+---+--------+--------+--------+
|  a|       1|       2|       3|
|  b|       2|       3|       4|
+---+--------+--------+--------+
If it is of type StructType(): (maybe you built your dataframe by reading a JSON)
df2 = df.select("key", psf.struct( df.value[0].alias("value1"), df.value[1].alias("value2"), df.value[2].alias("value3") ).alias("value")) df2.printSchema() df2.show() root |-- key: string (nullable = true) |-- value: struct (nullable = false) | |-- value1: long (nullable = true) | |-- value2: long (nullable = true) | |-- value3: long (nullable = true) +---+-------+ |key| value| +---+-------+ | a|[1,2,3]| | b|[2,3,4]| +---+-------+
you can directly 'split' the column using *:
df2.select('key', 'value.*').show()

+---+------+------+------+
|key|value1|value2|value3|
+---+------+------+------+
|  a|     1|     2|     3|
|  b|     2|     3|     4|
+---+------+------+------+
I'd like to add the case of lists (arrays) of known size to pault's answer. If the column contains medium-sized arrays (or large ones), it is still possible to split them into columns.
from pyspark.sql.types import *          # Needed to define the DataFrame schema.
from pyspark.sql.functions import expr

# Define a schema to create a DataFrame with an array-typed column.
mySchema = StructType([StructField("V1", StringType(), True),
                       StructField("V2", ArrayType(IntegerType(), True))])

df = spark.createDataFrame([['A', [1, 2, 3, 4, 5, 6, 7]],
                            ['B', [8, 7, 6, 5, 4, 3, 2]]], schema=mySchema)

# Split the list into columns using expr() in a list comprehension.
arr_size = 7
df = df.select(['V1', 'V2'] + [expr('V2[' + str(x) + ']') for x in range(0, arr_size)])

# It is possible to define new column names.
new_colnames = ['V1', 'V2'] + ['val_' + str(i) for i in range(0, arr_size)]
df = df.toDF(*new_colnames)
The result is:
df.show(truncate=False)

+---+---------------------+-----+-----+-----+-----+-----+-----+-----+
|V1 |V2                   |val_0|val_1|val_2|val_3|val_4|val_5|val_6|
+---+---------------------+-----+-----+-----+-----+-----+-----+-----+
|A  |[1, 2, 3, 4, 5, 6, 7]|1    |2    |3    |4    |5    |6    |7    |
|B  |[8, 7, 6, 5, 4, 3, 2]|8    |7    |6    |5    |4    |3    |2    |
+---+---------------------+-----+-----+-----+-----+-----+-----+-----+
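If the array length is not known in advance, one possible variation (my assumption, not part of the original answer) is to derive arr_size from the data before splitting, starting again from the original two-column DataFrame:

from pyspark.sql import functions as F

# Assuming df is the original two-column DataFrame (V1, V2) from above,
# find the largest array length in V2 (this triggers a Spark job).
arr_size = df.select(F.max(F.size("V2")).alias("n")).first()["n"]

df = df.select(['V1', 'V2'] + [df["V2"][i].alias("val_" + str(i)) for i in range(arr_size)])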
I needed to unlist a 712-dimensional array into columns in order to write it to CSV. I used @MaFF's solution first for my problem, but that seemed to cause a lot of errors and additional computation time. I am not sure what was causing it, but I used a different method which reduced the computation time considerably (22 minutes compared to more than 4 hours)!
@MaFF's method:
length = len(dataset.head()["list_col"])
dataset = dataset.select(dataset.columns + [dataset["list_col"][k] for k in range(length)])
What I used:
dataset = dataset.rdd.map(lambda x: (*x, *x["list_col"])).toDF()
If someone has any idea what was causing this difference in computation time, please let me know! I suspect that in my case the bottleneck was calling head() to get the list length (which I would like to be adaptive), made worse because (i) my data pipeline was quite long and exhaustive, and (ii) I had to unlist multiple columns. Furthermore, caching the entire dataset was not an option.
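One caveat with this approach (my addition, not part of the original answer): toDF() without arguments names every column _1, _2, ..., so the original column names are lost. If the array length is known, you can pass explicit names instead:

# toDF() on an RDD of plain tuples produces generic column names (_1, _2, ...),
# so pass a list of names; "list_col_%d" is a hypothetical naming scheme.
n_elements = 712                                   # the known array length in this case
new_names = dataset.columns + ["list_col_%d" % i for i in range(n_elements)]
dataset = dataset.rdd.map(lambda x: (*x, *x["list_col"])).toDF(new_names)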
For ArrayType data, to do it dynamically, you can do something like:
df2.select(['key'] + [df2.features[x] for x in range(0,3)])
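To give the resulting columns friendlier names than features[0], features[1], ..., one variation (assuming the same hypothetical df2 with an ArrayType features column) is to alias each element:

df2.select(['key'] + [df2.features[x].alias('feature_%d' % x) for x in range(0, 3)]).show()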