Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

PySpark timeout trying to repartition/write to parquet (Futures timed out after [300 seconds])?

I am running PySpark (on AWS Glue, if that matters). I am getting timeout errors: (it appears it fail to write to parquet)

The full logs at https://pastebin.com/TmuAcFx7

File "script_2019-02-06-02-32-43.py", line 197, in <module>
.parquet("s3://xxx-glue/cleanedFlights")
File "/mnt/yarn/usercache/root/appcache/application_1549418793443_0001/container_1549418793443_0001_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 691, in parquet
File "/mnt/yarn/usercache/root/appcache/application_1549418793443_0001/container_1549418793443_0001_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/mnt/yarn/usercache/root/appcache/application_1549418793443_0001/container_1549418793443_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/mnt/yarn/usercache/root/appcache/application_1549418793443_0001/container_1549418793443_0001_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o246.parquet.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(countryName#24, querydatetime#213, 200)
+- *Project [master_key#588, master_querydatetime#589, Id#180, QueryTaskId#181, QueryOriginPlace#182, queryoutbounddate#334, queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, outdeparture#416, outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, ... 33 more fields]
+- BatchEvalPython [getInterval(cast(date_format(outdeparture#416, H, Some(Zulu)) as int), 0, 24, 4), getInterval(cast(date_format(indeparture#498, H, Some(Zulu)) as int), 0, 24, 4)], [master_key#588, master_querydatetime#589, Id#180, QueryTaskId#181, QueryOriginPlace#182, queryoutbounddate#334, queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, outdeparture#416, outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, ... 21 more fields]
+- *Sort [key#250 ASC NULLS FIRST, querydatetime#101 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(key#250 ASC NULLS FIRST, querydatetime#101 ASC NULLS FIRST, 200)
+- *Project [key#250 AS master_key#588, querydatetime#101 AS master_querydatetime#589, Id#180, QueryTaskId#181, QueryOriginPlace#182, queryoutbounddate#334, queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, outdeparture#416, outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, ... 19 more fields]
+- *BroadcastHashJoin [key#250, querydatetime#101], [key#590, querydatetime#213], LeftOuter, BuildRight
:- *Project [key#250, querydatetime#101]
: +- BroadcastNestedLoopJoin BuildRight, Cross
: :- Generate explode(pythonUDF0#1633), false, false, [querydatetime#101]
: : +- BatchEvalPython [generate_date_series(start#94, stop#95)], [start#94, stop#95, pythonUDF0#1633]
: : +- Scan ExistingRDD[start#94,stop#95]
: +- BroadcastExchange IdentityBroadcastMode
: +- *HashAggregate(keys=[key#250], functions=[], output=[key#250])
: +- *HashAggregate(keys=[key#250], functions=[], output=[key#250])
: +- *Sample 0.0, 0.001, false, 7736333241016522154
: +- *GlobalLimit 5000000
: +- Exchange SinglePartition
: +- *LocalLimit 5000000
: +- *Project [concat(outboundlegid#190, -, inboundlegid#191, -, agent#187) AS key#250]
: +- *SortMergeJoin [querydestinationplace#212], [cast(airportId#38 as int)], Inner
: :- *Sort [querydestinationplace#212 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(querydestinationplace#212, 200)
: : +- *Project [Agent#187, OutboundLegId#190, InboundLegId#191, querydestinationplace#212]
: : +- *SortMergeJoin [agent#187], [id#89], Inner
: : :- *Sort [agent#187 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(agent#187, 200)
: : : +- *Project [Agent#187, OutboundLegId#190, InboundLegId#191, querydestinationplace#212]
: : : +- *Filter (isnotnull(agent#187) && isnotnull(querydestinationplace#212))
: : : +- Scan ExistingRDD[Id#180,QueryTaskId#181,QueryOriginPlace#182,QueryOutboundDate#183,QueryInboundDate#184,QueryCabinClass#185,QueryCurrency#186,Agent#187,QuoteAgeInMinutes#188,Price#189,OutboundLegId#190,InboundLegId#191,OutDeparture#192,OutArrival#193,OutDuration#194,OutJourneyMode#195,OutStops#196,OutCarriers#197,OutOperatingCarriers#198,NumberOutStops#199,NumberOutCarriers#200,NumberOutOperatingCarriers#201,InDeparture#202,InArrival#203,... 10 more fields]
: : +- *Sort [id#89 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(id#89, 200)
: : +- *Project [cast(id#67L as string) AS id#89]
: : +- *Filter (isnotnull(id#67L) && isnotnull(cast(id#67L as string)))
: : +- Scan ExistingRDD[id#67L,name#68,imageurl#69,status#70,optimisedformobile#71,type#72,bookingnumber#73,createdat#74,updatedat#75]
: +- *Sort [cast(airportId#38 as int) ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(cast(airportId#38 as int), 200)
: +- *Project [cast(airportId#18L as string) AS airportId#38]
: +- *Filter (isnotnull(airportId#18L) && isnotnull(cast(airportId#18L as string)))
: +- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[34, string, true], input[33, date, true]))
+- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, cast(QueryOutboundDate#183 as date) AS queryoutbounddate#334, cast(QueryInboundDate#184 as date) AS queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, cast(unix_timestamp(OutDeparture#192, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS outdeparture#416, cast(unix_timestamp(OutArrival#193, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, cast(unix_timestamp(InDeparture#202, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS indeparture#498, cast(unix_timestamp(InArrival#203, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS inarrival#539, ... 15 more fields]
+- *Sample 0.0, 0.001, false, 7736333241016522154
+- *GlobalLimit 5000000
+- Exchange SinglePartition
+- *LocalLimit 5000000
+- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, QueryOutboundDate#183, QueryInboundDate#184, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, OutDeparture#192, OutArrival#193, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, InDeparture#202, InArrival#203, ... 15 more fields]
+- *SortMergeJoin [querydestinationplace#212], [cast(airportId#38 as int)], Inner
:- *Sort [querydestinationplace#212 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(querydestinationplace#212, 200)
: +- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, QueryOutboundDate#183, QueryInboundDate#184, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, OutDeparture#192, OutArrival#193, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, InDeparture#202, InArrival#203, ... 11 more fields]
: +- *SortMergeJoin [Agent#187], [id#89], Inner
: :- *Sort [Agent#187 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(Agent#187, 200)
: : +- *Filter (isnotnull(Agent#187) && isnotnull(querydestinationplace#212))
: : +- Scan ExistingRDD[Id#180,QueryTaskId#181,QueryOriginPlace#182,QueryOutboundDate#183,QueryInboundDate#184,QueryCabinClass#185,QueryCurrency#186,Agent#187,QuoteAgeInMinutes#188,Price#189,OutboundLegId#190,InboundLegId#191,OutDeparture#192,OutArrival#193,OutDuration#194,OutJourneyMode#195,OutStops#196,OutCarriers#197,OutOperatingCarriers#198,NumberOutStops#199,NumberOutCarriers#200,NumberOutOperatingCarriers#201,InDeparture#202,InArrival#203,... 10 more fields]
: +- *Sort [id#89 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#89, 200)
: +- *Project [cast(id#67L as string) AS id#89, name#68]
: +- *Filter (isnotnull(id#67L) && isnotnull(cast(id#67L as string)))
: +- Scan ExistingRDD[id#67L,name#68,imageurl#69,status#70,optimisedformobile#71,type#72,bookingnumber#73,createdat#74,updatedat#75]
+- *Sort [cast(airportId#38 as int) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cast(airportId#38 as int), 200)
+- *Project [cast(airportId#18L as string) AS airportId#38, countryName#24, cityName#23, airportName#22]
+- *Filter (isnotnull(airportId#18L) && isnotnull(cast(airportId#18L as string)))
+- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24]

at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
... 45 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning(key#250 ASC NULLS FIRST, querydatetime#101 ASC NULLS FIRST, 200)
+- *Project [key#250 AS master_key#588, querydatetime#101 AS master_querydatetime#589, Id#180, QueryTaskId#181, QueryOriginPlace#182, queryoutbounddate#334, queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, outdeparture#416, outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, ... 19 more fields]
+- *BroadcastHashJoin [key#250, querydatetime#101], [key#590, querydatetime#213], LeftOuter, BuildRight
:- *Project [key#250, querydatetime#101]
: +- BroadcastNestedLoopJoin BuildRight, Cross
: :- Generate explode(pythonUDF0#1633), false, false, [querydatetime#101]
: : +- BatchEvalPython [generate_date_series(start#94, stop#95)], [start#94, stop#95, pythonUDF0#1633]
: : +- Scan ExistingRDD[start#94,stop#95]
: +- BroadcastExchange IdentityBroadcastMode
: +- *HashAggregate(keys=[key#250], functions=[], output=[key#250])
: +- *HashAggregate(keys=[key#250], functions=[], output=[key#250])
: +- *Sample 0.0, 0.001, false, 7736333241016522154
: +- *GlobalLimit 5000000
: +- Exchange SinglePartition
: +- *LocalLimit 5000000
: +- *Project [concat(outboundlegid#190, -, inboundlegid#191, -, agent#187) AS key#250]
: +- *SortMergeJoin [querydestinationplace#212], [cast(airportId#38 as int)], Inner
: :- *Sort [querydestinationplace#212 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(querydestinationplace#212, 200)
: : +- *Project [Agent#187, OutboundLegId#190, InboundLegId#191, querydestinationplace#212]
: : +- *SortMergeJoin [agent#187], [id#89], Inner
: : :- *Sort [agent#187 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(agent#187, 200)
: : : +- *Project [Agent#187, OutboundLegId#190, InboundLegId#191, querydestinationplace#212]
: : : +- *Filter (isnotnull(agent#187) && isnotnull(querydestinationplace#212))
: : : +- Scan ExistingRDD[Id#180,QueryTaskId#181,QueryOriginPlace#182,QueryOutboundDate#183,QueryInboundDate#184,QueryCabinClass#185,QueryCurrency#186,Agent#187,QuoteAgeInMinutes#188,Price#189,OutboundLegId#190,InboundLegId#191,OutDeparture#192,OutArrival#193,OutDuration#194,OutJourneyMode#195,OutStops#196,OutCarriers#197,OutOperatingCarriers#198,NumberOutStops#199,NumberOutCarriers#200,NumberOutOperatingCarriers#201,InDeparture#202,InArrival#203,... 10 more fields]
: : +- *Sort [id#89 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(id#89, 200)
: : +- *Project [cast(id#67L as string) AS id#89]
: : +- *Filter (isnotnull(id#67L) && isnotnull(cast(id#67L as string)))
: : +- Scan ExistingRDD[id#67L,name#68,imageurl#69,status#70,optimisedformobile#71,type#72,bookingnumber#73,createdat#74,updatedat#75]
: +- *Sort [cast(airportId#38 as int) ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(cast(airportId#38 as int), 200)
: +- *Project [cast(airportId#18L as string) AS airportId#38]
: +- *Filter (isnotnull(airportId#18L) && isnotnull(cast(airportId#18L as string)))
: +- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[34, string, true], input[33, date, true]))
+- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, cast(QueryOutboundDate#183 as date) AS queryoutbounddate#334, cast(QueryInboundDate#184 as date) AS queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, cast(unix_timestamp(OutDeparture#192, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS outdeparture#416, cast(unix_timestamp(OutArrival#193, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, cast(unix_timestamp(InDeparture#202, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS indeparture#498, cast(unix_timestamp(InArrival#203, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS inarrival#539, ... 15 more fields]
+- *Sample 0.0, 0.001, false, 7736333241016522154
+- *GlobalLimit 5000000
+- Exchange SinglePartition
+- *LocalLimit 5000000
+- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, QueryOutboundDate#183, QueryInboundDate#184, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, OutDeparture#192, OutArrival#193, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, InDeparture#202, InArrival#203, ... 15 more fields]
+- *SortMergeJoin [querydestinationplace#212], [cast(airportId#38 as int)], Inner
:- *Sort [querydestinationplace#212 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(querydestinationplace#212, 200)
: +- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, QueryOutboundDate#183, QueryInboundDate#184, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, OutDeparture#192, OutArrival#193, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, InDeparture#202, InArrival#203, ... 11 more fields]
: +- *SortMergeJoin [Agent#187], [id#89], Inner
: :- *Sort [Agent#187 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(Agent#187, 200)
: : +- *Filter (isnotnull(Agent#187) && isnotnull(querydestinationplace#212))
: : +- Scan ExistingRDD[Id#180,QueryTaskId#181,QueryOriginPlace#182,QueryOutboundDate#183,QueryInboundDate#184,QueryCabinClass#185,QueryCurrency#186,Agent#187,QuoteAgeInMinutes#188,Price#189,OutboundLegId#190,InboundLegId#191,OutDeparture#192,OutArrival#193,OutDuration#194,OutJourneyMode#195,OutStops#196,OutCarriers#197,OutOperatingCarriers#198,NumberOutStops#199,NumberOutCarriers#200,NumberOutOperatingCarriers#201,InDeparture#202,InArrival#203,... 10 more fields]
: +- *Sort [id#89 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#89, 200)
: +- *Project [cast(id#67L as string) AS id#89, name#68]
: +- *Filter (isnotnull(id#67L) && isnotnull(cast(id#67L as string)))
: +- Scan ExistingRDD[id#67L,name#68,imageurl#69,status#70,optimisedformobile#71,type#72,bookingnumber#73,createdat#74,updatedat#75]
+- *Sort [cast(airportId#38 as int) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cast(airportId#38 as int), 200)
+- *Project [cast(airportId#18L as string) AS airportId#38, countryName#24, cityName#23, airportName#22]
+- *Filter (isnotnull(airportId#18L) && isnotnull(cast(airportId#18L as string)))
+- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24]

at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
... 60 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123)
at 

Some googling suggests it failed because of timeout during broadcast?

My code looks like below. I think it failed near the last part, writing to parquet? But the explain logs suggests its executing the query then also?

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import UserDefinedFunction, udf, regexp_replace, to_timestamp, date_format, lit
from datetime import datetime, timedelta
from pyspark.sql.types import ArrayType, StringType, DateType, Row
import math

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# READ IN FLIGHTS, AIRPORTS, AGENTS TABLES
# NOTE: Bookmarks enabled for flights data catalog
airportsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "airports")
airportsDF = airportsGDF.toDF().select("airportId", "countryName", "cityName", "airportName")
airportsDF = airportsDF.withColumn("airportId", airportsDF["airportId"].cast("string"))
airportsDF.createOrReplaceTempView("airports")

agentsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "agents")
agentsRawDF = agentsGDF.toDF().select("id", "name")
agentsRawDF = agentsRawDF.withColumn("id", agentsRawDF["id"].cast("string"))
agentsRawDF.createOrReplaceTempView("agents")

def batch(iterable, n=1):
    l = len(iterable)
    for ndx in range(0, l, n):
        yield iterable[ndx:min(ndx + n, l)]

arr = [13301,12929,14511,9968,15280,10193,13531,13439,16122,9498,16162,17210,12728,14534,12542,13303,16716,13311,12913,11036,17471,16240,10902,15526,17294,15671,10858,17482,12071,12337,17521,12274,10032,17396,11052,9970,12917,12195,10658,17409,13078,17416,17388,12118,10438,13113,11170,14213,9762,10871,11780,12392,15518,13536,10724,14260,16747,18490,17402,10284,10982,10431,16743,12482,10497,15168,16587,15412,17106,11017,17368,13804,15461,19461,16923,9794,12795,25396,12952,15422,10101,14147,10485,12210,25336,9449,15395,13947,11893,11109,9921,9799,15253,16945,13164,10031,17002,17152,16516,13180,16451,16437,11336,13428,10182,25405,16955,10180,12191]

def generate_date_series(start, stop):
    return [start + timedelta(days=x) for x in range(0, (stop-start).days + 1)]    

spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType()))

def getInterval(num, start, stop, incr): 
    if (num is None):
        return ""

    lower = math.floor(num / incr) * incr
    upper = lower + incr
    return "(%d,%d]" % (lower, upper) 

spark.udf.register("getInterval", getInterval, StringType())
getIntervalUdf = udf(getInterval)

# CREATE DF FOR PAST 90 DAYS EXCLUDING PAST 7 DAYS
today = datetime.utcnow().date()
start = today - timedelta(days = 14) # TODO: CHANGE TO 90
sevenDaysAgo = today - timedelta(days = 7)
print(">>> Generate data frame for ", start, " to ", sevenDaysAgo, "... ")
relaventDatesDf = spark.createDataFrame([
    Row(start=start, stop=sevenDaysAgo)
])
relaventDatesDf.createOrReplaceTempView("relaventDates")

relaventDatesDf = spark.sql("SELECT explode(generate_date_series(start, stop)) AS querydatetime FROM relaventDates")
relaventDatesDf.createOrReplaceTempView("relaventDates")
print("===LOG:Dates===")
relaventDatesDf.show()

flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "flights", transformation_ctx="flights", push_down_predicate="""
    querydatetime BETWEEN '%s' AND '%s' 
    AND querydestinationplace IN (%s)
""" % (start.strftime("%Y-%m-%d"), today.strftime("%Y-%m-%d"), ",".join(map(lambda s: str(s), arr))))

flightsDf = flightsGDF.toDF()
flightsDf.createOrReplaceTempView("flights")
print("===LOG:STARTING_QUERY===")

resultDf = spark.sql("""
    SELECT 
        f.*, 
        CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key,
        countryName, cityName, airportName, a.name AS agentName
    FROM flights f
    INNER JOIN agents a
    ON f.agent = a.id
    INNER JOIN airports p
    ON f.querydestinationplace = p.airportId
    LIMIT 5000000
""") \
    .sample(False, 0.001)
resultDf.explain(True)

print("===LOG:ADDING_COLUMNS===")
resultDf = resultDf \
    .withColumn("querydatetime", resultDf["querydatetime"].cast("date")) \
    .withColumn("queryoutbounddate", resultDf["queryoutbounddate"].cast("date")) \
    .withColumn("queryinbounddate", resultDf["queryinbounddate"].cast("date")) \
    .withColumn("outdeparture", to_timestamp(resultDf["outdeparture"], "yyyy-MM-dd'T'HH:mm:ss")) \
    .withColumn("outarrival", to_timestamp(resultDf["outarrival"], "yyyy-MM-dd'T'HH:mm:ss")) \
    .withColumn("indeparture", to_timestamp(resultDf["indeparture"], "yyyy-MM-dd'T'HH:mm:ss")) \
    .withColumn("inarrival", to_timestamp(resultDf["inarrival"], "yyyy-MM-dd'T'HH:mm:ss"))

print("===LOG:WRITING_RAW===")
print("===LOG:DONE_WRITING_RAW===")
resultDf.createOrReplaceTempView("flights")

# GET DISTINCT DATASET
# distinctKeysDf = resultDf.select("outboundlegid", "inboundlegid", "agent").groupBy(["outboundlegid", "inboundlegid", "agent"])
distinctKeysDf = spark.sql("""
    SELECT key
    FROM flights
    GROUP BY key
""")
distinctKeysDf.createOrReplaceTempView("distinctKeys")

# GET RELAVENT DATES DATASET
print("===LOG:WRITING_EXPANDED===")
expandedKeyDatesDf = spark.sql("""
    SELECT key, querydatetime
    FROM relaventDates
    CROSS JOIN distinctKeys
""")

expandedKeyDatesDf.createOrReplaceTempView("expandedKeyDates")
print("===LOG:DONE_WRITING_EXPANDED===")

cleanedFlightsDf = spark.sql("""
    SELECT 
        e.key AS master_key, 
        e.querydatetime AS master_querydatetime, 
        f.*
    FROM expandedKeyDates e
    LEFT JOIN flights f
    ON e.key = f.key
    AND e.querydatetime = f.querydatetime
    ORDER BY e.key, e.querydatetime
""")
cleanedFlightsDf = cleanedFlightsDf \
    .withColumn("created_day", date_format(cleanedFlightsDf["querydatetime"], "EEEE")) \
    .withColumn("created_month", date_format(cleanedFlightsDf["querydatetime"], "yyyy-MM")) \
    .withColumn("created_month_m", date_format(cleanedFlightsDf["querydatetime"], "M").cast("int")) \
    .withColumn("created_week", date_format(cleanedFlightsDf["querydatetime"], "w").cast("int")) \
    .withColumn("out_day", date_format(cleanedFlightsDf["outdeparture"], "EEE")) \
    .withColumn("out_month", date_format(cleanedFlightsDf["outdeparture"], "yyyy-MM")) \
    .withColumn("out_month_m", date_format(cleanedFlightsDf["outdeparture"], "M").cast("int")) \
    .withColumn("out_week", date_format(cleanedFlightsDf["outdeparture"], "w").cast("int")) \
    .withColumn("out_departure_interval", getIntervalUdf(date_format(cleanedFlightsDf["outdeparture"], "H").cast("int"), lit(0), lit(24), lit(4))) \
    .withColumn("out_hour", date_format(cleanedFlightsDf["outdeparture"], "k").cast("int")) \
    .withColumn("in_day", date_format(cleanedFlightsDf["indeparture"], "EEE")) \
    .withColumn("in_month", date_format(cleanedFlightsDf["indeparture"], "yyyy-MM")) \
    .withColumn("in_month_m", date_format(cleanedFlightsDf["indeparture"], "M").cast("int")) \
    .withColumn("in_week", date_format(cleanedFlightsDf["indeparture"], "w").cast("int")) \
    .withColumn("in_departure_interval", getIntervalUdf(date_format(cleanedFlightsDf["indeparture"], "H").cast("int"), lit(0), lit(24), lit(4))) \
    .withColumn("in_hour", date_format(cleanedFlightsDf["indeparture"], "k").cast("int"))

print("===LOG:WRITING_CLEANED===")
cleanedFlightsDf \
    .repartition("countryName", "querydatetime") \
    .write \
    .mode("overwrite") \
    .partitionBy(["countryName", "querydatetime"]) \
    .parquet("s3://xxx-glue/cleanedFlights")
print("===LOG:DONE_WRITING_CLEANED===")

print("===LOG:DONE BATCH %s" % (batch))

job.commit()
like image 968
Jiew Meng Avatar asked Feb 06 '19 03:02

Jiew Meng


People also ask

How do I turn off broadcast join in spark?

If we do not want broadcast join to take place, we can disable by setting: "spark. sql. autoBroadcastJoinThreshold" to "-1".

What is autoBroadcastJoinThreshold in spark?

spark.sql.adaptive.autoBroadcastJoinThreshold. (none) Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. The default value is same with spark.

How do you persist in Pyspark?

When you persist a dataset, each node stores its partitioned data in memory and reuses them in other actions on that dataset. And Spark's persisted data on nodes are fault-tolerant meaning if any partition of a Dataset is lost, it will automatically be recomputed using the original transformations that created it.


1 Answers

The weakest point of your code is the following:

LIMIT 5000000

if you take a careful look a the execution plan

: +- *GlobalLimit 5000000
: +- Exchange SinglePartition
: +- *LocalLimit 5000000

you'll see that the implementation uses two-step process, where partial limits are collected to a single partition. Which such larger number (LIMIT is simply not designed with such scenario in mind) you can easily overwhelm corresponding executor.

Additionally LIMIT in your code is redundant, since you follow it by .sample(False, 0.001).

I'd recommend dropping the LIMIT clause, and adjusting fraction accordingly:

result_full = spark.sql("""
    SELECT 
        f.*, 
        CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key,
        countryName, cityName, airportName, a.name AS agentName
    FROM flights f
    INNER JOIN agents a
    ON f.agent = a.id
    INNER JOIN airports p
    ON f.querydestinationplace = p.airportId
""")

desired_size = (5000000 * 0.001)
fraction = desired_size / result_full .count()
assert 1 < fraction < 0  

result_sample = result_full.sample(False, fraction)

Additionally I'd recommend rewriting generate_date_series

from pyspark.sql.functions import lit
from pyspark.sql import SparkSession


def generate_date_series(start, stop):
    span = (stop - start).days + 1
    return (SparkSession.builder.getOrCreate()
               .range(0, span)
               .withColumn("start", lit(start))
               .selectExpr("date_add(start, id) AS querydatetime"))


(generate_date_series(start, seven_days_ago)
    .createOrReplaceTempView("relaventDates"))

Finally I'd strongly recommend replacing getInterval UDF with composition of built-in functions* (unused arguments preserved as-is):

from pyspark.sql.functions import concat, floor
from pyspark.sql.functions import Column


def get_interval(num, start, stop, incr):
    assert isinstance(num, Column)

    lower = floor(num / incr).cast("integer") * incr
    upper = lower + incr
    return concat(lit("("), lower, lit(","), upper, lit(")"))

which could be later used as direct replacement of UDF, though it is unlikely to contribute directly to your current problems.

from pyspark.sql.functions import hour

...
    .withColumn(
        "out_departure_interval",
        get_interval(hour("outdeparture"), 0, 24, 4))

On a side note UDFRegistration.register returns callable object for a couple of releases now, so you might be able to replace

spark.udf.register("getInterval", getInterval, StringType())
getIntervalUdf = udf(getInterval)

with

getIntervalUdf = spark.udf.register("getInterval", getInterval, StringType())

* You can also consider bucketing using dedicated window function:

Bucketize rows into one or more time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05).

like image 191
user10938362 Avatar answered Sep 28 '22 02:09

user10938362