Pandas to PySpark: transforming a column of lists of tuples to separate columns for each tuple item

I need to transform a DataFrame in which one of the columns consists of a list of tuples; each item of each tuple has to become a separate column.

Here is an example and a solution in Pandas:

import pandas as pd

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

df = pd.DataFrame.from_dict(df_dict)
print(df)  # initial structure

           a    d
    1   stuff   [(1, 2), (3, 4)]
    2   stuff2  [(1, 2), (3, 4)]

# first transformation, let's separate each list item into a new row
row_breakdown = df.set_index(["a"])["d"].apply(pd.Series).stack()
print(row_breakdown)

            a        
    stuff   0    (1, 2)
            1    (3, 4)
    stuff2  0    (1, 2)
            1    (3, 4)
    dtype: object

row_breakdown = row_breakdown.reset_index().drop(columns=["level_1"])
print(row_breakdown)

    a   0
    0   stuff   (1, 2)
    1   stuff   (3, 4)
    2   stuff2  (1, 2)
    3   stuff2  (3, 4)

# second transformation, let's get each tuple item into a separate column
row_breakdown.columns = ["a", "d"]
row_breakdown = row_breakdown["d"].apply(pd.Series)
row_breakdown.columns = ["value_1", "value_2"]
print(row_breakdown)

        value_1 value_2
    0   1   2
    1   3   4
    2   1   2
    3   3   4

That is the pandas solution. I need to do the same using PySpark (2.3). I have started working on it, but immediately got stuck:

import pandas as pd

from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession

conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)

spark = SparkSession(sc)

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

df = pd.DataFrame(df_dict)
ddf = spark.createDataFrame(df)

row_breakdown = ddf.set_index(["a"])["d"].apply(pd.Series).stack()

    AttributeError: 'DataFrame' object has no attribute 'set_index'

Apparently, Spark doesn't support indexing. Any pointers appreciated.

asked by Ivan Bilan


2 Answers

This might do:

from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql import functions as F
import pandas as pd

conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)

spark = SparkSession(sc)

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

df = pd.DataFrame(df_dict)
ddf = spark.createDataFrame(df)


exploded = ddf.withColumn('d', F.explode("d"))
exploded.show()

Result:

+------+------+
|     a|     d|
+------+------+
| stuff|[1, 2]|
| stuff|[3, 4]|
|stuff2|[1, 2]|
|stuff2|[3, 4]|
+------+------+

I feel more comfortable using SQL for this:

exploded.createOrReplaceTempView("exploded")
spark.sql("SELECT a, d._1 as value_1, d._2 as value_2 FROM exploded").show()

Important note: this uses the _1 and _2 accessors because Spark parsed each tuple as a struct and gave its fields default names. If in your real implementation the DataFrame contains an array<int> instead, you should use the d[0] syntax (see the sketch after the result below).

The final result is:

+------+-------+-------+
|     a|value_1|value_2|
+------+-------+-------+
| stuff|      1|      2|
| stuff|      3|      4|
|stuff2|      1|      2|
|stuff2|      3|      4|
+------+-------+-------+
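
For completeness, here is a minimal sketch of that array<int> case. The ddf_arr DataFrame and the exploded_arr view are made up for illustration; with an array column you index by position rather than by field name:

# Hypothetical example: "d" is an array<int> column rather than a struct
ddf_arr = spark.createDataFrame(
    [("stuff", [1, 2]), ("stuff", [3, 4]), ("stuff2", [1, 2]), ("stuff2", [3, 4])],
    ["a", "d"],
)
ddf_arr.createOrReplaceTempView("exploded_arr")

# In SQL, array elements are accessed with [index] (0-based)
spark.sql("SELECT a, d[0] AS value_1, d[1] AS value_2 FROM exploded_arr").show()

# The DataFrame API equivalent uses getItem
ddf_arr.select(
    "a",
    F.col("d").getItem(0).alias("value_1"),
    F.col("d").getItem(1).alias("value_2"),
).show()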
answered by martinarroyo


Update

If you're starting from a DataFrame with the following schema:

ddf.printSchema()
#root
# |-- a: string (nullable = true)
# |-- d: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- _1: long (nullable = true)
# |    |    |-- _2: long (nullable = true)

You have to use pyspark.sql.functions.explode to explode the array into rows, and after that you can use the * selector to expand the struct into separate columns:

from pyspark.sql.functions import explode

row_breakdown = ddf.select("a", explode("d").alias("d")).select("a", "d.*")
row_breakdown.show()
#+------+---+---+
#|     a| _1| _2|
#+------+---+---+
#| stuff|  1|  2|
#| stuff|  3|  4|
#|stuff2|  1|  2|
#|stuff2|  3|  4|
#+------+---+---+

And to rename the columns, you can use a list comprehension with str.replace:

from pyspark.sql.functions import col

row_breakdown = row_breakdown.select(
    *[col(c).alias(c.replace("_", "value")) for c in row_breakdown.columns]
)
row_breakdown.show()
#+------+------+------+
#|     a|value1|value2|
#+------+------+------+
#| stuff|     1|     2|
#| stuff|     3|     4|
#|stuff2|     1|     2|
#|stuff2|     3|     4|
#+------+------+------+
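
If you find the list comprehension hard to read, chaining withColumnRenamed gives the same result; here is a small equivalent sketch starting again from the exploded DataFrame with the default _1/_2 columns:

row_breakdown = (
    ddf.select("a", explode("d").alias("d"))
    .select("a", "d.*")
    .withColumnRenamed("_1", "value1")
    .withColumnRenamed("_2", "value2")
)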

Original Answer

If you're starting from the dictionary, you don't need to use pandas at all for this.

Instead, you can create your Spark DataFrame directly from it; the key is to first transform the dictionary into an iterable of rows that spark.createDataFrame accepts.

In your example, it seems like you are not using the values under the a key at all.

As I mentioned in my comment, you can achieve the described output with the following code:

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

from itertools import chain
row_breakdown = spark.createDataFrame(
    chain.from_iterable(df_dict["d"].values()), ["value1", "value2"]
)
row_breakdown.show()
#+------+------+
#|value1|value2|
#+------+------+
#|     1|     2|
#|     3|     4|
#|     1|     2|
#|     3|     4|
#+------+------+

If you want an index-like column, you can add one with enumerate, as in the following example. Here I am also sorting the values by key, as that seems to be your intention.

data = (
    (i,) + v for i, v in enumerate(
        chain.from_iterable(
            v for k, v in sorted(df_dict["d"].items(), key=lambda kv: kv[0])
        )
    )
)
columns = ["index", "value1", "value2"]
row_breakdown = spark.createDataFrame(data, columns)
row_breakdown.show()
#+-----+------+------+
#|index|value1|value2|
#+-----+------+------+
#|    0|     1|     2|
#|    1|     3|     4|
#|    2|     1|     2|
#|    3|     3|     4|
#+-----+------+------+

As you can see here, we can pass a generator expression to spark.createDataFrame, and this solution does not require us to know the length of the tuples ahead of time.
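
If you'd rather be explicit about the column types than let Spark infer them, you can pass a StructType schema instead of the bare list of column names. A minimal sketch under that assumption:

from itertools import chain
from pyspark.sql.types import StructType, StructField, LongType

schema = StructType([
    StructField("index", LongType(), True),
    StructField("value1", LongType(), True),
    StructField("value2", LongType(), True),
])

# The generator has to be rebuilt, since the previous one was already consumed
data = (
    (i,) + v for i, v in enumerate(
        chain.from_iterable(
            v for k, v in sorted(df_dict["d"].items(), key=lambda kv: kv[0])
        )
    )
)
row_breakdown = spark.createDataFrame(data, schema)
row_breakdown.show()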

answered by pault