How to split a list to multiple columns in Pyspark?

I have:

key   value
a    [1,2,3]
b    [2,3,4]

I want:

key value1 value2 value3
a     1      2      3
b     2      3      4

It seems that in Scala I can write df.select($"value._1", $"value._2", $"value._3"), but this is not possible in Python.

So is there a good way to do this?

Asked Aug 21 '17 by DarkZero

People also ask

How do I split one column into multiple columns in Spark?

pyspark.sql.functions provides a split() function which is used to split a DataFrame string column into multiple columns.

How do you split data in PySpark?

PySpark SQL provides the split() function to convert a delimiter-separated string into an array (StringType to ArrayType) column on a DataFrame. It does this by splitting the string column on a delimiter such as a space, comma, or pipe, and converting the result into ArrayType.
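For instance, a minimal sketch (the column names and the comma delimiter here are my own, not from the question):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# A string column holding comma-separated values.
df = spark.createDataFrame([("a", "1,2,3"), ("b", "2,3,4")], ["key", "csv"])

# split() turns the StringType column into an ArrayType column.
df = df.withColumn("value", F.split("csv", ","))
df.select("key", df.value[0], df.value[1], df.value[2]).show()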

How do I split a single column into multiple columns in Python?

We can use str.split() to split one column into multiple columns by specifying the expand=True option. We can use str.extract() to extract multiple columns using a regular expression in which multiple capturing groups are defined.
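These are pandas methods rather than Spark ones; a small sketch with made-up data and column names:

import pandas as pd

pdf = pd.DataFrame({"raw": ["1-2-3", "2-3-4"]})

# str.split with expand=True spreads the pieces over new columns.
pdf[["v1", "v2", "v3"]] = pdf["raw"].str.split("-", expand=True)

# str.extract pulls out columns via regex capturing groups.
pdf[["first", "second"]] = pdf["raw"].str.extract(r"(\d+)-(\d+)")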

How to split a DataFrame string column into multiple columns in PySpark?

pyspark.sql.functions provides a split() function to split a DataFrame string column into multiple columns. A single column can be split into several columns using withColumn() and select(), and the split function also accepts a regular expression (regex) as its pattern.
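A sketch of that withColumn()/select() pattern (again with data and column names of my own):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a", "1,2,3"), ("b", "2,3,4")], ["key", "csv"])

# Split once, then pick out the pieces with getItem().
parts = F.split(df["csv"], ",")
df = (df.withColumn("value1", parts.getItem(0))
        .withColumn("value2", parts.getItem(1))
        .withColumn("value3", parts.getItem(2)))
df.select("key", "value1", "value2", "value3").show()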

What is the default limit value of the PySpark split function?

split() takes an optional limit parameter; if it is not provided, the default limit value is -1, which means the pattern is applied as many times as possible and the resulting array can be of any length.
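A quick sketch of the limit parameter (as far as I know the limit argument is only exposed in the Python API from Spark 3.0 onwards; the data is made up):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a", "1,2,3,4")], ["key", "csv"])

# With limit=2 the split produces at most two elements: ["1", "2,3,4"].
# With the default limit=-1 every delimiter is used: ["1", "2", "3", "4"].
df.select(F.split("csv", ",", 2).alias("limited"),
          F.split("csv", ",").alias("full")).show(truncate=False)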

What is column to list in PySpark?

Column to list in PySpark is the operation of converting the elements of a PySpark DataFrame column into a Python list. A PySpark DataFrame consists of columns that hold the data, and PySpark provides several methods for converting the elements of a column to a list.
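For example, a few common ways (a small sketch with made-up data):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a", 1), ("b", 2)], ["key", "value"])

# Three equivalent ways to turn the "key" column into a Python list.
keys = [row["key"] for row in df.select("key").collect()]
keys = df.select("key").rdd.flatMap(lambda x: x).collect()
keys = df.select("key").toPandas()["key"].tolist()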

How to split a column with two doubles in Python?

To split a column with doubles stored in DenseVector format, you first have to construct a UDF that converts the DenseVector to an array (Python list).
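The snippet this refers to is not reproduced here; a minimal sketch of the idea, with column names and data of my own choosing, might look like this:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a", Vectors.dense([1.0, 2.0, 3.0]))], ["key", "vec"])

# UDF converting a DenseVector into a plain Python list of doubles.
to_array = F.udf(lambda v: v.toArray().tolist(), ArrayType(DoubleType()))

df = df.withColumn("arr", to_array("vec"))
df.select("key", *[df["arr"][i] for i in range(3)]).show()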


4 Answers

It depends on the type of your "list":

  • If it is of type ArrayType():

    # hc is a HiveContext and sc a SparkContext (as provided by the PySpark shell).
    df = hc.createDataFrame(sc.parallelize([['a', [1,2,3]], ['b', [2,3,4]]]), ["key", "value"])
    df.printSchema()
    df.show()

    root
     |-- key: string (nullable = true)
     |-- value: array (nullable = true)
     |    |-- element: long (containsNull = true)

    +---+-------+
    |key|  value|
    +---+-------+
    |  a|[1,2,3]|
    |  b|[2,3,4]|
    +---+-------+

    you can access the values as you would in Python, using []:

    df.select("key", df.value[0], df.value[1], df.value[2]).show() +---+--------+--------+--------+ |key|value[0]|value[1]|value[2]| +---+--------+--------+--------+ |  a|       1|       2|       3| |  b|       2|       3|       4| +---+--------+--------+--------+  +---+-------+ |key|  value| +---+-------+ |  a|[1,2,3]| |  b|[2,3,4]| +---+-------+ 
  • If it is of type StructType(): (maybe you built your DataFrame by reading JSON)

    df2 = df.select("key", psf.struct(         df.value[0].alias("value1"),          df.value[1].alias("value2"),          df.value[2].alias("value3")     ).alias("value")) df2.printSchema() df2.show() root  |-- key: string (nullable = true)  |-- value: struct (nullable = false)  |    |-- value1: long (nullable = true)  |    |-- value2: long (nullable = true)  |    |-- value3: long (nullable = true)  +---+-------+ |key|  value| +---+-------+ |  a|[1,2,3]| |  b|[2,3,4]| +---+-------+ 

    you can directly 'split' the column using *:

    df2.select('key', 'value.*').show()

    +---+------+------+------+
    |key|value1|value2|value3|
    +---+------+------+------+
    |  a|     1|     2|     3|
    |  b|     2|     3|     4|
    +---+------+------+------+
Answered Oct 18 '22 by MaFF


I'd like to add the case of sized lists (arrays) to pault's answer.

If our column contains medium-sized arrays (or large ones), it is still possible to split them into columns.

from pyspark.sql.types import *           # Needed to define the DataFrame schema.
from pyspark.sql.functions import expr

# Define a schema to create a DataFrame with an array-typed column.
mySchema = StructType([StructField("V1", StringType(), True),
                       StructField("V2", ArrayType(IntegerType(), True))])

df = spark.createDataFrame([['A', [1, 2, 3, 4, 5, 6, 7]],
                            ['B', [8, 7, 6, 5, 4, 3, 2]]], schema=mySchema)

# Split the list into columns using 'expr()' in a list comprehension.
arr_size = 7
df = df.select(['V1', 'V2'] + [expr('V2[' + str(x) + ']') for x in range(0, arr_size)])

# It is possible to define new column names.
new_colnames = ['V1', 'V2'] + ['val_' + str(i) for i in range(0, arr_size)]
df = df.toDF(*new_colnames)

The result is:

df.show(truncate=False)

+---+---------------------+-----+-----+-----+-----+-----+-----+-----+
|V1 |V2                   |val_0|val_1|val_2|val_3|val_4|val_5|val_6|
+---+---------------------+-----+-----+-----+-----+-----+-----+-----+
|A  |[1, 2, 3, 4, 5, 6, 7]|1    |2    |3    |4    |5    |6    |7    |
|B  |[8, 7, 6, 5, 4, 3, 2]|8    |7    |6    |5    |4    |3    |2    |
+---+---------------------+-----+-----+-----+-----+-----+-----+-----+
Answered Oct 18 '22 by Jordi Aceiton


I needed to unlist a 712-dimensional array into columns in order to write it to CSV. I used @MaFF's solution first for my problem, but that seemed to cause a lot of errors and additional computation time. I am not sure what was causing it, but I used a different method which reduced the computation time considerably (22 minutes compared to more than 4 hours)!

@MaFF's method:

length = len(dataset.head()["list_col"])
dataset = dataset.select(dataset.columns + [dataset["list_col"][k] for k in range(length)])

What I used:

dataset = dataset.rdd.map(lambda x: (*x, *x["list_col"])).toDF()

If someone has any idea what was causing this difference in computation time, please let me know! I suspect that in my case the bottleneck was calling head() to get the list length (which I would like to be adaptive), made worse because (i) my data pipeline was quite long and exhaustive, and (ii) I had to unlist multiple columns. Furthermore, caching the entire dataset was not an option.
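For what it's worth, one way to keep the length adaptive without head() could be to aggregate the maximum array size first; this is my own sketch, not part of the original approach, and it still triggers a Spark job over the column (assuming the column is named list_col as above):

from pyspark.sql import functions as F

# Largest array length found in "list_col" across the dataset.
max_len = dataset.agg(F.max(F.size("list_col"))).first()[0]
dataset = dataset.select(dataset.columns + [dataset["list_col"][k] for k in range(max_len)])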

Answered Oct 18 '22 by thijsvdp


For ArrayType data, to do it dynamically, you can do something like:

df2.select(['key'] + [df2.features[x] for x in range(0,3)])
Answered Oct 18 '22 by VarunKumar