Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I add an persistent column of row ids to Spark DataFrame?

This question is not new, however I am finding surprising behavior in Spark. I need to add a column of row IDs to a DataFrame. I used the DataFrame method monotonically_increasing_id() and It does give me an additional col of uniques row IDs (that are NOT consecutive by the way, but are unique).

The problem I'm having is that when I filter the DataFrame the row Ids in the resulting DataFrame are re-assigned. The two DataFrames are shown below.

  • the first one is the initial DataFrame with row IDs added as follows:

    df.withColumn("rowId", monotonically_increasing_id())  
  • the second DataFrame is the one obtained after filtering on the col P via df.filter(col("P")).

The problem is illustrated by the rowId for custId 169, which was 5 in the initial DataFrame, but after filtering that rowId (5) was re-assigned to custmId 773 when custId 169 was filtered out! I don't know why this is the default behavior.

I would want the rowIds to be "sticky"; if I remove rows from the DataFrame I do not want their IDs "re-used", I want them gone too along with their rows. Is it possible to do that? I don't see any flags to request this behavior from monotonically_increasing_id method.

+---------+--------------------+-------+ | custId  |    features|    P  |rowId| +---------+--------------------+-------+ |806      |[50,5074,...|   true|    0| |832      |[45,120,1...|   true|    1| |216      |[6691,272...|   true|    2| |926      |[120,1788...|   true|    3| |875      |[54,120,1...|   true|    4| |169      |[19406,21...|  false|    5|  after filtering on P: +---------+--------------------+-------+ |   custId|    features|    P  |rowId| +---------+--------------------+-------+ |      806|[50,5074,...|   true|    0| |      832|[45,120,1...|   true|    1| |      216|[6691,272...|   true|    2| |      926|[120,1788...|   true|    3| |      875|[54,120,1...|   true|    4| |      773|[3136,317...|   true|    5| 
like image 905
Kai Avatar asked Feb 29 '16 16:02

Kai


People also ask

How do I add a constant column in Spark DataFrame?

Add New Column with Constant Value In PySpark, to add a new column to DataFrame use lit() function by importing from pyspark. sql. functions import lit , lit() function takes a constant value you wanted to add and returns a Column type, if you wanted to add a NULL / None use lit(None) .

How do I add row numbers in Spark?

The row_number() is a window function in Spark SQL that assigns a row number (sequential integer number) to each row in the result DataFrame. This function is used with Window. partitionBy() which partitions the data into windows frames and orderBy() clause to sort the rows in each partition.


1 Answers

Spark 2.0

  • This is issue has been resolved in Spark 2.0 with SPARK-14241.

  • Another similar issue has been resolved in Spark 2.1 with SPARK-14393

Spark 1.x

Problem you experience is rather subtle but can be reduced to a simple fact monotonically_increasing_id is an extremely ugly function. It is clearly not pure and its value depends on something that is completely out your control.

It doesn't take any parameters so from an optimizer perspective it doesn't matter when it is called and can be pushed after all other operations. Hence the behavior you see.

If you take look at the code you'll find out this is explicitly marked by extending MonotonicallyIncreasingID expression with Nondeterministic.

I don't think there is any elegant solution but one way you can handle this is to add an artificial dependency on the filtered value. For example with an UDF like this:

from pyspark.sql.types import LongType from pyspark.sql.functions import udf  bound = udf(lambda _, v: v, LongType())   (df   .withColumn("rn", monotonically_increasing_id())   # Due to nondeterministic behavior it has to be a separate step   .withColumn("rn", bound("P", "rn"))     .where("P")) 

In general it could be cleaner to add indices using zipWithIndex on a RDD and then convert it back to a DataFrame.


* Workaround shown above is no longer a valid solution (nor required) in Spark 2.x where Python UDFs are subject of the execution plan optimizations.

like image 153
zero323 Avatar answered Sep 18 '22 14:09

zero323