
Spark Dataframe Random UUID changes after every transformation/action

I have a Spark dataframe with a column that contains a generated UUID. However, each time I run an action or transformation on the dataframe, the UUID changes.

How do I generate the UUID only once and have it remain static thereafter?

Some sample code to reproduce my issue is below:

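import java.util.UUID

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

def process(spark: SparkSession): Unit = {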

  import spark.implicits._

  val sc = spark.sparkContext
  val sqlContext = spark.sqlContext
  sc.setLogLevel("OFF")

  // create dataframe
  val df = spark.createDataset(Array(("a", "1"), ("b", "2"), ("c", "3"))).toDF("col1", "col2")
  df.createOrReplaceTempView("df")
  df.show(false)

  // define a UDF that creates a random UUID
  val generateUUID = udf(() => UUID.randomUUID().toString)

  // generate UUID for new column
  val dfWithUuid = df.withColumn("new_uuid", generateUUID())
  dfWithUuid.show(false)
  dfWithUuid.show(false)    // uuid is different

  // new transformations also change the uuid
  val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2")+1)
  dfWithUuidWithNewCol.show(false)
}

The output is:

+----+----+
|col1|col2|
+----+----+
|a   |1   |
|b   |2   |
|c   |3   |
+----+----+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |a414e73b-24b8-4f64-8d21-f0bc56d3d290|
|b   |2   |f37935e5-0bfc-4863-b6dc-897662307e0a|
|c   |3   |e3aaf655-5a48-45fb-8ab5-22f78cdeaf26|
+----+----+------------------------------------+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |1c6597bf-f257-4e5f-be81-34a0efa0f6be|
|b   |2   |6efe4453-29a8-4b7f-9fa1-7982d2670bd6|
|c   |3   |2f7ddc1c-3e8c-4118-8e2c-8a6f526bee7e|
+----+----+------------------------------------+

+----+----+------------------------------------+----+
|col1|col2|new_uuid                            |col3|
+----+----+------------------------------------+----+
|a   |1   |00b85af8-711e-4b59-82e1-8d8e59d4c512|2.0 |
|b   |2   |94c3f2c6-9234-4fb3-b1c4-273a37171131|3.0 |
|c   |3   |1059fff2-b8f9-4cec-907d-ea181d5003a2|4.0 |
+----+----+------------------------------------+----+

Note that the UUID is different at each step.

Steve asked Mar 22 '17 19:03


2 Answers

This is a very old question, but here is what worked for me, in case it helps someone.

You can use the expr function with Spark SQL's built-in uuid() to generate unique GUIDs that do not change across transformations.

import org.apache.spark.sql.functions._  
// create dataframe  
val df = spark.createDataset(Array(("a", "1"), ("b", "2"), ("c", "3"))).toDF("col1", "col2")   
df.createOrReplaceTempView("df")   
df.show(false)

// generate UUID for new column   
val dfWithUuid = df.withColumn("new_uuid", expr("uuid()"))
dfWithUuid.show(false)
dfWithUuid.show(false)    

// new transformations 
val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2")+1)
dfWithUuidWithNewCol.show(false)

The output is as follows:

+----+----+
|col1|col2|
+----+----+
|a   |1   |
|b   |2   |
|c   |3   |
+----+----+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|
|b   |2   |43882a79-8e7f-4002-9740-f22bc6b20db5|
|c   |3   |64bc741a-0d7c-430d-bfe2-a4838f10acd0|
+----+----+------------------------------------+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|
|b   |2   |43882a79-8e7f-4002-9740-f22bc6b20db5|
|c   |3   |64bc741a-0d7c-430d-bfe2-a4838f10acd0|
+----+----+------------------------------------+

+----+----+------------------------------------+----+
|col1|col2|new_uuid                            |col3|
+----+----+------------------------------------+----+
|a   |1   |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|2.0 |
|b   |2   |43882a79-8e7f-4002-9740-f22bc6b20db5|3.0 |
|c   |3   |64bc741a-0d7c-430d-bfe2-a4838f10acd0|4.0 |
+----+----+------------------------------------+----+

Nikunj Kakadiya answered Sep 17 '22 07:09


This is expected behavior. User-defined functions have to be deterministic:

The user-defined functions must be deterministic. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query.

If you want to include a non-deterministic function and preserve its output, you should write the intermediate data to persistent storage and read it back. Checkpointing or caching may work in some simple cases, but it won't be reliable in general.
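
The write-and-read-back approach described above could be sketched roughly as follows. This is a minimal illustration, not a definitive implementation: the object name MaterializeUuids, the helper withStableUuid, the path argument, and the choice of Parquet are all assumptions made for the example.

```scala
import java.util.UUID

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf

// Hypothetical helper, named for illustration only.
object MaterializeUuids {

  // Adds a random UUID column, writes the result to storage, and returns a
  // DataFrame that reads the stored values back. Later actions on the returned
  // DataFrame read the files instead of calling the UDF again.
  def withStableUuid(spark: SparkSession, df: DataFrame, path: String): DataFrame = {
    val generateUUID = udf(() => UUID.randomUUID().toString)

    df.withColumn("new_uuid", generateUUID())
      .write
      .mode("overwrite")   // assumption: replacing any previous output is acceptable
      .parquet(path)       // assumption: Parquet; any durable format should do

    spark.read.parquet(path)
  }
}
```

The UUIDs are generated once, during the write. Everything downstream works from the stored copy, so the values stay fixed as long as the files at path are not rewritten.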

If the upstream process is deterministic (for starters, there is no shuffle), you could try to use the rand function with a seed, convert the result to a byte array, and pass it to UUID.nameUUIDFromBytes.
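
One possible reading of the seeded rand idea, as a sketch: the names SeededUuid, uuidFromDouble, and withSeededUuid are hypothetical, and encoding the double through an 8-byte ByteBuffer is just one way to obtain the byte array.

```scala
import java.nio.ByteBuffer
import java.util.UUID

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{rand, udf}

// Hypothetical helper, named for illustration only.
object SeededUuid {

  // Pure function: the same double always maps to the same name-based UUID.
  def uuidFromDouble(x: Double): String = {
    val bytes = ByteBuffer.allocate(java.lang.Double.BYTES).putDouble(x).array()
    UUID.nameUUIDFromBytes(bytes).toString
  }

  // rand(seed) is reproducible only while partitioning and row order stay the
  // same, which is why the upstream plan has to be deterministic.
  def withSeededUuid(df: DataFrame, seed: Long): DataFrame = {
    val toUuid = udf((x: Double) => uuidFromDouble(x))
    df.withColumn("new_uuid", toUuid(rand(seed)))
  }
}
```

Because the UUID is derived from a single double, two rows that happen to draw the same random value would get the same UUID, so this is weaker than UUID.randomUUID() as a uniqueness guarantee on very large datasets.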

See also: About how to add a new column to an existing DataFrame with random values in Scala

Note: SPARK-20586 introduced a deterministic flag, which can disable certain optimizations, but it is not clear how it behaves when data is persisted and an executor is lost.
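
For reference, that flag is exposed on the UDF itself as asNondeterministic() (Spark 2.3 and later, as far as I know). A minimal sketch, with the wrapper object NondeterministicUuid being a hypothetical name:

```scala
import java.util.UUID

import org.apache.spark.sql.functions.udf

// Hypothetical wrapper object, named for illustration only.
object NondeterministicUuid {

  // Marks the UDF as non-deterministic so the optimizer should not duplicate
  // or eliminate its invocations within a single query plan.
  val generateUUID = udf(() => UUID.randomUUID().toString).asNondeterministic()
}
```

This only affects how a single plan is optimized. Each new action still re-executes the plan and calls the UDF again, so on its own it should not be expected to keep the UUIDs stable across actions.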

zero323 answered Sep 18 '22 07:09