I am analyzing on-time performance records of US domestic flights from 2015. I need to group the flights by tail number and store a date-sorted list of the flights for each tail number in a database, to be retrieved by my application. I am not sure which of the two options below is the better way to achieve this.
# Load the parquet file
on_time_dataframe = sqlContext.read.parquet('../data/on_time_performance.parquet')
# Filter down to the fields we need to identify and link to a flight
flights = on_time_dataframe.rdd.map(lambda x:
    (x.Carrier, x.FlightDate, x.FlightNum, x.Origin, x.Dest, x.TailNum)
)
I can achieve this in a reduce sort...
# Group flights by tail number, sorted by date, then flight number, then origin/dest
flights_per_airplane = flights\
    .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
    .reduceByKey(lambda a, b: sorted(a + b, key=lambda x: (x[1], x[2], x[3], x[4])))
Or I can achieve it in a subsequent map job...
# Do the same in a subsequent map step. Is this more efficient, or does PySpark
# know how to optimize the above?
flights_per_airplane = flights\
    .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
    .reduceByKey(lambda a, b: a + b)\
    .map(lambda kv: (kv[0], sorted(kv[1], key=lambda x: (x[1], x[2], x[3], x[4]))))
Doing the sort inside the reduce seems really inefficient, but in fact both versions are very slow. sorted() looks like the recommended way to do this in the PySpark docs, so I'm wondering whether PySpark optimizes this internally. Which option is more efficient, or the better choice for some other reason?
My code is also in a gist here: https://gist.github.com/rjurney/af27f70c76dc6c6ae05c465271331ade
If you're curious about the data, it is from the Bureau of Transportation Statistics, here: http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time
Some background first. PySpark's reduceByKey() transformation merges the values for each key using an associative and commutative reduce function. It operates on a pair RDD (key/value pairs) and is a wide transformation: values are partially combined on the map side, then the partial results are shuffled across partitions by key.
On the DataFrame side, sort() and orderBy() are aliases: both sort by one or more columns, ascending by default, and both perform a global sort that requires a shuffle. If per-partition order is enough and no global ordering guarantee is needed, sortWithinPartitions() sorts each partition individually and avoids that shuffle.
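As a minimal, hedged illustration of reduceByKey on a pair RDD (assuming an active SparkContext bound to the name sc, as in a standard PySpark shell; the tail numbers are made up):
# Each value is combined per partition first, then the partial results are
# shuffled by key and merged, yielding one (key, merged_value) pair per key.
pairs = sc.parallelize([("N123AA", 1), ("N456BB", 1), ("N123AA", 1)])
counts = pairs.reduceByKey(lambda a, b: a + b)
counts.collect()  # e.g. [('N456BB', 1), ('N123AA', 2)]; output order is not guaranteed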
Unfortunately both approaches are wrong before you even start sorting, and there is no effective and simple way of doing this in Spark. Still, the first one is significantly worse than the second.
Why are both approaches wrong? Because each of them is just another groupByKey, and that is simply an expensive operation. There are some things you can try to improve matters (in particular, avoiding the map-side reduction), but at the end of the day you have to pay the price of a full shuffle, and if you don't see any failures it is probably not worth the fuss.
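To make that concrete, here is a hedged sketch that reuses the question's flights RDD: the list-building reduceByKey produces the same (tail number, list of flights) grouping as a plain groupByKey, so both pay for shuffling every record.
# Both pipelines group every flight under its tail number; the list-concatenation
# reduce is a groupByKey in disguise, so the shuffle cost is the same.
grouped_via_reduce = flights.map(lambda t: (t[5], [t])).reduceByKey(lambda a, b: a + b)
grouped_via_group = flights.keyBy(lambda t: t[5]).groupByKey().mapValues(list)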
Still, the second approach is much better algorithmically*. If you want to keep a sorted structure all the way through, as in the first attempt, you should use dedicated tools (aggregateByKey with bisect.insort would be a good choice), but there is really nothing to gain here.
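For completeness, here is a hedged sketch of that aggregateByKey / bisect.insort idea, again reusing the question's flights RDD. The decorated (sort_key, flight) tuples and the helper names insert_flight and merge_sorted are my own additions, chosen so that the natural tuple ordering matches the desired (FlightDate, FlightNum, Origin, Dest) order:
import bisect
import heapq

def insert_flight(acc, flight):
    # Keep each per-plane list sorted as records arrive; the flight[1:5] slice
    # (FlightDate, FlightNum, Origin, Dest) drives the ordering.
    bisect.insort(acc, (flight[1:5], flight))
    return acc

def merge_sorted(acc1, acc2):
    # Merge two already-sorted partial lists coming from different partitions.
    return list(heapq.merge(acc1, acc2))

flights_per_airplane = (flights
    .keyBy(lambda x: x[5])
    .aggregateByKey([], insert_flight, merge_sorted)
    .mapValues(lambda decorated: [flight for _, flight in decorated]))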
If the grouped output is a hard requirement, the best thing you can do is keyBy, groupByKey and sort. It won't improve performance over the second solution, but it will arguably improve readability:
(flights
    .keyBy(lambda x: x[5])
    .groupByKey()
    .mapValues(lambda vs: sorted(vs, key=lambda x: x[1:5])))
* Even if you assume the best-case scenario for Timsort, the first approach is N times O(N), i.e. O(N²) overall, while the second one is O(N log N) in the worst case.
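If it helps, here is a small hedged usage sketch of that grouped pipeline, just assigning it and pulling one record to check the output shape (the variable names are mine):
# Materialize one (TailNum, sorted list of flight tuples) pair to sanity-check
# the shape of the grouped output before writing it to the database.
flights_per_airplane = (flights
    .keyBy(lambda x: x[5])
    .groupByKey()
    .mapValues(lambda vs: sorted(vs, key=lambda x: x[1:5])))
tail_number, plane_flights = flights_per_airplane.first()
print(tail_number, plane_flights[:3])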