Data shuffle for Hive and Spark window function

Does a data shuffle occur when using a Hive window function on data that is already on the same node?

Specifically, in the following example, before the window function is applied the data has already been repartitioned by 'City' with Spark's repartition() function, which should ensure that all data for city 'A' is co-located on the same node (assuming the data for a city fits on one node).

df = sqlContext.createDataFrame(
    [('A', '1', 2009, "data1"),
     ('A', '1', 2015, "data2"),
     ('A', '22', 2015, "data3"),
     ('A', '22', 2016, "data4"),
     ('BB', '333', 2014, "data5"), 
     ('BB', '333', 2012, "data6"), 
     ('BB', '333', 2016, "data7")
    ],
    ("City", "Person","year", "data"))
df = df.repartition(2, 'City')
df.show()
# +----+------+----+-----+
# |City|Person|year| data|
# +----+------+----+-----+
# |  BB|   333|2012|data6|
# |  BB|   333|2014|data5|
# |  BB|   333|2016|data7|
# |   A|    22|2016|data4|
# |   A|    22|2015|data3|
# |   A|     1|2009|data1|
# |   A|     1|2015|data2|
# +----+------+----+-----+

Then I apply a window function partitioned by 'Person', which is not the key used in the Spark repartition() call, as follows.

df.registerTempTable('example')
sqlStr = """\
    select *,
        row_number() over (partition by Person order by year desc) ranking
    from example
"""
sqlContext.sql(sqlStr).show(100)

# +----+------+----+-----+-------+
# |City|Person|year| data|ranking|
# +----+------+----+-----+-------+
# |  BB|   333|2016|data7|      1|
# |  BB|   333|2014|data5|      2|
# |  BB|   333|2012|data6|      3|
# |   A|     1|2015|data2|      1|
# |   A|     1|2009|data1|      2|
# |   A|    22|2016|data4|      1|
# |   A|    22|2015|data3|      2|
# +----+------+----+-----+-------+

Here are my questions:

  1. Is there any relation or difference between Spark's "repartition" and Hive's "partition by"? Under the hood, are they translated to the same thing in Spark?

  2. I want to check whether my understanding is correct: even if all data is already on the same node, if I call Spark's df.repartition('A_key_different_from_current_partition_key'), the data will be shuffled across many nodes instead of staying together on the same node.

By the way, I am also curious whether the example Hive query is simple to implement with Spark's window function API.
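For reference, the semantics of the row_number() window function in the query can be sketched in plain Python. This is only an illustration of what the query computes, not of how Spark executes it:

```python
from itertools import groupby

rows = [
    ("A", "1", 2009, "data1"), ("A", "1", 2015, "data2"),
    ("A", "22", 2015, "data3"), ("A", "22", 2016, "data4"),
    ("BB", "333", 2014, "data5"), ("BB", "333", 2012, "data6"),
    ("BB", "333", 2016, "data7"),
]

def row_number_by_person(rows):
    """Emulate row_number() over (partition by Person order by year desc)."""
    ranked = []
    # Group rows by Person (groupby needs its input sorted by the same key),
    # then number the rows within each group by year, descending.
    for _, group in groupby(sorted(rows, key=lambda r: r[1]), key=lambda r: r[1]):
        for i, row in enumerate(sorted(group, key=lambda r: -r[2]), start=1):
            ranked.append(row + (i,))
    return ranked

for r in row_number_by_person(rows):
    print(r)
```

Every row stays in the output; the ranking restarts at 1 inside each Person group, which matches the table shown below.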

asked Apr 19 '16 by Bin

1 Answer

Both the partition by clause in window functions and repartition are executed by the same TungstenExchange mechanism. You can see this when you analyze the execution plan:

sqlContext.sql(sqlStr).explain()

## == Physical Plan ==
## Window [City#0,Person#1,year#2L,data#3], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRowNumber() windowspecdefinition(Person#1,year#2L DESC,ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS ranking#5], [Person#1], [year#2L DESC]
## +- Sort [Person#1 ASC,year#2L DESC], false, 0
##    +- TungstenExchange hashpartitioning(Person#1,200), None <- PARTITION BY
##       +- Project [City#0,Person#1,year#2L,data#3]
##          +- TungstenExchange hashpartitioning(City#0,2), None <- REPARTITION
##             +- ConvertToUnsafe
##                +- Scan ExistingRDD[City#0,Person#1,year#2L,data#3]

Regarding the second question, your assumption is correct. Even if the data is already located on a single node, Spark has no a priori knowledge of the data distribution and will shuffle the data once again.
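As a rough illustration of why: a hash partitioner assigns each row a partition purely as a function of the key value and the partition count, so a layout computed from City says nothing about the layout required for Person. The sketch below stands in for Spark's partitioner with CRC32 (Spark actually uses Murmur3; CRC32 is assumed here only to keep the example deterministic and dependency-free):

```python
from zlib import crc32

rows = [("A", "1"), ("A", "22"), ("BB", "333")]  # distinct (City, Person) pairs

def partition_of(key, num_partitions):
    # Stand-in for Spark's hash partitioner. Spark uses Murmur3 on the row
    # bytes, not CRC32; CRC32 is used here only because it is deterministic.
    return crc32(str(key).encode()) % num_partitions

# Layout after df.repartition(2, 'City'): decided by City alone.
city_layout = {(c, p): partition_of(c, 2) for c, p in rows}

# Layout required by "partition by Person" (200 = default shuffle partitions):
# decided by Person alone, so Spark cannot reuse the existing layout and
# inserts a second exchange.
person_layout = {(c, p): partition_of(p, 200) for c, p in rows}

for key in rows:
    print(key, "City partition:", city_layout[key],
          "-> Person partition:", person_layout[key])
```

Rows sharing a City land in the same partition under the first layout, but the second layout is computed from a different key entirely, which is why the plan above contains two separate exchanges.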

Finally, depending on your point of view, your query either already is a Spark query, or it is impossible to execute with plain Spark:

  • it is a Spark query, because the DSL counterpart will use exactly the same mechanisms:

    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, row_number
    
    w = Window.partitionBy("Person").orderBy(col("year").desc())
    df.withColumn("ranking", row_number().over(w))
    
  • it is impossible to execute with plain Spark, because as of Spark 1.6 there is no native implementation of window functions (note the HiveWindowFunction in the plan above). This changed in Spark 2.0.

answered Sep 21 '22 by zero323