I have JSON data that I am reading into a data frame with several fields, repartitioning it based on two columns, and converting to Pandas.
This job keeps failing on EMR on just 600,000 rows of data with some obscure errors. I have also increased the memory settings of the Spark driver, and still don't see any improvement.
Here is my PySpark code:
enhDataDf = (
sqlContext
.read.json(sys.argv[1])
)
enhDataDf = (
enhDataDf
.repartition('column1', 'column2')
.toPandas()
)
enhDataDf = sqlContext.createDataFrame(enhDataDf)
enhDataDf = (
enhDataDf
.toJSON()
.saveAsTextFile(sys.argv[2])
)
My spark settings are as follows:
conf = SparkConf().setAppName('myapp1')
conf.set('spark.yarn.executor.memoryOverhead', 8192)
conf.set('spark.executor.memory', 8192)
conf.set('spark.driver.memory', 8192)
sc = SparkContext(conf=conf)
The errors I get are:
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down.
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties].
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties].
16/10/01 19:57:11 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/01 19:57:11 ERROR ApplicationMaster: User application exited with status 143
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties].
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties].
16/10/01 19:57:56 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/01 19:57:56 ERROR ApplicationMaster: User application exited with status 143
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down.
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
The code works fine on up to about 600,000 JSON lines. Beyond that it keeps failing, even though there is a ton of memory available.
Any thoughts on what is going on and how to debug / fix this problem?
I believe that the problem comes from the following part of your code:
enhDataDf = (
enhDataDf
.repartition('column1', 'column2')
.toPandas()
)
.toPandas() collects all the data to the driver, so as the number of records grows, it will eventually cause the driver to fail.
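To make the failure mode explicit, here is a minimal sketch of a guard around the conversion. The helper name to_pandas_if_small and the max_rows threshold are my own assumptions, not part of your code; it only relies on the count() and toPandas() methods of a Spark DataFrame.

```python
def to_pandas_if_small(df, max_rows=100000):
    """Convert a Spark DataFrame to pandas only when it is small enough.

    Hypothetical helper: max_rows is an illustrative limit, tune it to
    what your driver can actually hold.
    """
    # count() runs a Spark job, but only a single number reaches the driver.
    n = df.count()
    if n > max_rows:
        raise ValueError(
            "refusing to collect %d rows to the driver (limit %d)" % (n, max_rows)
        )
    # toPandas() materializes every row in the driver's memory.
    return df.toPandas()
```

With something like this the job fails fast with a readable message instead of the driver being killed halfway through the collect.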
According to your comment, this is the exact pipeline you use. It means that the whole stage is not only obsolete but also incorrect. When data is collected and then parallelized again, there is no guarantee that the partitioning created by
.repartition('column1', 'column2')
will be preserved when you re-create the Spark DataFrame:
sqlContext.createDataFrame(enhDataDf)
If you want to write data by column you can do it directly:
(sqlContext
.read.json(sys.argv[1])
.repartition('column1', 'column2')
.write
.json(sys.argv[2]))
skipping the intermediate toPandas and the conversion to RDD.
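If "by column" means one output directory per distinct value of the two columns, DataFrameWriter.partitionBy may be closer to what you want. This is only a sketch under that assumption; the function name write_json_by_columns is hypothetical, and the column names are the placeholders from your question.

```python
def write_json_by_columns(df, output_path, columns=("column1", "column2")):
    """Write df as JSON, laid out in directories keyed by the given columns.

    Hypothetical helper: nothing is collected to the driver, the executors
    write their own partitions.
    """
    (df
     .repartition(*columns)   # co-locate rows that share the same key values
     .write
     .partitionBy(*columns)   # one sub-directory per distinct key combination
     .json(output_path))
```

You would call it as write_json_by_columns(sqlContext.read.json(sys.argv[1]), sys.argv[2]).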
Following your comments:
If toPandas serves a purpose, then it will always remain a limiting factor in the pipeline, and the only direct solution is to scale up the driver node. Depending on the exact algorithms you apply to the collected data, you may consider alternative options:
This reminds me of my own question, Spark – Container exited with a non-zero exit code 143, where I was launching a PySpark job in cluster mode. That exit code indicates a memory issue with your application.
First, try to determine which machines are failing, the driver or the executor(s), so that you can target your changes better. From what I read, it should be the executors.
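One rough way to see which side is complaining is to tally the ERROR lines by the component that logged them. The sketch below assumes the log line format shown in the question; the function name count_errors_by_component is my own. Keep in mind that in YARN cluster mode the driver runs inside the ApplicationMaster container, so ApplicationMaster lines are about the driver side.

```python
import re
from collections import Counter

# Matches lines such as:
# 16/10/01 19:57:11 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
LOG_LINE = re.compile(
    r"^\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} ERROR (?P<component>[\w.]+):"
)

def count_errors_by_component(lines):
    """Count ERROR lines per logging component, ignoring everything else."""
    counts = Counter()
    for line in lines:
        match = LOG_LINE.match(line)
        if match:
            counts[match.group("component")] += 1
    return counts

sample = [
    "16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: "
    "Driver 172.31.58.76:37973 disassociated! Shutting down.",
    "16/10/01 19:57:11 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM",
    "16/10/01 19:57:11 ERROR ApplicationMaster: "
    "User application exited with status 143",
    "log4j:ERROR Could not read configuration file from URL "
    "[file:/etc/spark/conf/log4j.properties].",
]
counts = count_errors_by_component(sample)
for component, n in counts.most_common():
    print(component, n)
```

The log4j lines are skipped on purpose, since they are about a missing logging configuration and not about the failure itself.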
I can see you have already set the memoryOverhead configuration, good. Now let's focus on the memory configuration:
...running Python with Spark (PySpark), so all the code of mine runs off the heap. For that reason, I have to allocate “not much” memory (since this will cut the memory I am allowed to use from the total memory; i.e. if the total memory I am allowed to use is 20G and I am requesting 12G, then 8G will be left for my Python application to use).
So try to decrease that attribute, yes decrease it!
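The arithmetic from the quote, spelled out as a tiny sketch. The function name memory_left_for_python_gb is hypothetical, and the 20G total is just the number used in the quote, not something read from your cluster.

```python
def memory_left_for_python_gb(total_gb, jvm_heap_gb):
    """Memory remaining for the off-heap Python workers, in GB.

    Simplified model from the quote above: whatever the JVM heap requests
    is subtracted from the total the application is allowed to use.
    """
    if jvm_heap_gb > total_gb:
        raise ValueError("requested heap exceeds the total memory available")
    return total_gb - jvm_heap_gb

# The smaller the requested heap, the more room the Python side gets.
for heap in (12, 8, 4):
    left = memory_left_for_python_gb(20, heap)
    print("heap=%dG -> %dG left for Python" % (heap, left))
```

Requesting 12G out of 20G leaves 8G, as in the quote; dropping the request to 4G leaves 16G.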
Next target: #cores!
Decrease that too, for example if you are using 8, then use 4 in the executors and/or the driver:
spark.executor.cores 4
spark.driver.cores 4
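Putting the two suggestions together, here is a sketch that builds a spark-submit command line with the lowered values. The script name myapp1.py, the input and output paths, and the 4g figures are placeholders I made up for illustration. The driver memory is passed on the command line because, in client mode, setting spark.driver.memory through SparkConf inside the application has no effect: the driver JVM has already started by then.

```python
def spark_submit_command(script, input_path, output_path,
                         driver_memory="4g", executor_memory="4g",
                         executor_cores=4, driver_cores=4):
    """Build the argument list for a spark-submit call.

    All default values are illustrative assumptions, not recommendations
    measured against the job in the question.
    """
    return [
        "spark-submit",
        "--driver-memory", driver_memory,
        "--executor-memory", executor_memory,
        "--conf", "spark.executor.cores=%d" % executor_cores,
        "--conf", "spark.driver.cores=%d" % driver_cores,
        script, input_path, output_path,
    ]

# Hypothetical script name and paths, for illustration only.
command = spark_submit_command("myapp1.py", "input.json", "output_dir")
print(" ".join(command))
```

The list form can be handed directly to subprocess.call if you launch the job from Python.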