What is the most efficient way to do a sorted reduce in PySpark?

I am analyzing on-time performance records of US domestic flights from 2015. I need to group the flights by tail number and store a date-sorted list of each tail number's flights in a database, to be retrieved by my application. I am not sure which of the two options below is the better way to achieve this.

# Load the parquet file
on_time_dataframe = sqlContext.read.parquet('../data/on_time_performance.parquet')

# Filter down to the fields we need to identify and link to a flight
flights = on_time_dataframe.rdd.map(lambda x: 
  (x.Carrier, x.FlightDate, x.FlightNum, x.Origin, x.Dest, x.TailNum)
  )

I can achieve this in a reduce sort...

# Group flights by tail number, sorted by date, then flight number, then
# origin/dest
flights_per_airplane = flights\
  .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
  .reduceByKey(lambda a, b: sorted(a + b, key=lambda x: (x[1],x[2],x[3],x[4])))

Or I can achieve it in a subsequent map job...

# Do the same in a subsequent map step. Is this more efficient, or does
# PySpark know how to optimize the above?
flights_per_airplane = flights\
  .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
  .reduceByKey(lambda a, b: a + b)\
  .map(lambda kv: (kv[0], sorted(kv[1], key=lambda x: (x[1], x[2], x[3], x[4]))))

Doing the sort inside the reduce seems really inefficient, but in practice both versions are very slow. sorted() looks like the recommended way to do this in the PySpark docs, so I'm wondering whether PySpark makes this efficient internally. Which option is the most efficient, or the better choice for some other reason?

My code is also in a gist here: https://gist.github.com/rjurney/af27f70c76dc6c6ae05c465271331ade

If you're curious about the data, it is from the Bureau of Transportation Statistics, here: http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time

asked Apr 02 '16 by rjurney

1 Answer

Unfortunately both approaches are wrong before you even start sorting, and there is no efficient and simple way of doing this in Spark. Still, the first one is significantly worse than the other.

Why are both wrong? Because each one is just another groupByKey, which is simply an expensive operation. There are some things you can try to improve the situation (in particular avoiding the pointless map-side reduction), but at the end of the day you have to pay the price of a full shuffle, and if you don't see any failures it is probably not worth the fuss.
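
To make that concrete, here is a rough sketch (my illustration, not code from the question) of what the list-building reduceByKey boils down to when spelled out with combineByKey, the primitive that both reduceByKey and groupByKey are built on:

# Sketch: the list-concatenating reduceByKey is groupByKey in disguise.
# Every single value still has to cross the shuffle inside a growing list.
grouped = (flights
    .keyBy(lambda x: x[5])            # key each record by TailNum
    .combineByKey(
        lambda v: [v],                # createCombiner: start a list
        lambda acc, v: acc + [v],     # mergeValue: append within a partition
        lambda a, b: a + b))          # mergeCombiners: concatenate across partitions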

Still, the second approach is much better algorithmically*. If you want to keep the sorted structure all the way through, as in the first attempt, you should use dedicated tools (aggregateByKey with bisect.insort would be a good choice), but there is really nothing to gain here.
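
For completeness, a minimal sketch of that aggregateByKey + bisect.insort idea (untested, and assuming the flights RDD from the question; bisect.insort uses the elements' natural ordering, so each record is decorated with its sort key first):

import bisect

def seq_op(acc, record):
    # Insert one record into the partition-local list, keeping it sorted
    # by (FlightDate, FlightNum, Origin, Dest), i.e. fields 1-4.
    bisect.insort(acc, (record[1:5], record))
    return acc

def comb_op(a, b):
    # Merge two sorted lists coming from different partitions.
    for item in b:
        bisect.insort(a, item)
    return a

flights_per_airplane = (flights
    .keyBy(lambda x: x[5])                    # key by TailNum
    .aggregateByKey([], seq_op, comb_op)      # build sorted lists per key
    .mapValues(lambda pairs: [r for _, r in pairs]))  # strip the sort keys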

If the grouped output is a hard requirement, the best thing you can do is keyBy, groupByKey and sort the values. It won't improve performance over the second solution, but it arguably improves readability:

(flights
    .keyBy(lambda x: x[5])          # key by TailNum
    .groupByKey()                   # one full shuffle; values become an iterable
    .mapValues(lambda vs: sorted(vs, key=lambda x: x[1:5])))  # sort each group once
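
For example, binding the result to the question's flights_per_airplane name, you could sanity-check one airplane's ordering like this (a hypothetical usage sketch):

# Peek at one (TailNum, sorted flights) pair; dates should be non-decreasing.
tail_num, legs = flights_per_airplane.first()
for carrier, flight_date, flight_num, origin, dest, _ in legs[:5]:
    print(tail_num, flight_date, carrier, flight_num, origin, dest)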

* Even if you assume the best-case scenario for Timsort, the first approach performs N merges that are O(N) each, i.e. O(N²) overall, while the second one is O(N log N) even in the worst case.

answered by zero323