I have a dataframe where I have to generate a unique ID in one of the columns. This ID has to be generated with an offset, because I need to persist the dataframe with the auto-generated ID, and when new data comes in later, its auto-generated IDs must not collide with the existing ones. I checked monotonically_increasing_id(), but it does not accept an offset. This is what I tried:
df = df.coalesce(1);
df = df.withColumn(inputCol, functions.monotonically_increasing_id());
But is there a way to make monotonically_increasing_id() start from a given offset?
Alternatively, if you don't want to restrict your program to a single partition with df.coalesce(1), you can use zipWithIndex, which starts at index = 0, as follows:
lines = [["a1", "a2", "a3"],
         ["b1", "b2", "b3"],
         ["c1", "c2", "c3"]]
cols = ["c1", "c2", "c3"]
df = spark.createDataFrame(lines, cols)

start_indx = 10
# zipWithIndex yields (row, index) pairs; shift the index by the offset
# and rebuild each row with the id in front.
df = df.rdd.zipWithIndex() \
    .map(lambda pair: (pair[1] + start_indx, pair[0][0], pair[0][1], pair[0][2])) \
    .toDF(["id", "c1", "c2", "c3"])
df.show(10, False)
In this case I set start_indx = 10, and this is the output:
+---+---+---+---+
|id |c1 |c2 |c3 |
+---+---+---+---+
|10 |a1 |a2 |a3 |
|11 |b1 |b2 |b3 |
|12 |c1 |c2 |c3 |
+---+---+---+---+
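If the dataframe has more columns or the schema isn't fixed, the same trick can be written generically. This is a sketch under the same assumptions as above (an existing spark session, the start_indx offset), using df.columns so the field accesses aren't hard-coded:

# Row is a subclass of tuple, so tuple(row) splices in all original fields,
# whatever the schema is.
df_with_id = df.rdd.zipWithIndex() \
    .map(lambda pair: (pair[1] + start_indx,) + tuple(pair[0])) \
    .toDF(["id"] + df.columns)
df_with_id.show(10, False)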
You can simply add an offset to it to enforce a minimum value for the id. Note that the generated values are not guaranteed to be consecutive or to begin exactly at that minimum; they are only guaranteed to be unique, increasing, and at least as large as the offset:
.withColumn("id", monotonically_increasing_id + 123)
Explanation: the + operator is overloaded for Column objects, see https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Column.scala#L642
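To tie this back to the original requirement of not colliding with already-persisted ids, one common approach is to derive the offset from the current maximum. This is only a sketch: existing_df, new_df, and the "id" column name are assumptions standing in for your stored table and incoming batch:

from pyspark.sql import functions as F

# existing_df: the previously persisted dataframe with an "id" column (assumed).
max_row = existing_df.agg(F.max("id").alias("max_id")).first()
offset = (max_row["max_id"] or 0) + 1

# New ids are unique and >= offset, but not necessarily consecutive.
new_df = new_df.withColumn("id", F.monotonically_increasing_id() + offset)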