I need to transform a DataFrame in which one of the columns consists of a list of tuples, each item in each of the tuples has to be a separate column.
Here is an example and a solution in Pandas:
import pandas as pd
df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    },
    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}
df = pd.DataFrame.from_dict(df_dict)
print(df)  # initial structure
        a                 d
1   stuff  [(1, 2), (3, 4)]
2  stuff2  [(1, 2), (3, 4)]
# first transformation, let's separate each list item into a new row
row_breakdown = df.set_index(["a"])["d"].apply(pd.Series).stack()
print(row_breakdown)
a
stuff   0    (1, 2)
        1    (3, 4)
stuff2  0    (1, 2)
        1    (3, 4)
dtype: object
row_breakdown = row_breakdown.reset_index().drop(columns=["level_1"])
print(row_breakdown)
        a       0
0   stuff  (1, 2)
1   stuff  (3, 4)
2  stuff2  (1, 2)
3  stuff2  (3, 4)
# second transformation, let's get each tuple item into a separate column
row_breakdown.columns = ["a", "d"]
row_breakdown = row_breakdown["d"].apply(pd.Series)
row_breakdown.columns = ["value_1", "value_2"]
print(row_breakdown)
   value_1  value_2
0        1        2
1        3        4
2        1        2
3        3        4
This is the pandas solution. I need to do the same thing in PySpark (2.3). I have started working on it, but got stuck immediately:
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
import pandas as pd
conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    },
    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}
df = pd.DataFrame(df_dict)
ddf = spark.createDataFrame(df)
row_breakdown = ddf.set_index(["a"])["d"].apply(pd.Series).stack()
AttributeError: 'DataFrame' object has no attribute 'set_index'
Apparently, Spark doesn't support indexing. Any pointers appreciated.
This might do:
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql import functions as F
import pandas as pd
conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    },
    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}
df = pd.DataFrame(df_dict)
ddf = spark.createDataFrame(df)
exploded = ddf.withColumn('d', F.explode("d"))
exploded.show()
Result:
+------+------+
| a| d|
+------+------+
| stuff|[1, 2]|
| stuff|[3, 4]|
|stuff2|[1, 2]|
|stuff2|[3, 4]|
+------+------+
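As an aside (not part of the original question), if you also need each tuple's position within the list, analogous to the level_1 index that stack() produced in pandas, posexplode returns the position alongside the value. A minimal sketch using the same exploded example (the with_pos name is just for illustration):
from pyspark.sql import functions as F
# posexplode yields both the element and its position within the array
with_pos = ddf.select("a", F.posexplode("d").alias("pos", "d"))
with_pos.show()
# +------+---+------+
# |     a|pos|     d|
# +------+---+------+
# | stuff|  0|[1, 2]|
# | stuff|  1|[3, 4]|
# |stuff2|  0|[1, 2]|
# |stuff2|  1|[3, 4]|
# +------+---+------+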
I feel more comfortable using SQL for this:
exploded.createOrReplaceTempView("exploded")
spark.sql("SELECT a, d._1 as value_1, d._2 as value_2 FROM exploded").show()
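For completeness, the same projection can be written without registering a temp view, using selectExpr (just an equivalent way to express the query above):
# Equivalent to the SQL above, without a temp view
exploded.selectExpr("a", "d._1 AS value_1", "d._2 AS value_2").show()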
Important note: the reason these queries use the _1 and _2 accessors is that Spark parsed each tuple as a struct and gave its fields default names. If in your real implementation the d column contains an array<int>, you should use the [0] syntax instead.
The final result is:
+------+-------+-------+
| a|value_1|value_2|
+------+-------+-------+
| stuff| 1| 2|
| stuff| 3| 4|
|stuff2| 1| 2|
|stuff2| 3| 4|
+------+-------+-------+
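To illustrate the note above: if d had instead been parsed as array<int> (a hypothetical variant of this example's schema), the query would index into the array by position rather than use the struct field names:
# Hypothetical variant: "d" parsed as array<int> instead of struct<_1, _2>
spark.sql("SELECT a, d[0] AS value_1, d[1] AS value_2 FROM exploded").show()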
Update
If you're starting from a DataFrame with the following schema:
ddf.printSchema()
#root
# |-- a: string (nullable = true)
# |-- d: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- _1: long (nullable = true)
# | | |-- _2: long (nullable = true)
You have to use pyspark.sql.functions.explode to explode the array into rows, but after that you can use the * selector to turn the struct into columns:
from pyspark.sql.functions import explode
row_breakdown = ddf.select("a", explode("d").alias("d")).select("a", "d.*")
row_breakdown.show()
#+------+---+---+
#| a| _1| _2|
#+------+---+---+
#| stuff| 1| 2|
#| stuff| 3| 4|
#|stuff2| 1| 2|
#|stuff2| 3| 4|
#+------+---+---+
And to rename the columns, you can use a list comprehension with str.replace:
from pyspark.sql.functions import col
row_breakdown = row_breakdown.select(
    *[col(c).alias(c.replace("_", "value")) for c in row_breakdown.columns]
)
row_breakdown.show()
#+------+------+------+
#| a|value1|value2|
#+------+------+------+
#| stuff| 1| 2|
#| stuff| 3| 4|
#|stuff2| 1| 2|
#|stuff2| 3| 4|
#+------+------+------+
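If you already know all of the target names, an equivalent shortcut is DataFrame.toDF, which renames every column positionally. A sketch, assuming the column order after the explode/select is a, _1, _2:
from pyspark.sql.functions import explode
# Rename all columns by position in one call (assumes order: a, _1, _2)
renamed = ddf.select("a", explode("d").alias("d")).select("a", "d.*").toDF("a", "value1", "value2")
renamed.show()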
Original Answer
If you're starting from the dictionary, you don't need to use pandas at all for this.
Instead, you can create your DataFrame directly from your dictionary. The key is to transform your dictionary into the appropriate format, and then use that to build your Spark DataFrame.
In your example, it seems like you are not using the values under the a key at all.
As I mentioned in my comment, you can achieve the described output with the following code:
df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    },
    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}
from itertools import chain
row_breakdown = spark.createDataFrame(
    chain.from_iterable(df_dict["d"].values()), ["value1", "value2"]
)
row_breakdown.show()
#+------+------+
#|value1|value2|
#+------+------+
#| 1| 2|
#| 3| 4|
#| 1| 2|
#| 3| 4|
#+------+------+
If you want an index-like column, you can achieve that by simply using enumerate, as in the following example. Here I am also sorting the values by the key, as that seems to be your intention.
data = (
    (i,) + v for i, v in enumerate(
        chain.from_iterable(
            v for k, v in sorted(df_dict["d"].items(), key=lambda item: item[0])
        )
    )
)
columns = ["index", "value1", "value2"]
row_breakdown = spark.createDataFrame(data, columns)
row_breakdown.show()
#+-----+------+------+
#|index|value1|value2|
#+-----+------+------+
#| 0| 1| 2|
#| 1| 3| 4|
#| 2| 1| 2|
#| 3| 3| 4|
#+-----+------+------+
As you can see here, we can pass a generator expression to spark.createDataFrame, and this solution does not require us to know the length of the tuples ahead of time.
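If you would rather not rely on schema inference over the generator, you can also pass an explicit StructType. A sketch (note that a generator is consumed on use, so rebuild data before calling createDataFrame again):
from pyspark.sql.types import StructType, StructField, LongType

# Explicit schema, so Spark does not have to infer the column types
schema = StructType([
    StructField("index", LongType(), nullable=False),
    StructField("value1", LongType(), nullable=True),
    StructField("value2", LongType(), nullable=True),
])
row_breakdown = spark.createDataFrame(data, schema)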