I am very new to the Big Data technologies I am working with, but I have so far managed to set up sparklyr in RStudio to connect to a standalone Spark cluster. Data is stored in Cassandra, and I can successfully bring large datasets into Spark memory (cache) to run further analysis on them.
However, recently I have been having a lot of trouble bringing one particularly large dataset into Spark memory, even though the cluster should have more than enough resources (60 cores, 200GB RAM) to handle a dataset of its size.
I thought that by limiting the data being cached to just a few select columns of interest I could overcome the issue (using the answer code from my previous query here), but it did not help. What happens is that the jar process on my local machine ramps up to take over all the local RAM and CPU resources and the whole process freezes, while on the cluster executors keep getting dropped and re-added. Weirdly, this happens even when I select only 1 row for caching (which should make this dataset much smaller than other datasets that I have had no problem caching into Spark memory).
I've had a look through the logs, and these seem to be the only informative errors/warnings early on in the process:
17/03/06 11:40:27 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 33813 because its task set is gone (this is likely the result of receiving duplicate task finished status updates) or its executor has been marked as failed.
17/03/06 11:40:27 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 8167), so marking it as still running
...
17/03/06 11:46:59 WARN TaskSetManager: Lost task 3927.3 in stage 0.0 (TID 54882, 213.248.241.186, executor 100): ExecutorLostFailure (executor 100 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 167626 ms
17/03/06 11:46:59 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 3863), so marking it as still running
17/03/06 11:46:59 WARN TaskSetManager: Lost task 4300.3 in stage 0.0 (TID 54667, 213.248.241.186, executor 100): ExecutorLostFailure (executor 100 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 167626 ms
17/03/06 11:46:59 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 14069), so marking it as still running
And then after 20 minutes or so the whole job crashes with:
java.lang.OutOfMemoryError: GC overhead limit exceeded
I've changed my connect config to increase the heartbeat interval (spark.executor.heartbeatInterval: '180s'), and have seen how to increase memoryOverhead by changing settings on a YARN cluster (using spark.yarn.executor.memoryOverhead), but not on a standalone cluster.
In my config file, I have experimented by adding each of the following settings one at a time (none of which have worked):
spark.memory.fraction: 0.3
spark.executor.extraJavaOptions: '-Xmx24g'
spark.driver.memory: "64G"
spark.driver.extraJavaOptions: '-XX:MaxHeapSize=1024m'
spark.driver.extraJavaOptions: '-XX:+UseG1GC'
UPDATE: my full current yml config file is as follows:
default:
  # local settings
  sparklyr.sanitize.column.names: TRUE
  sparklyr.cores.local: 3
  sparklyr.shell.driver-memory: "8G"
  # remote core/memory settings
  spark.executor.memory: "32G"
  spark.executor.cores: 5
  spark.executor.heartbeatInterval: '180s'
  spark.ext.h2o.nthreads: 10
  spark.cores.max: 30
  spark.memory.storageFraction: 0.6
  spark.memory.fraction: 0.3
  spark.network.timeout: 300
  spark.driver.extraJavaOptions: '-XX:+UseG1GC'
  # other configs for spark
  spark.serializer: org.apache.spark.serializer.KryoSerializer
  spark.executor.extraClassPath: /var/lib/cassandra/jar/guava-18.0.jar
  # cassandra settings
  spark.cassandra.connection.host: <cassandra_ip>
  spark.cassandra.auth.username: <cassandra_login>
  spark.cassandra.auth.password: <cassandra_pass>
  spark.cassandra.connection.keep_alive_ms: 60000
  # spark packages to load
  sparklyr.defaultPackages:
    - "com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M1"
    - "com.databricks:spark-csv_2.11:1.3.0"
    - "com.datastax.cassandra:cassandra-driver-core:3.0.2"
    - "com.amazonaws:aws-java-sdk-pom:1.10.34"
So my questions are:
OK, I've finally managed to make this work!
I'd initially tried the suggestion of @user6910411 to decrease the Cassandra input split size, but this failed in the same way. After playing around with LOTS of other things, today I tried changing that setting in the opposite direction:
spark.cassandra.input.split.size_in_mb: 254
INCREASING the split size meant fewer Spark tasks, and thus less scheduling overhead and fewer calls to the GC. It worked!
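To make the effect of the split size concrete, here is a rough back-of-the-envelope sketch in Python. It assumes the number of Spark partitions (and so tasks) for a full table scan is roughly the estimated table size divided by the split size. The connector actually works from Cassandra's own size estimates and token ranges, so real counts will differ. The table size below is a hypothetical figure, not one from my cluster, and 16 and 64 are just arbitrary smaller split sizes for comparison.

```python
import math


def estimated_task_count(table_size_mb: float, split_size_mb: float) -> int:
    """Approximate number of Spark tasks for a full scan of a table.

    Assumption: one task per input split, and splits of roughly
    split_size_mb each. Always returns at least one task.
    """
    return max(1, math.ceil(table_size_mb / split_size_mb))


# Hypothetical table size, for illustration only.
table_size_mb = 500_000

# Compare a few split sizes; 254 is the value that worked for me.
for split_mb in (16, 64, 254):
    tasks = estimated_task_count(table_size_mb, split_mb)
    print(f"split size {split_mb:>3} MB -> about {tasks} tasks")
```

This fits the logs above, where stage 0 was resubmitting tasks with indices above 14,000: every task has to be scheduled and tracked, so cutting the count by a large factor plausibly takes a lot of pressure off the driver's heap and garbage collector.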