Does the SparkSQL Dataframe function explode preserve order?

I have a Scala spark DataFrame:

df.select($"row_id", $"array_of_data").show
+----------+--------------------+
| row_id   |      array_of_data |
+----------+--------------------+
|       0  | [123, ABC, G12]    |
|       1  | [100, 410]         |
|       2  | [500, 300,  ...]   |

I want to explode those arrays so that each element is in a different row, but I also want to mark which row corresponds to the first element of the array:

+----------+--------------------+----------+----------+
| row_id   |      array_of_data | exploded | is_first |
+----------+--------------------+----------+----------+
|       0  | [123, ABC, G12]    | 123      |    Yes   |
|       0  | [123, ABC, G12]    | ABC      |    No    |
|       0  | [123, ABC, G12]    | G12      |    No    |

To achieve this, I use the explode function, and hope that the first row corresponds to the first data element:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{explode, row_number, when}

var exploded_df = df.withColumn("exploded", explode($"array_of_data"))

// Create an internal rank variable to figure out the first element
val window = Window.partitionBy("row_id").orderBy("row_id")
exploded_df = exploded_df.withColumn("_rank", row_number().over(window))
exploded_df = exploded_df.withColumn("is_first",
                                     when($"_rank" === 1, "Yes").otherwise("No")
                                    )

This appears to work for my purposes and produces the desired output, but can I trust that it will always work? I can't find anything in the explode documentation that promises this behavior, and it seems unwise to rely on the order of rows in a Spark DataFrame.

The only other solution I could think of would be to create a new column for every element in array_of_data and then match when exploded equals the value from the first column, but I have no guarantee that there won't be duplicate values in the array.

asked Mar 07 '18 by Kyle Heuton

1 Answer

You can use the posexplode function for that purpose.

As the API documentation explains:

Creates a new row for each element with position in the given array or map column.

You can use it inside select so that the position and the exploded value appear as separate columns:

import org.apache.spark.sql.functions._
df.select($"row_id", posexplode($"array_of_data")).show(false)

which should give you

+------+---------------+---+---+
|row_id|array_of_data  |pos|col|
+------+---------------+---+---+
|0     |[123, ABC, G12]|0  |123|
|0     |[123, ABC, G12]|1  |ABC|
|0     |[123, ABC, G12]|2  |G12|
+------+---------------+---+---+
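With the position available, the is_first flag from the question can be derived directly from pos rather than from row order. A minimal sketch (pos and col are the default column names produced by posexplode; renaming col to exploded matches the question's desired output):

```scala
import org.apache.spark.sql.functions._

val result = df
  .select($"row_id", $"array_of_data", posexplode($"array_of_data"))
  .withColumnRenamed("col", "exploded")
  // pos === 0 marks the first element of each array
  .withColumn("is_first", when($"pos" === 0, "Yes").otherwise("No"))
  .drop("pos")
```

Because pos is computed from each element's position within the array itself, this does not depend on any ordering guarantee of the exploded rows.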
answered Oct 27 '22 by Ramesh Maharjan