I have read similar questions but couldn't find a solution to my specific problem.
I have a list
l = [1, 2, 3]
and a DataFrame
df = sc.parallelize([
['p1', 'a'],
['p2', 'b'],
['p3', 'c'],
]).toDF(('product', 'name'))
I would like to obtain a new DataFrame where the list l is added as a further column, namely:
+-------+----+---------+
|product|name| new_col |
+-------+----+---------+
| p1| a| 1 |
| p2| b| 2 |
| p3| c| 3 |
+-------+----+---------+
Approaches with JOIN, where I was joining df with sc.parallelize([[1], [2], [3]]), have failed. Approaches using withColumn, as in
new_df = df.withColumn('new_col', l)
have failed because the list is not a Column object.
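As far as I understand, withColumn expects a Column expression on the right-hand side, so a constant wrapped with lit() would be accepted, but there is no Column that spreads a Python list across rows. A minimal sketch of that distinction (assuming from pyspark.sql.functions import lit):
from pyspark.sql.functions import lit
df.withColumn('new_col', lit(1))  # accepted: lit() turns a constant into a Column
# df.withColumn('new_col', l)     # rejected: a plain Python list is not a Column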
So, from reading some interesting stuff here, I've ascertained that you can't really just append a random / arbitrary column to a given DataFrame object. It appears what you want is more of a zip than a join. I looked around and found this ticket, which makes me think you won't be able to zip given that you have DataFrame rather than RDD objects.
The only way I've been able to solve your issue involves leaving the world of DataFrame objects and returning to RDD objects. I've also needed to create an index for the purpose of the join, which may or may not work with your use case.
l = sc.parallelize([1, 2, 3])
# build an index so both sides can be zipped into (index, value) pairs
index = sc.parallelize(range(0, l.count()))
z = index.zip(l)  # (index, new_col value)
rdd = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']])
rdd_index = index.zip(rdd)  # (index, [product, name])
# just in case!
assert(rdd.count() == l.count())
# perform an inner join on the index we generated above, then map it to look pretty
# (each joined record is (index, ([product, name], new_col value)))
new_rdd = rdd_index.join(z).map(lambda kv: [kv[1][0][0], kv[1][0][1], kv[1][1]])
new_df = new_rdd.toDF(['product', 'name', 'new_col'])
When I run new_df.show(), I get:
+-------+----+-------+
|product|name|new_col|
+-------+----+-------+
| p1| a| 1|
| p2| b| 2|
| p3| c| 3|
+-------+----+-------+
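For what it's worth, the same index-and-join idea can be written a bit more compactly with zipWithIndex(), which pairs each element with its position. This is just a sketch reusing the df and l from above, not a different technique:
l = sc.parallelize([1, 2, 3])
# zipWithIndex() yields (element, index); swap so the index becomes the join key
l_indexed = l.zipWithIndex().map(lambda vi: (vi[1], vi[0]))
df_indexed = df.rdd.zipWithIndex().map(lambda ri: (ri[1], ri[0]))
# inner join on the index, then flatten (index, (Row, value)) back into columns
new_df = df_indexed.join(l_indexed) \
    .map(lambda kv: [kv[1][0][0], kv[1][0][1], kv[1][1]]) \
    .toDF(['product', 'name', 'new_col'])
It avoids the separate count() and index RDD, but as above the join does not guarantee any particular output ordering.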
Sidenote: I'm really surprised this didn't work; it behaves like a cross join rather than a one-to-one pairing:
from pyspark.sql import Row
l = sc.parallelize([1, 2, 3])
new_row = Row("new_col_name")
l_as_df = l.map(new_row).toDF()
new_df = df.join(l_as_df)
When I run new_df.show(), I get:
+-------+----+------------+
|product|name|new_col_name|
+-------+----+------------+
| p1| a| 1|
| p1| a| 2|
| p1| a| 3|
| p2| b| 1|
| p3| c| 1|
| p2| b| 2|
| p2| b| 3|
| p3| c| 2|
| p3| c| 3|
+-------+----+------------+
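Since that join has no key, every left row pairs with every right row (3 x 3 = 9 rows). A sketch of how an explicit index key would restore the one-to-one pairing, reusing df and l_as_df from above ('idx' is just a name I picked):
# give both sides an index column via zipWithIndex(), then join on it
df_i = df.rdd.zipWithIndex().map(lambda ri: ri[0] + (ri[1],)).toDF(['product', 'name', 'idx'])
l_i = l_as_df.rdd.zipWithIndex().map(lambda ri: ri[0] + (ri[1],)).toDF(['new_col_name', 'idx'])
new_df = df_i.join(l_i, 'idx').drop('idx')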