Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using pyspark, how to expand a column containing a variable map to new columns in a DataFrame while keeping other columns?

I have the following DataFrame

+-----+--------------------------------------------------+---+
|asset|signals                                           |ts |
+-----+--------------------------------------------------+---+
|2    |[D -> 1100, F -> 3000]                            |6  |
|1    |[D -> 500, System.Date -> 340]                    |5  |
|1    |[B -> 100, E -> 900, System.Date -> 310]          |4  |
|1    |[B -> 110, C -> 200, System.Date -> 320]          |3  |
|1    |[A -> 330, B -> 120, C -> 210, D -> 410, E -> 100]|2  |
+-----+--------------------------------------------------+---+

I need to project the column:'signals' with key-values to multiple columns as follows:

+-----+---+-----------+----+----+----+----+----+----+
|asset|ts |System.Date|F   |E   |B   |D   |C   |A   |
+-----+---+-----------+----+----+----+----+----+----+
|2    |6  |null       |3000|null|null|1100|null|null|
|1    |5  |340        |null|null|null|500 |null|null|
|1    |4  |310        |null|900 |100 |null|null|null|
|1    |3  |320        |null|null|110 |null|200 |null|
|1    |2  |null       |null|100 |120 |410 |210 |330 |
+-----+---+-----------+----+----+----+----+----+----+

So here is the example:

d = [{'asset': '2', 'ts': 6, 'signals':{'F': '3000','D':'1100'}}, 
     {'asset': '1', 'ts': 5, 'signals':{'System.Date': '340','D':'500'}}, 
     {'asset': '1', 'ts': 4, 'signals':{'System.Date': '310', 'B': '100', 'E':'900'}}, 
     {'asset': '1', 'ts': 3, 'signals':{'System.Date': '320', 'B': '110','C':'200'}},
      {'asset': '1', 'ts': 2, 'signals':{'A': '330', 'B': '120','C':'210','D':'410','E':'100'}}]
df = spark.createDataFrame(d)

I can extract all the possible keys and achieve my objective as follows:

from pyspark.sql import functions as spfn
# the following takes too long (WANT-TO-AVOID)
all_signals = (df
    .select(spfn.explode("signals"))
    .select("key")
    .distinct()
    .rdd.flatMap(lambda x: x)
    .collect())
print(all_signals)
exprs = [spfn.col("signals").getItem(k).alias(k) for k in all_signals]

df1 = df.select(spfn.col('*'),*exprs).drop('signals')

df1.show(truncate=False)

['System.Date', 'F', 'E', 'B', 'D', 'C', 'A']
+-----+---+-----------+----+----+----+----+----+----+
|asset|ts |System.Date|F   |E   |B   |D   |C   |A   |
+-----+---+-----------+----+----+----+----+----+----+
|2    |6  |null       |3000|null|null|1100|null|null|
|1    |5  |340        |null|null|null|500 |null|null|
|1    |4  |310        |null|900 |100 |null|null|null|
|1    |3  |320        |null|null|110 |null|200 |null|
|1    |2  |null       |null|100 |120 |410 |210 |330 |
+-----+---+-----------+----+----+----+----+----+----+


but I was wondering if there is a way to use the following but don't know how to keep existing columns:

df2 = spark.read.json(df.rdd.map(lambda r: r.signals))
df2.show(truncate=False)
+----+----+----+----+----+----+-----------+
|A   |B   |C   |D   |E   |F   |System.Date|
+----+----+----+----+----+----+-----------+
|null|null|null|1100|null|1000|null       |
|null|null|null|500 |null|null|340        |
|null|100 |null|null|900 |null|310        |
|null|110 |200 |null|null|null|320        |
|330 |120 |210 |410 |100 |null|null       |
+----+----+----+----+----+----+-----------+

The step above (labeled WANT-TO-AVOID) to get all keys takes a long time while the "spark.read.json" seems much faster. So again, is there an easier and faster way to expand the map column?

like image 685
ziad.rida Avatar asked Jul 20 '20 00:07

ziad.rida


2 Answers

You could do this with read.json to get your desired columns. (spark2.4+).

from pyspark.sql import functions as F

df1=df.withColumn("signals", F.map_concat("signals", F.create_map(F.lit("asset"),"asset",F.lit("ts"),"ts")))

df2 = spark.read.json(df1.rdd.map(lambda r: r.signals))

df2.show()

#+----+----+----+----+----+----+-----------+-----+---+
#|   A|   B|   C|   D|   E|   F|System.Date|asset| ts|
#+----+----+----+----+----+----+-----------+-----+---+
#|null|null|null|1100|null|3000|       null|    2|  6|
#|null|null|null| 500|null|null|        340|    1|  5|
#|null| 100|null|null| 900|null|        310|    1|  4|
#|null| 110| 200|null|null|null|        320|    1|  3|
#| 330| 120| 210| 410| 100|null|       null|    1|  2|
#+----+----+----+----+----+----+-----------+-----+---+
like image 51
murtihash Avatar answered Sep 27 '22 18:09

murtihash


Your implementation of getting the unique key values was slow, but this should be faster:

keys_df = df.select(F.explode(F.map_keys(F.col("signals")))).distinct()
keys = list(map(lambda row: row[0], keys_df.collect()))
key_cols = list(map(lambda f: F.col("signals").getItem(f).alias(str(f)), keys))
final_cols = [F.col("asset"), F.col("ts")] + key_cols
df.select(final_cols).show()
+-----+---+-----------+----+----+----+----+----+----+
|asset| ts|System.Date|   F|   E|   B|   D|   C|   A|
+-----+---+-----------+----+----+----+----+----+----+
|    2|  6|       null|3000|null|null|1100|null|null|
|    1|  5|        340|null|null|null| 500|null|null|
|    1|  4|        310|null| 900| 100|null|null|null|
|    1|  3|        320|null|null| 110|null| 200|null|
|    1|  2|       null|null| 100| 120| 410| 210| 330|
+-----+---+-----------+----+----+----+----+----+----+
like image 41
Powers Avatar answered Sep 27 '22 19:09

Powers