Does data shuffle occur when use Hive window function on data that already on the same node?
Specifically in the following example, before use window function data are already repartitioned by 'City' with Spark repartition() function, which should ensure all data of city 'A' co-localized on the same node (assuming data for a city can fit in to one node).
df = sqlContext.createDataFrame(
[('A', '1', 2009, "data1"),
('A', '1', 2015, "data2"),
('A', '22', 2015, "data3"),
('A', '22', 2016, "data4"),
('BB', '333', 2014, "data5"),
('BB', '333', 2012, "data6"),
('BB', '333', 2016, "data7")
],
("City", "Person","year", "data"))
df = df.repartition(2, 'City')
df.show()
# +----+------+----+-----+
# |City|Person|year| data|
# +----+------+----+-----+
# | BB| 333|2012|data6|
# | BB| 333|2014|data5|
# | BB| 333|2016|data7|
# | A| 22|2016|data4|
# | A| 22|2015|data3|
# | A| 1|2009|data1|
# | A| 1|2015|data2|
# +----+------+----+-----+
Then I have to do a window function partition by 'Person', which is not the partition key in Spark repartition() as follows.
df.registerTempTable('example')
sqlStr = """\
select *,
row_number() over (partition by Person order by year desc) ranking
from example
"""
sqlContext.sql(sqlStr).show(100)
# +----+------+----+-----+-------+
# |City|Person|year| data|ranking|
# +----+------+----+-----+-------+
# | BB| 333|2016|data7| 1|
# | BB| 333|2014|data5| 2|
# | BB| 333|2012|data6| 3|
# | A| 1|2015|data2| 1|
# | A| 1|2009|data1| 2|
# | A| 22|2016|data4| 1|
# | A| 22|2015|data3| 2|
# +----+------+----+-----+-------+
Here are my questions:
Is there any relation or difference between Spark "repartition" and Hive "partition by"? Under the hood, are they translated to the same thing on Spark?
I want to check whether my following understanding is correct. Even all data already on the same node, if I call Spark df.repartition('A_key_different_from_current_partidion_key'), data will be shuffled to many nodes, instead of stay together on the same node.
BTW, I would also curious whether it is simple to implement the example Hive query with Spark window function.
Both partition by
clause in window functions and repartition
are executed the same TungstenExchange
mechanism. You see this when you analyze execution plan:
sqlContext.sql(sqlStr).explain()
## == Physical Plan ==
## Window [City#0,Person#1,year#2L,data#3], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRowNumber() windowspecdefinition(Person#1,year#2L DESC,ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS ranking#5], [Person#1], [year#2L DESC]
## +- Sort [Person#1 ASC,year#2L DESC], false, 0
## +- TungstenExchange hashpartitioning(Person#1,200), None <- PARTITION BY
## +- Project [City#0,Person#1,year#2L,data#3]
## +- TungstenExchange hashpartitioning(City#0,2), None <- REPARTITION
## +- ConvertToUnsafe
## +- Scan ExistingRDD[City#0,Person#1,year#2L,data#3]
Regarding the second question you assumption is correct. Even if data is already located on a single node, Spark has no a priori knowledge about data distribution and will shuffle data once again.
Finally, depending on a point of view, your query is already a Spark query, or it is impossible to execute this using plain Spark.
it is a Spark query because DSL counterpart will use exactly the same mechanisms
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
w = Window.partitionBy("person").orderBy(col("year").desc())
df.withColumn("ranking", row_number().over(w))
it is impossible to execute this using plain Spark, because as of Spark 1.6, there is no native implementation of window functions. It changed in Spark 2.0.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With