Spark: monotonically increasing id not working as expected in a DataFrame?

I have a dataframe df in Spark which looks something like this:

scala> df.show()
+--------+--------+
|columna1|columna2|
+--------+--------+
|     0.1|     0.4|
|     0.2|     0.5|
|     0.1|     0.3|
|     0.3|     0.6|
|     0.2|     0.7|
|     0.2|     0.8|
|     0.1|     0.7|
|     0.5|     0.5|
|     0.6|    0.98|
|     1.2|     1.1|
|     1.2|     1.2|
|     0.4|     0.7|
+--------+--------+

I tried to add an id column with the following code:

val df_id = df.withColumn("id", monotonicallyIncreasingId)

but the id column is not what I expect:

scala> df_id.show()
+--------+--------+----------+
|columna1|columna2|        id|
+--------+--------+----------+
|     0.1|     0.4|         0|
|     0.2|     0.5|         1|
|     0.1|     0.3|         2|
|     0.3|     0.6|         3|
|     0.2|     0.7|         4|
|     0.2|     0.8|         5|
|     0.1|     0.7|8589934592|
|     0.5|     0.5|8589934593|
|     0.6|    0.98|8589934594|
|     1.2|     1.1|8589934595|
|     1.2|     1.2|8589934596|
|     0.4|     0.7|8589934597|
+--------+--------+----------+

As you can see, the ids run correctly from 0 to 5, but the next id is 8589934592 instead of 6, and so on.

So what is wrong here? Why isn't the id column indexed consecutively?

asked Dec 19 '17 by antonioACR1


1 Answer

It works as expected. This function is not intended to generate consecutive values; instead, it encodes the partition ID and the record index within each partition. From the API docs:

The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records.

As an example, consider a DataFrame with two partitions, each with 3 records. This expression would return the following IDs:

0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
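To see the layout concretely, here is a minimal sketch (the derived column names are my own, for illustration) that splits each generated id back into its two components, following the bit layout quoted above:

import org.apache.spark.sql.functions.{col, shiftRight}

// df_id is the frame from the question: upper 31 bits -> partition ID,
// lower 33 bits -> record number within that partition
df_id
  .withColumn("partitionId", shiftRight(col("id"), 33))
  .withColumn("recordInPartition", col("id").bitwiseAND((1L << 33) - 1))
  .show()

On the data above, rows 0 to 5 decode to partition 0, records 0 to 5, and the remaining rows decode to partition 1, records 0 to 5, which is exactly where the jump to 8589934592 comes from.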

If you want consecutive numbers, use RDD.zipWithIndex.
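For instance, a minimal sketch (assuming the df from the question and a spark session in scope, as in the shell) that appends a consecutive id via zipWithIndex:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.LongType

// zip each row with its global index, then append the index as a new column
val rdd_indexed = df.rdd.zipWithIndex.map { case (row, idx) =>
  Row.fromSeq(row.toSeq :+ idx)
}

val schema = df.schema.add("id", LongType, nullable = false)
val df_consecutive = spark.createDataFrame(rdd_indexed, schema)
df_consecutive.show()

Note that zipWithIndex triggers an extra Spark job when the RDD has more than one partition (to compute per-partition offsets), so it is more expensive than monotonically_increasing_id.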

answered Oct 16 '22 by Alper t. Turker