 

Using monotonically_increasing_id() for assigning row number to pyspark dataframe

I am using monotonically_increasing_id() to assign a row number to a pyspark dataframe, using the syntax below:

from pyspark.sql.functions import monotonically_increasing_id

df1 = df1.withColumn("idx", monotonically_increasing_id())

Now df1 has 26,572,528 records. So I was expecting idx value from 0-26,572,527.

But when I select max(idx), its value is strangely huge: 335,008,054,165.

What's going on with this function? Is it reliable to use for merging with another dataset that has a similar number of records?

I have some 300 dataframes that I want to combine into a single dataframe: one dataframe contains the IDs, and the others contain different records corresponding to them row-wise.

asked Jan 11 '18 by muni



1 Answer

Edit: Full examples of the ways to do this, and the risks involved, can be found here.

From the documentation

A column that generates monotonically increasing 64-bit integers.

The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records.

Thus, it is not like an auto-increment ID in relational databases, and it is not reliable for merging.
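As a concrete check, that bit layout explains the questioner's strangely huge max(idx). A minimal sketch in plain Python, assuming the IDs follow the documented layout:

# decode a monotonically_increasing_id value, assuming the documented layout:
# upper 31 bits = partition id, lower 33 bits = record number within the partition
idx = 335008054165                           # the max(idx) from the question
partition_id = idx >> 33                     # -> 39
record_in_partition = idx & ((1 << 33) - 1)  # -> 605077
print(partition_id, record_in_partition)

So the maximum id simply comes from record 605,077 of partition 39; nothing is wrong with the data, the values are just not consecutive.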

If you need auto-increment behavior like in relational databases and your data is sortable, you can use row_number:

df.createOrReplaceTempView('df')
spark.sql('select row_number() over (order by some_column) as num, * from df').show()

+---+-----------+
|num|some_column|
+---+-----------+
|  1|   ....... |
|  2|   ....... |
|  3| ..........|
+---+-----------+
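The same thing can be written with the DataFrame window API. A minimal sketch, where some_column stands in for whatever column you actually sort by; note that a global ordering with no partitionBy pulls every row into one partition, which is expensive at 26 million records:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# row_number() over a global ordering; "some_column" is a placeholder
w = Window.orderBy("some_column")
df_with_num = df.withColumn("num", F.row_number().over(w))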

If your data is not sortable and you don't mind dropping down to RDDs to create the indexes and then falling back to dataframes, you can use rdd.zipWithIndex().

An example can be found here

In short:

# since you have a dataframe, use the rdd interface to create indexes with zipWithIndex()
df = df.rdd.zipWithIndex()
# return back to dataframe
df = df.toDF()

df.show()

# your data           | indexes
+---------------------+---+
|         _1          | _2|
+---------------------+---+
|[data col1,data col2]|  0|
|[data col1,data col2]|  1|
|[data col1,data col2]|  2|
+---------------------+---+

You will probably need some more transformations after that to get your dataframe to what you need it to be (one possible sketch below). Note: this is not a very performant solution.
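For instance, a sketch of those follow-up transformations, starting again from the original dataframe df: each (Row, index) pair is flattened into the row's fields plus its index, and the result gets the old column names plus a hypothetical "idx" column:

# zipWithIndex yields (Row, index) pairs; flatten each pair into the row's
# fields plus its index, then rebuild a dataframe with named columns
rdd_indexed = df.rdd.zipWithIndex()
df_indexed = rdd_indexed.map(lambda pair: pair[0] + (pair[1],)) \
                        .toDF(df.columns + ["idx"])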

Hope this helps. Good luck!

Edit: Come to think of it, you can combine monotonically_increasing_id with row_number:

from pyspark.sql.functions import monotonically_increasing_id

# create a monotonically increasing id
df = df.withColumn("idx", monotonically_increasing_id())

# since the id is increasing (even though not consecutive), you can sort by it,
# so you can use row_number to get consecutive numbers
df.createOrReplaceTempView('df')
new_df = spark.sql('select row_number() over (order by idx) as num, * from df')

Not sure about performance though.
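Once each of the 300 dataframes carries such a consecutive num column, the row-wise merge from the question becomes a plain equi-join on it. A sketch, with df_a and df_b as hypothetical names for two of the indexed dataframes:

# merge two indexed dataframes row-wise by joining on the shared index column
merged = df_a.join(df_b, on="num", how="inner")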

answered Sep 27 '22 by mkaran