I need to generate a full list of row_numbers for a data table with many columns.
In SQL, this would look like this:
select key_value, col1, col2, col3,
       row_number() over (partition by key_value order by col1, col2 desc, col3)
from temp;
Now, let's say in Spark I have an RDD of the form (K, V), where V=(col1, col2, col3), so my entries are like
(key1, (1,2,3)) (key1, (1,4,7)) (key1, (2,2,3)) (key2, (5,5,5)) (key2, (5,5,9)) (key2, (7,5,5)) etc.
I want to order these using commands like sortBy(), sortWith(), sortByKey(), zipWithIndex, etc. and have a new RDD with the correct row_number
(key1, (1,2,3), 2) (key1, (1,4,7), 1) (key1, (2,2,3), 3) (key2, (5,5,5), 1) (key2, (5,5,9), 2) (key2, (7,5,5), 3) etc.
(I don't care about the parentheses, so the form can also be (K, (col1,col2,col3,rownum)) instead)
How do I do this?
Here's my first attempt:
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))
val temp1 = sc.parallelize(sample_data)
temp1.collect().foreach(println)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)

temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)
// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)
// note that this isn't ordering with a partition on key value K!

val temp2 = temp1.???
Also note that the sortBy function cannot be applied directly to an RDD; one must run collect() first, and then the output isn't an RDD either, but an array:
temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)
// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
Here's a little more progress, but still not partitioned:
val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))
temp2.collect().foreach(println)
// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)
row_number() is a window function in Spark SQL that assigns a sequential integer, starting at 1, to each row within its window partition. It is used together with Window.partitionBy(), which splits the data into window partitions, and an orderBy() clause, which sorts the rows within each partition; the first row in each partition gets 1, and the number increases for each subsequent row.
By contrast, rank() returns the rank of each row within its window partition and leaves gaps when there are ties: ROW_NUMBER numbers all rows sequentially (for example 1, 2, 3, 4, 5), while RANK gives tied rows the same value (for example 1, 2, 2, 4, 5). ROW_NUMBER is a temporary value calculated when the query is run; to persist numbers in a table, see the IDENTITY property and SEQUENCE.
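To make the difference concrete, here is a minimal Scala sketch of row_number() versus rank() over the same window; the data, column names, and the SparkSession named spark are hypothetical and only for illustration:

// Illustrative sketch only (Scala, Spark 2.x API): row_number() vs rank() on a partition with a tie.
import spark.implicits._                        // assumes a SparkSession named `spark`
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, rank}

val ties = Seq(("a", 1), ("a", 2), ("a", 2), ("a", 4)).toDF("k", "col1")
val w = Window.partitionBy("k").orderBy("col1")

ties.withColumn("row_num", row_number().over(w))   // 1, 2, 3, 4 (no gaps)
    .withColumn("rnk", rank().over(w))             // 1, 2, 2, 4 (ties share a value, then a gap)
    .show()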
The row_number() over (partition by ... order by ...) functionality was added in Spark 1.4. This answer uses PySpark/DataFrames.
Create a test DataFrame:
from pyspark.sql import Row, functions as F

testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)), Row(k="key1", v=(1,4,7)), Row(k="key1", v=(2,2,3)),
     Row(k="key2", v=(5,5,5)), Row(k="key2", v=(5,5,9)), Row(k="key2", v=(7,5,5)))
).toDF()
Add the partitioned row number:
from pyspark.sql.window import Window

(testDF
 .select("k", "v",
         F.rowNumber()           # renamed to F.row_number() in Spark 1.6+
          .over(Window
                .partitionBy("k")
                .orderBy("k")    # ordering by "k" within a partition on "k" leaves row order arbitrary;
                                 # order by the value columns to match the question's ORDER BY
               )
          .alias("rowNum")
        )
 .show()
)

+----+-------+------+
|   k|      v|rowNum|
+----+-------+------+
|key1|[1,2,3]|     1|
|key1|[1,4,7]|     2|
|key1|[2,2,3]|     3|
|key2|[5,5,5]|     1|
|key2|[5,5,9]|     2|
|key2|[7,5,5]|     3|
+----+-------+------+
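Since the question uses Scala, a rough Scala equivalent of the same window approach might look like the sketch below (untested; it uses the Spark 1.6+ name row_number instead of rowNumber, assumes a SparkSession named spark, and orders by the value columns as in the question's SQL):

// Sketch only: the same window approach in Scala, with the question's sample values as flat columns.
import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, col}

val testDF = Seq(("key1",1,2,3), ("key1",1,4,7), ("key1",2,2,3),
                 ("key2",5,5,5), ("key2",5,5,9), ("key2",7,5,5))
  .toDF("k", "col1", "col2", "col3")

val w = Window.partitionBy("k").orderBy(col("col1"), col("col2").desc, col("col3"))

testDF.withColumn("rowNum", row_number().over(w)).show()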
This is an interesting problem you're bringing up. I will answer it in Python, but I'm sure you will be able to translate it seamlessly to Scala.
Here is how I would tackle it:
1- Simplify your data:
temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))
temp2 is now a "real" key-value pair RDD. It looks like this:
[((3, 4), (5, 5, 5)),
 ((3, 4), (5, 5, 9)),
 ((3, 4), (7, 5, 5)),
 ((1, 2), (1, 2, 3)),
 ((1, 2), (1, 4, 7)),
 ((1, 2), (2, 2, 3))]
2- Then, use the group-by function to reproduce the effect of the PARTITION BY:
temp3 = temp2.groupByKey()
temp3 is now an RDD with 2 rows:
[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>), ((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]
3- Now, you need to apply a ranking function to each value of the RDD. In Python, I would use the built-in sorted function (enumerate will create your row_number column):
temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10)
Note that to implement your particular order, you would need to feed sorted the right "key" argument (in Python, I would just create a lambda function like this one):
lambda tuple : (tuple[0],-tuple[1],tuple[2])
In the end (without the key argument), it looks like this:
[((1, 2), ((1, 2, 3), 0)),
 ((1, 2), ((1, 4, 7), 1)),
 ((1, 2), ((2, 2, 3), 2)),
 ((3, 4), ((5, 5, 5), 0)),
 ((3, 4), ((5, 5, 9), 1)),
 ((3, 4), ((7, 5, 5), 2))]
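For completeness, a rough Scala/RDD translation of the same idea (a sketch, untested) could keep the result as an RDD instead of calling take(10), start the numbering at 1 like SQL's row_number(), and plug in the question's ordering (col1 ascending, col2 descending, col3 ascending):

// Sketch: groupByKey to mimic PARTITION BY, then sort and index each group.
// Starts from the question's temp1: RDD[((Int, Int), Int, Int, Int)].
val temp2 = temp1.map { case (k, c1, c2, c3) => (k, (c1, c2, c3)) }

val withRowNum = temp2
  .groupByKey()                                          // one group per key, like PARTITION BY
  .flatMap { case (k, vals) =>
    vals.toSeq
        .sortBy { case (c1, c2, c3) => (c1, -c2, c3) }   // ORDER BY col1, col2 DESC, col3
        .zipWithIndex
        .map { case (v, i) => (k, v, i + 1) }            // i + 1 so numbering starts at 1
  }

withRowNum.collect().foreach(println)
// e.g. ((1,2),(1,4,7),1), ((1,2),(1,2,3),2), ((1,2),(2,2,3),3), ...

One caveat with this approach: groupByKey materialises all values for a key in memory on one executor, which is fine for small groups but can become a problem for very large partitions.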
Hope that helps!
Good luck.