How do I get a SQL row_number equivalent for a Spark RDD?

I need to generate a full list of row_numbers for a data table with many columns.

In SQL, this would look like this:

select
    key_value,
    col1,
    col2,
    col3,
    row_number() over (partition by key_value order by col1, col2 desc, col3)
from
    temp;

Now, let's say in Spark I have an RDD of the form (K, V), where V=(col1, col2, col3), so my entries are like

(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.

I want to order these using commands like sortBy(), sortWith(), sortByKey(), zipWithIndex, etc., and end up with a new RDD that carries the correct row_number:

(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.

(I don't care about the parentheses, so the form can also be (K, (col1,col2,col3,rownum)) instead)

How do I do this?

Here's my first attempt:

val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))

val temp1 = sc.parallelize(sample_data)

temp1.collect().foreach(println)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)

temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)
// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)

// note that this isn't ordering with a partition on key value K!

val temp2 = temp1.???

Also note that the function sortBy cannot be applied directly to an RDD; one must run collect() first, and then the output isn't an RDD either, but an array:

temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)
// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
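(As an aside: more recent Spark versions do expose a sortBy directly on RDDs, which would avoid the collect() detour, though it still doesn't partition by K. A minimal sketch, assuming a Spark version where RDD.sortBy is available:)

// sort the RDD itself by (col1 asc, col2 desc, col3 asc); the result stays an RDD
temp1.sortBy(a => (a._2, -a._3, a._4)).collect().foreach(println)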

Here's a little more progress, but still not partitioned:

val temp2 = sc.parallelize(
  temp1.map(a => (a._1, (a._2, a._3, a._4)))
       .collect()
       .sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)
).zipWithIndex
 .map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))

temp2.collect().foreach(println)
// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)
asked Nov 20 '14 by Glenn Strycker




2 Answers

The row_number() over (partition by ... order by ...) functionality was added to Spark 1.4. This answer uses PySpark/DataFrames.

Create a test DataFrame:

from pyspark.sql import Row, functions as F

testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)),
     Row(k="key1", v=(1,4,7)),
     Row(k="key1", v=(2,2,3)),
     Row(k="key2", v=(5,5,5)),
     Row(k="key2", v=(5,5,9)),
     Row(k="key2", v=(7,5,5))
    )
).toDF()

Add the partitioned row number:

from pyspark.sql.window import Window

(testDF
 .select("k", "v",
         F.rowNumber()
         .over(Window
               .partitionBy("k")
               .orderBy("k")
              )
         .alias("rowNum")
        )
 .show()
)

+----+-------+------+
|   k|      v|rowNum|
+----+-------+------+
|key1|[1,2,3]|     1|
|key1|[1,4,7]|     2|
|key1|[2,2,3]|     3|
|key2|[5,5,5]|     1|
|key2|[5,5,9]|     2|
|key2|[7,5,5]|     3|
+----+-------+------+
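In later Spark releases rowNumber() was renamed to row_number(), and the window would typically order by the value columns rather than the partition key. A hedged Scala sketch of the same idea (df, col1, col2, and col3 are assumed names for a DataFrame where the value tuple has been split into columns, matching the original SQL's ordering):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// window matching the SQL: partition by key, order by col1, col2 desc, col3
val w = Window.partitionBy("k").orderBy(col("col1"), col("col2").desc, col("col3"))

// attach the per-partition row number as a new column
val withRowNum = df.withColumn("rowNum", row_number().over(w))
withRowNum.show()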
answered by dnlbrky


This is an interesting problem you're bringing up. I will answer it in Python, but I'm sure you will be able to translate it seamlessly to Scala.

Here is how I would tackle it:

1- Simplify your data:

temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3]))) 

temp2 is now a "real" key-value pair RDD. It looks like this:

[((3, 4), (5, 5, 5)),
 ((3, 4), (5, 5, 9)),
 ((3, 4), (7, 5, 5)),
 ((1, 2), (1, 2, 3)),
 ((1, 2), (1, 4, 7)),
 ((1, 2), (2, 2, 3))]
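Since the question was in Scala, here is a hypothetical one-line translation of this step (temp1 as defined in the question):

// reshape ((k1,k2), c1, c2, c3) into the key-value form (K, (c1, c2, c3))
val temp2 = temp1.map(a => (a._1, (a._2, a._3, a._4)))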

2- Then, use the group-by function to reproduce the effect of the PARTITION BY:

temp3 = temp2.groupByKey() 

temp3 is now an RDD with 2 rows:

[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),
 ((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]
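The Scala counterpart is the same call; a sketch, continuing from temp2 above (the grouped values arrive as one Iterable per key):

// group all values under their key, mimicking PARTITION BY
val temp3 = temp2.groupByKey()
// temp3: RDD[((Int, Int), Iterable[(Int, Int, Int)])]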

3- Now, you need to apply a rank function to each value of the RDD. In Python, I would use the built-in sorted function (the enumerate will create your row_number column):

temp4 = temp3.flatMap(lambda x: tuple([(x[0], (i[1], i[0])) for i in enumerate(sorted(x[1]))]))

temp4.take(10)  # take(10) just inspects the result; temp4 itself stays an RDD

Note that to implement your particular order, you would need to feed the right "key" argument to sorted (in Python, I would just create a lambda function like this):

lambda t: (t[0], -t[1], t[2])

At the end (without the key argument), it looks like this:

[((1, 2), ((1, 2, 3), 0)),
 ((1, 2), ((1, 4, 7), 1)),
 ((1, 2), ((2, 2, 3), 2)),
 ((3, 4), ((5, 5, 5), 0)),
 ((3, 4), ((5, 5, 9), 1)),
 ((3, 4), ((7, 5, 5), 2))]
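And a Scala sketch of step 3, with the custom ordering key from above baked in so the numbering matches the question's desired order (col1 ascending, col2 descending, col3 ascending) and starts at 1:

val temp4 = temp3.flatMap { case (k, vs) =>
  vs.toSeq
    .sortBy { case (c1, c2, c3) => (c1, -c2, c3) } // col1 asc, col2 desc, col3 asc
    .zipWithIndex
    .map { case (v, i) => (k, (v, i + 1)) }        // number rows within each key, from 1
}

temp4.collect().foreach(println)
// ((1,2),((1,4,7),1))
// ((1,2),((1,2,3),2))
// ((1,2),((2,2,3),3))
// ((3,4),((5,5,5),1))
// ((3,4),((5,5,9),2))
// ((3,4),((7,5,5),3))
// (the order of the key groups in the collected output may vary)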

Hope that helps!

Good luck.

answered by Guillaume