I need to add an index column to a dataframe with three very simple constraints:
start from 0
be sequential
be deterministic
I'm sure I'm missing something obvious, because the examples I'm finding look very convoluted for such a simple task, or use non-sequential, non-deterministic, monotonically increasing IDs. I don't want to zip with index and then have to separate the previously separated columns that are now in a single column, because my dataframes are in the terabytes and it just seems unnecessary. I don't need to partition by anything, nor order by anything, and the examples I'm finding do this (using window functions and row_number). All I need is a simple 0 to df.count sequence of integers. What am I missing here?
Adding sequential unique IDs to a Spark DataFrame is not very straightforward, especially considering its distributed nature. You can do this using either zipWithIndex() or row_number() (depending on the amount and kind of your data), but in either case there is a catch regarding performance.
From the documentation of monotonically_increasing_id(): "A column that generates monotonically increasing 64-bit integers. The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits." The function is non-deterministic because its result depends on partition IDs.
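You can see why that rules it out on its own by looking at the raw IDs on a DataFrame with more than one partition (a minimal sketch; spark is assumed to be an active SparkSession):

from pyspark.sql.functions import monotonically_increasing_id

# A toy DataFrame split across two partitions.
demo = spark.range(4).repartition(2)

# IDs are unique and increasing within each partition, but the partition ID
# is packed into the upper bits, so values jump between partitions instead
# of being consecutive: typically something like 0, 1, 8589934592, 8589934593.
demo.withColumn("raw_id", monotonically_increasing_id()).show()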
collect() is the RDD/DataFrame operation used to retrieve the data from a DataFrame. It is useful for retrieving all of the rows from every partition of an RDD and bringing them over to the driver node/program.
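For example (a minimal sketch, and something to use sparingly here, since collecting a terabyte-scale DataFrame to the driver will not fit in memory):

# collect() returns a list of Row objects on the driver.
rows = df.collect()
print(rows[0])      # first Row
print(len(rows))    # same value as df.count()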
What I mean is: how can I add a column with an ordered, monotonically increasing (by 1) sequence from 0 to df.count? (from comments)
You can use row_number() here, but for that you'd need to specify an orderBy(). Since you don't have an ordering column, just use monotonically_increasing_id().
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window

# Order the window by the generated IDs, then number the rows 0, 1, 2, ...
df = df.withColumn(
    "index",
    row_number().over(Window.orderBy(monotonically_increasing_id())) - 1
)
Also, row_number() starts at 1, so you'd have to subtract 1 to have it start from 0. The last value will be df.count - 1.
I don't want to zip with index and then have to separate the previously separated columns that are now in a single column
You can use zipWithIndex if you follow it with a call to map, to avoid having all of the separated columns turn into a single column:
cols = df.columns
# zipWithIndex yields (Row, index) pairs; map flattens them back into a flat tuple, index first.
df = df.rdd.zipWithIndex().map(lambda row: (row[1],) + tuple(row[0])).toDF(["index"] + cols)
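Note that toDF() with only column names makes Spark re-infer the column types from the RDD. If the original types matter, one option is to pass the schema through explicitly; this is a sketch under that assumption, where df refers to the original DataFrame before the reassignment above:

from pyspark.sql.types import StructType, StructField, LongType

# Prepend a non-nullable long "index" field to the original schema.
indexed_schema = StructType(
    [StructField("index", LongType(), False)] + list(df.schema.fields)
)
df_indexed = df.rdd.zipWithIndex().map(
    lambda row: (row[1],) + tuple(row[0])
).toDF(indexed_schema)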