This question is not new; however, I am finding surprising behavior in Spark. I need to add a column of row IDs to a DataFrame. I used the function monotonically_increasing_id() and it does give me an additional column of unique row IDs (which are NOT consecutive, by the way, but are unique).
The problem I'm having is that when I filter the DataFrame, the row IDs in the resulting DataFrame are re-assigned. The two DataFrames are shown below.
The first one is the initial DataFrame with row IDs added as follows:
df.withColumn("rowId", monotonically_increasing_id())
The second DataFrame is the one obtained after filtering on the column P via df.filter(col("P")).
The problem is illustrated by the rowId for custId 169, which was 5 in the initial DataFrame. After filtering, that rowId (5) was re-assigned to custId 773 once custId 169 was filtered out! I don't know why this is the default behavior.
I would want the rowIds to be "sticky": if I remove rows from the DataFrame, I do not want their IDs "re-used"; I want them gone along with their rows. Is it possible to do that? I don't see any flags to request this behavior from the monotonically_increasing_id function.
+------+------------+-----+-----+
|custId|    features|    P|rowId|
+------+------------+-----+-----+
|   806|[50,5074,...| true|    0|
|   832|[45,120,1...| true|    1|
|   216|[6691,272...| true|    2|
|   926|[120,1788...| true|    3|
|   875|[54,120,1...| true|    4|
|   169|[19406,21...|false|    5|

after filtering on P:

+------+------------+-----+-----+
|custId|    features|    P|rowId|
+------+------------+-----+-----+
|   806|[50,5074,...| true|    0|
|   832|[45,120,1...| true|    1|
|   216|[6691,272...| true|    2|
|   926|[120,1788...| true|    3|
|   875|[54,120,1...| true|    4|
|   773|[3136,317...| true|    5|
Spark 2.0
This issue has been resolved in Spark 2.0 with SPARK-14241.
Another similar issue has been resolved in Spark 2.1 with SPARK-14393.
Spark 1.x
The problem you are experiencing is rather subtle, but it can be reduced to a simple fact: monotonically_increasing_id is an extremely ugly function. It is clearly not pure, and its value depends on something that is completely out of your control.
It doesn't take any parameters, so from the optimizer's perspective it doesn't matter when it is called, and the call can be pushed after all other operations. Hence the behavior you see.
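To make the reordering concrete, here is a minimal pure-Python sketch (no Spark involved) that mimics the two possible plans on a single partition. The names assign_ids and keep_true are hypothetical helpers made up for illustration, and I am assuming that custId 773 directly follows 169 in the original data, which the truncated table in the question does not show.

```python
# Rows from the question as (custId, P); 169 is the only row with P == False.
# Assumption: custId 773 is the row right after 169 in the unfiltered data.
rows = [(806, True), (832, True), (216, True), (926, True),
        (875, True), (169, False), (773, True)]


def assign_ids(partition):
    """Number rows in the order they are seen, like a per-partition counter."""
    return [(cust_id, p, row_id) for row_id, (cust_id, p) in enumerate(partition)]


def keep_true(partition):
    """Keep only rows whose P flag (index 1) is True."""
    return [row for row in partition if row[1]]


# Plan as written: IDs first, then the filter. ID 5 leaves together with custId 169.
ids_then_filter = keep_true(assign_ids(rows))

# Plan after reordering: filter first, then IDs. ID 5 is handed to custId 773.
filter_then_ids = assign_ids(keep_true(rows))

print(ids_then_filter[-1])   # (773, True, 6)
print(filter_then_ids[-1])   # (773, True, 5)
```

The second result matches the filtered table in the question, which is consistent with the ID being generated after the filter rather than before it.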
If you take a look at the code, you'll find that this is explicitly marked by extending the MonotonicallyIncreasingID expression with Nondeterministic.
I don't think there is any elegant solution, but one way you can handle this is to add an artificial dependency on the filtered value. For example, with a UDF like this:
from pyspark.sql.types import LongType
from pyspark.sql.functions import monotonically_increasing_id, udf

# Identity UDF: returns the ID unchanged but makes it depend on column P
bound = udf(lambda _, v: v, LongType())

(df
  .withColumn("rn", monotonically_increasing_id())
  # Due to nondeterministic behavior it has to be a separate step
  .withColumn("rn", bound("P", "rn"))
  .where("P"))
In general it could be cleaner to add indices using zipWithIndex on an RDD and then convert it back to a DataFrame.
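A rough sketch of the zipWithIndex route could look like the following. The helper names append_index and with_row_index and the default column name rowId are my own choices for illustration, not part of any Spark API, and the sketch assumes that the column types can be inferred again by toDF (otherwise pass an explicit schema to createDataFrame instead).

```python
def append_index(pair):
    """Flatten one (row, index) pair produced by zipWithIndex into a tuple."""
    row, index = pair
    # A PySpark Row is a tuple subclass, so tuple(row) keeps the column order.
    return tuple(row) + (index,)


def with_row_index(df, col_name="rowId"):
    """Return df with an extra consecutive, 0-based index column (sketch)."""
    indexed = df.rdd.zipWithIndex().map(append_index)
    # toDF infers the column types from the data; only the names are given here.
    return indexed.toDF(df.columns + [col_name])
```

With an existing DataFrame df you would call with_row_index(df) first and filter on P afterwards. Because the data goes through an RDD and back, the indices are already fixed by the time the filter runs, and unlike monotonically_increasing_id they are consecutive. The price is an extra pass over the data whenever the RDD has more than one partition.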
* The UDF workaround shown above is no longer a valid solution (nor required) in Spark 2.x, where Python UDFs are subject to execution plan optimizations.