
Java & Spark : add unique incremental id to dataset

With Spark and Java, I am trying to add an Integer identifier column to an existing Dataset&lt;Row&gt; with n columns.

I successfully added an id with zipWithUniqueId(), with zipWithIndex(), and even with monotonically_increasing_id(), but none of them gives a satisfactory result.

Example: I have a dataset with 195 rows. With any of these three methods, I get ids like 1584156487 or 12036, and those ids are not contiguous.
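Those large, non-contiguous values are expected: monotonically_increasing_id() packs the partition index into the upper bits of a 64-bit value and the per-partition record number into the lower 33 bits, so ids jump whenever a new partition starts. A minimal plain-Java sketch of that encoding (the class and method names here are illustrative, not Spark API):

```java
public class MonotonicIdSketch {
    // Mirrors how monotonically_increasing_id() builds its values:
    // partition index in the upper bits, record number in the lower 33 bits.
    static long sketchId(long partitionId, long recordNumber) {
        return (partitionId << 33) | recordNumber;
    }

    public static void main(String[] args) {
        // Record 12036 on partition 0 keeps a small id...
        System.out.println(sketchId(0, 12036));
        // ...but the very first record of partition 1 jumps to 2^33.
        System.out.println(sketchId(1, 0));
    }
}
```

This is why ids are unique and monotonically increasing, but never guaranteed to be consecutive across partitions.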

What I need is rather simple: an Integer id column whose values run from 1 to dataset.count(), one per row, where id = 1 is followed by id = 2, and so on.

How can I do that in Java/Spark?

asked Aug 03 '17 by MrNierda

2 Answers

You can use the row_number function:

In Java:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.expressions.Window;

// Numbers rows 1..n in the order of "a column"; withColumn returns a
// new Dataset, so the result must be assigned.
Dataset<Row> withId = df.withColumn("id", functions.row_number().over(Window.orderBy("a column")));

Or in Scala:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val withId = df.withColumn("id", row_number().over(Window.orderBy("a column")))
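Note that a window with orderBy but no partitionBy forces all rows onto a single partition, which is what makes the ids contiguous. The numbering logic itself can be sketched in plain Java (the class and method names are illustrative, not Spark API): sort the rows by the ordering column, then assign 1..n in that order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowNumberSketch {
    // Assigns contiguous ids 1..n to rows sorted by their value,
    // mirroring row_number().over(Window.orderBy(...)) on one partition.
    static Map<String, Integer> numberRows(List<String> rows) {
        List<String> sorted = new ArrayList<>(rows);
        Collections.sort(sorted);
        Map<String, Integer> ids = new LinkedHashMap<>();
        int id = 1;
        for (String row : sorted) {
            ids.put(row, id++);
        }
        return ids;
    }

    public static void main(String[] args) {
        // Ids follow the sort order: apple=1, banana=2, cherry=3
        System.out.println(numberRows(Arrays.asList("banana", "apple", "cherry")));
    }
}
```

On a large dataset the single-partition shuffle this implies can be a bottleneck, but for 195 rows it is harmless.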
answered Nov 15 '22 by Fabich


If you are working with streaming DataFrames, you can use a UDF with a UUID generator:

val generateUuid = udf(() => java.util.UUID.randomUUID.toString)

// Adds a unique (but non-sequential, string-typed) id to each row
val ddfStreamWithId = ddfStream.withColumn("uniqueId", generateUuid())
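Keep in mind this gives each row a unique 36-character string, not the contiguous Integer sequence the question asks for; it is mainly useful where any unique key will do. The underlying call is just the standard library's UUID generator, shown here in plain Java:

```java
import java.util.UUID;

public class UuidSketch {
    // Generates a random (version 4) UUID as a 36-character string,
    // e.g. "123e4567-e89b-12d3-a456-426614174000".
    static String generateUuid() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        System.out.println(generateUuid());
    }
}
```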
answered Nov 15 '22 by Mor Shemesh