 

PySpark: Add a column to DataFrame when column is a list

I have read similar questions but couldn't find a solution to my specific problem.

I have a list

l = [1, 2, 3]

and a DataFrame

df = sc.parallelize([
    ['p1', 'a'],
    ['p2', 'b'],
    ['p3', 'c'],
]).toDF(('product', 'name'))

I would like to obtain a new DataFrame where the list l is added as a further column, namely

+-------+----+-------+
|product|name|new_col|
+-------+----+-------+
|     p1|   a|      1|
|     p2|   b|      2|
|     p3|   c|      3|
+-------+----+-------+

Approaches with JOIN, where I was joining df with an

 sc.parallelize([[1], [2], [3]])

have failed. Approaches using withColumn, as in

new_df = df.withColumn('new_col', l)

have failed because the list is not a Column object.

asked Mar 21 '16 by mar tin

People also ask

How do I add a list of columns to a DataFrame in PySpark?

In PySpark, to add a constant column to a DataFrame, use the lit() function: from pyspark.sql.functions import lit. lit() takes the constant value you want to add and returns a Column type; to add a NULL/None value, use lit(None).
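For example, a minimal sketch using the df from the question (the column name new_col is just for illustration):

from pyspark.sql.functions import lit

# lit() wraps a constant in a Column, so it can be attached to every row
with_const = df.withColumn('new_col', lit(7))
# lit(None) adds a NULL; casting gives the column a concrete type
with_null = df.withColumn('new_col', lit(None).cast('string'))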

How do I convert a list to a DataFrame in PySpark?

To do this, first create a list of data and a list of column names. Then pass the zipped data to the spark.createDataFrame() method, which builds the DataFrame.
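A minimal sketch of that recipe (the lists and column names here are made up for illustration, and spark is assumed to be an existing SparkSession):

products = ['p1', 'p2', 'p3']
values = [1, 2, 3]
columns = ['product', 'new_col']

# zip the lists into rows, then let createDataFrame infer the types
new_df = spark.createDataFrame(list(zip(products, values)), columns)
new_df.show()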

How do you get the columns list in PySpark?

You can find all column names and data types (DataType) of a PySpark DataFrame using df.dtypes and df.schema, and you can retrieve the data type of a specific column using df.schema["name"].
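With the df from the question, that looks like:

df.dtypes                   # [('product', 'string'), ('name', 'string')]
df.schema                   # the full StructType
df.schema['name'].dataType  # the DataType of one column, here StringType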

How do I add values to an existing column in PySpark?

You can update a PySpark DataFrame column using withColumn(), select(), and sql(). Since DataFrames are distributed immutable collections, you can't really change column values in place; when you change a value using withColumn() or any other approach, PySpark returns a new DataFrame with the updated values.
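For instance, a sketch that overwrites the name column of the question's df (upper() is just an arbitrary transformation for illustration):

from pyspark.sql.functions import col, upper

# withColumn() with an existing column name replaces that column,
# but the result is a new DataFrame; df itself is unchanged
upper_df = df.withColumn('name', upper(col('name')))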


1 Answer

So, from reading some interesting stuff here, I've ascertained that you can't really just append a random / arbitrary column to a given DataFrame object. It appears what you want is more of a zip than a join. I looked around and found this ticket, which makes me think you won't be able to zip given that you have DataFrame rather than RDD objects.

The only way I've been able to solve your issue involves leaving the world of DataFrame objects and returning to RDD objects. I've also needed to create an index for the purpose of the join, which may or may not work with your use case.

l = sc.parallelize([1, 2, 3])
index = sc.parallelize(range(0, l.count()))
z = index.zip(l)                      # (index, value) pairs

rdd = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']])
rdd_index = index.zip(rdd)            # (index, [product, name]) pairs

# just in case!
assert rdd.count() == l.count()
# Perform an inner join on the index we generated above, then map it to look pretty.
# Tuple-unpacking lambdas like `lambda (x, y): ...` are Python 2 only, so index
# into the (key, ([product, name], value)) pair instead.
new_rdd = rdd_index.join(z).map(lambda kv: [kv[1][0][0], kv[1][0][1], kv[1][1]])
new_df = new_rdd.toDF(['product', 'name', 'new_col'])

When I run new_df.show(), I get:

+-------+----+-------+
|product|name|new_col|
+-------+----+-------+
|     p1|   a|      1|
|     p2|   b|      2|
|     p3|   c|      3|
+-------+----+-------+
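For what it's worth, RDD.zipWithIndex can generate the join key directly, avoiding the separate index RDD. This is a sketch of the same zip-then-join idea, not part of the original answer:

# zipWithIndex yields (element, index); flip to (index, element) for the join
l_indexed = sc.parallelize([1, 2, 3]).zipWithIndex().map(lambda x: (x[1], x[0]))
df_indexed = df.rdd.zipWithIndex().map(lambda x: (x[1], x[0]))

# join on the shared index, then flatten (index, (row, value)) into columns
new_df = (df_indexed.join(l_indexed)
          .map(lambda kv: [kv[1][0][0], kv[1][0][1], kv[1][1]])
          .toDF(['product', 'name', 'new_col']))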

Sidenote: I'm really surprised this didn't work. It turns out a join with no join condition is a Cartesian (cross) product rather than the row-by-row pairing I expected:

from pyspark.sql import Row
l = sc.parallelize([1, 2, 3])
new_row = Row("new_col_name")
l_as_df = l.map(new_row).toDF()
new_df = df.join(l_as_df)

When I run new_df.show(), I get:

+-------+----+------------+
|product|name|new_col_name|
+-------+----+------------+
|     p1|   a|           1|
|     p1|   a|           2|
|     p1|   a|           3|
|     p2|   b|           1|
|     p3|   c|           1|
|     p2|   b|           2|
|     p2|   b|           3|
|     p3|   c|           2|
|     p3|   c|           3|
+-------+----+------------+
answered Nov 15 '22 by Katya Willard