
Pyspark add sequential and deterministic index to dataframe

I need to add an index column to a dataframe with three very simple constraints:

  • start from 0

  • be sequential

  • be deterministic

I'm sure I'm missing something obvious, because the examples I'm finding look very convoluted for such a simple task, or they use non-sequential, non-deterministic, monotonically increasing IDs. I don't want to zip with index and then have to separate the previously separate columns that are now in a single column; my dataframes are in the terabytes and it just seems unnecessary. I don't need to partition by anything or order by anything, yet the examples I'm finding do this (using window functions and row_number). All I need is a simple sequence of integers from 0 to df.count. What am I missing here?

1, 2, 3, 4, 5

xv70 asked Sep 13 '18 16:09


People also ask

How do I add a sequence number to a Spark DataFrame?

Adding sequential unique IDs to a Spark DataFrame is not very straightforward, especially given its distributed nature. You can do this using either zipWithIndex() or row_number() (depending on the amount and kind of your data), but in every case there is a catch regarding performance.

How do you add a monotonically increasing ID in PySpark?

A column that generates monotonically increasing 64-bit integers. The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits.
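
The bit layout described above can be illustrated in plain Python. This is only a sketch of the arithmetic, not Spark's implementation; the helper name `spark_style_id` and the partition sizes are made up for illustration.

```python
# Sketch of the ID layout described above: partition ID in the upper bits,
# record number within the partition in the lower 33 bits.
# `spark_style_id` is a hypothetical helper, not part of PySpark.

RECORD_BITS = 33


def spark_style_id(partition_id: int, record_number: int) -> int:
    """Combine a partition ID and a per-partition record number into one ID."""
    return (partition_id << RECORD_BITS) + record_number


# Two partitions holding three and two records (made-up sizes).
ids = [
    spark_style_id(partition_id, record_number)
    for partition_id, size in enumerate([3, 2])
    for record_number in range(size)
]
print(ids)  # [0, 1, 2, 8589934592, 8589934593]
```

The IDs increase and never repeat, but they jump by 2**33 at each partition boundary, which is why they cannot serve directly as a 0 to df.count - 1 index.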

Is monotonically_increasing_id deterministic?

The function is non-deterministic because its result depends on partition IDs.

What does .collect do in Pyspark?

collect() is an action on an RDD or DataFrame that retrieves all of the elements from every partition and brings them back to the driver program.


1 Answer

What I mean is: how can I add a column with an ordered, monotonically increasing by 1 sequence 0:df.count? (from comments)

You can use row_number() here, but for that you'd need to specify an orderBy(). Since you don't have an ordering column, just order by monotonically_increasing_id().

from pyspark.sql import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number

# Order by the generated IDs, number the rows from 1, then shift to start at 0.
df = df.withColumn(
    "index",
    row_number().over(Window.orderBy(monotonically_increasing_id())) - 1
)

Also, row_number() starts at 1, so you'd have to subtract 1 to have it start from 0. The last value will be df.count() - 1.
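
To see why this produces a gap-free sequence, here is a plain-Python model of what the window expression computes. It is a sketch under assumptions: partitions are modeled as lists, the data is made up, and none of the names come from PySpark. Two caveats are worth keeping in mind for large dataframes: a window with orderBy() but no partitionBy() makes Spark move all rows into a single partition for the numbering step, and the resulting index is only as stable as the partitioning and row order of df when the IDs are generated.

```python
# Plain-Python model of
#   row_number().over(Window.orderBy(monotonically_increasing_id())) - 1
# Partitions are modeled as lists; all names here are hypothetical.

partitions = [["a", "b", "c"], ["d", "e"]]  # made-up data in two partitions

# Step 1: give every row an increasing but non-consecutive ID
# (partition ID in the upper bits, record number in the lower 33 bits).
rows_with_ids = [
    ((pid << 33) + rec, value)
    for pid, part in enumerate(partitions)
    for rec, value in enumerate(part)
]

# Step 2: order by that ID and number the rows starting at 1, as row_number() does.
ordered = sorted(rows_with_ids)
numbered = [(n, value) for n, (_, value) in enumerate(ordered, start=1)]

# Step 3: subtract 1 so the index starts at 0 and ends at count - 1.
indexed = [(n - 1, value) for n, value in numbered]
print(indexed)  # [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e')]
```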


I don't want to zip with index and then have to separate the previously separated columns that are now in a single column

You can use zipWithIndex if you follow it with a call to map, to avoid having all of the separated columns turn into a single column:

cols = df.columns
# zipWithIndex yields (row, index) pairs; put the index first, then the original columns.
df = df.rdd.zipWithIndex().map(lambda row: (row[1],) + tuple(row[0])).toDF(["index"] + cols)
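
For intuition about what zipWithIndex does and how the map call flattens its output, here is a plain-Python sketch. It assumes partitions can be modeled as lists of row tuples; the data and all names are hypothetical. Indexes are assigned by partition order and then by position within each partition, so each partition needs to know how many rows precede it, which is why Spark has to run an extra job to count partition sizes when the RDD has more than one partition.

```python
from itertools import accumulate

# Plain-Python model of rdd.zipWithIndex() followed by the map() above.
# Partitions are modeled as lists of row tuples; all names are hypothetical.

partitions = [[("a", 1), ("b", 2)], [("c", 3)], [("d", 4), ("e", 5)]]

# Starting offset of each partition = number of rows in all earlier partitions.
sizes = [len(part) for part in partitions]
offsets = [0] + list(accumulate(sizes))[:-1]

# zipWithIndex pairs each row with its global index: (row, index).
zipped = [
    (row, offset + position)
    for part, offset in zip(partitions, offsets)
    for position, row in enumerate(part)
]

# Same reshaping as the lambda above: index first, then the original columns.
flattened = [(pair[1],) + tuple(pair[0]) for pair in zipped]
print(flattened)
# [(0, 'a', 1), (1, 'b', 2), (2, 'c', 3), (3, 'd', 4), (4, 'e', 5)]
```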
pault answered Sep 20 '22 12:09