I have the following DataFrame
+-----+--------------------------------------------------+---+
|asset|signals                                           |ts |
+-----+--------------------------------------------------+---+
|2    |[D -> 1100, F -> 3000]                            |6  |
|1    |[D -> 500, System.Date -> 340]                    |5  |
|1    |[B -> 100, E -> 900, System.Date -> 310]          |4  |
|1    |[B -> 110, C -> 200, System.Date -> 320]          |3  |
|1    |[A -> 330, B -> 120, C -> 210, D -> 410, E -> 100]|2  |
+-----+--------------------------------------------------+---+
I need to project the key-value pairs of the 'signals' map column into separate columns, as follows:
+-----+---+-----------+----+----+----+----+----+----+
|asset|ts |System.Date|F   |E   |B   |D   |C   |A   |
+-----+---+-----------+----+----+----+----+----+----+
|2    |6  |null       |3000|null|null|1100|null|null|
|1    |5  |340        |null|null|null|500 |null|null|
|1    |4  |310        |null|900 |100 |null|null|null|
|1    |3  |320        |null|null|110 |null|200 |null|
|1    |2  |null       |null|100 |120 |410 |210 |330 |
+-----+---+-----------+----+----+----+----+----+----+
So here is the example:
d = [{'asset': '2', 'ts': 6, 'signals':{'F': '3000','D':'1100'}},
{'asset': '1', 'ts': 5, 'signals':{'System.Date': '340','D':'500'}},
{'asset': '1', 'ts': 4, 'signals':{'System.Date': '310', 'B': '100', 'E':'900'}},
{'asset': '1', 'ts': 3, 'signals':{'System.Date': '320', 'B': '110','C':'200'}},
{'asset': '1', 'ts': 2, 'signals':{'A': '330', 'B': '120','C':'210','D':'410','E':'100'}}]
df = spark.createDataFrame(d)
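For reference, signals is inferred as a plain map<string,string> column here (a quick check; the exact nullability flags may vary by Spark version), which is why getItem and explode work on it directly:
df.printSchema()
# root
#  |-- asset: string (nullable = true)
#  |-- signals: map (nullable = true)
#  |    |-- key: string
#  |    |-- value: string (valueContainsNull = true)
#  |-- ts: long (nullable = true)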
I can extract all the possible keys and achieve my objective as follows:
from pyspark.sql import functions as spfn
# the following takes too long (WANT-TO-AVOID)
all_signals = (df
    .select(spfn.explode("signals"))
    .select("key")
    .distinct()
    .rdd.flatMap(lambda x: x)
    .collect())
print(all_signals)
exprs = [spfn.col("signals").getItem(k).alias(k) for k in all_signals]
df1 = df.select(spfn.col('*'),*exprs).drop('signals')
df1.show(truncate=False)
['System.Date', 'F', 'E', 'B', 'D', 'C', 'A']
+-----+---+-----------+----+----+----+----+----+----+
|asset|ts |System.Date|F   |E   |B   |D   |C   |A   |
+-----+---+-----------+----+----+----+----+----+----+
|2    |6  |null       |3000|null|null|1100|null|null|
|1    |5  |340        |null|null|null|500 |null|null|
|1    |4  |310        |null|900 |100 |null|null|null|
|1    |3  |320        |null|null|110 |null|200 |null|
|1    |2  |null       |null|100 |120 |410 |210 |330 |
+-----+---+-----------+----+----+----+----+----+----+
but I was wondering whether there is a way to use the following approach instead; I just don't know how to keep the existing columns:
df2 = spark.read.json(df.rdd.map(lambda r: r.signals))
df2.show(truncate=False)
+----+----+----+----+----+----+-----------+
|A   |B   |C   |D   |E   |F   |System.Date|
+----+----+----+----+----+----+-----------+
|null|null|null|1100|null|3000|null       |
|null|null|null|500 |null|null|340        |
|null|100 |null|null|900 |null|310        |
|null|110 |200 |null|null|null|320        |
|330 |120 |210 |410 |100 |null|null       |
+----+----+----+----+----+----+-----------+
The step above (labeled WANT-TO-AVOID), which collects all the keys, takes a long time, while spark.read.json seems much faster. So again: is there an easier and faster way to expand the map column?
You could do this with read.json and map_concat (Spark 2.4+) to get your desired columns.
from pyspark.sql import functions as F

# fold asset and ts into the map, then let the JSON reader create one column per key
df1 = df.withColumn("signals", F.map_concat("signals", F.create_map(F.lit("asset"), "asset", F.lit("ts"), "ts")))
df2 = spark.read.json(df1.rdd.map(lambda r: r.signals))
df2.show()
#+----+----+----+----+----+----+-----------+-----+---+
#|   A|   B|   C|   D|   E|   F|System.Date|asset| ts|
#+----+----+----+----+----+----+-----------+-----+---+
#|null|null|null|1100|null|3000|       null|    2|  6|
#|null|null|null| 500|null|null|        340|    1|  5|
#|null| 100|null|null| 900|null|        310|    1|  4|
#|null| 110| 200|null|null|null|        320|    1|  3|
#| 330| 120| 210| 410| 100|null|       null|    1|  2|
#+----+----+----+----+----+----+-----------+-----+---+
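If you want asset and ts back in front afterwards, a small reordering select does it (just a convenience step; it assumes no signal key is literally named asset or ts):
# move the two helper columns to the front, keep the inferred key columns after them
signal_cols = [c for c in df2.columns if c not in ("asset", "ts")]
df2.select("asset", "ts", *signal_cols).show()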
Your implementation of getting the unique key values was slow, but this should be faster:
from pyspark.sql import functions as F

# collect only the distinct map keys (no need to explode the values as well)
keys_df = df.select(F.explode(F.map_keys(F.col("signals")))).distinct()
keys = [row[0] for row in keys_df.collect()]
key_cols = [F.col("signals").getItem(k).alias(str(k)) for k in keys]
final_cols = [F.col("asset"), F.col("ts")] + key_cols
df.select(final_cols).show()
+-----+---+-----------+----+----+----+----+----+----+
|asset| ts|System.Date|   F|   E|   B|   D|   C|   A|
+-----+---+-----------+----+----+----+----+----+----+
|    2|  6|       null|3000|null|null|1100|null|null|
|    1|  5|        340|null|null|null| 500|null|null|
|    1|  4|        310|null| 900| 100|null|null|null|
|    1|  3|        320|null|null| 110|null| 200|null|
|    1|  2|       null|null| 100| 120| 410| 210| 330|
+-----+---+-----------+----+----+----+----+----+----+
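If collecting the keys is still the slow part, one more variation (a sketch; it assumes Spark 2.4+ for map_keys, flatten and array_distinct) gathers all distinct keys in a single aggregation instead of an explode plus distinct, and caching df avoids recomputing it for the final projection:
df.cache()  # df is used twice: once to collect the keys, once for the final select

# one global aggregation: collect the per-row key arrays, flatten them, deduplicate
keys = (df
    .select(F.array_distinct(F.flatten(F.collect_list(F.map_keys("signals")))).alias("keys"))
    .first()["keys"])

key_cols = [F.col("signals").getItem(k).alias(k) for k in keys]
df.select("asset", "ts", *key_cols).show()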