I launched an AWS EMR cluster and ran this code in a PySpark3 Jupyter notebook:
"..
textRdd = sparkDF.select(textColName).rdd.flatMap(lambda x: x)
textRdd.collect().show()
.."
I got this error:
An error was encountered:
Invalid status code '400' from http://..../sessions/4/statements/7 with error payload: {"msg":"requirement failed: Session isn't active."}
Running the line:
sparkDF.show()
works!
I also created a small subset of the file and all my code runs fine.
What is the problem?
I had the same issue and the reason for the timeout is the driver running out of memory. Since you run collect(), all the data gets sent to the driver. By default the driver memory is 1000M when creating a Spark application through JupyterHub, even if you set a higher value through config.json. You can see that by executing the following code from within a Jupyter notebook:
spark.sparkContext.getConf().get('spark.driver.memory')
1000M
To increase the driver memory, run:
%%configure -f
{"driverMemory": "6000M"}
This will restart the application with increased driver memory. You might need to use higher values for your data. Hope it helps.
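If you only need to look at some of the rows, a complementary option is to avoid the full collect() altogether. Also note that collect() returns a plain Python list, which has no show() method, so that call would fail even with enough driver memory. Below is a minimal sketch; the helper name preview and the default of 20 rows are my own choices for illustration, and textRdd is the RDD from the question.

```python
def preview(rdd, n=20):
    """Return at most n elements of an RDD as a Python list.

    Assumes `rdd` exposes the standard RDD.take(n) method. Unlike
    collect(), take(n) only brings n elements back to the driver,
    so it does not need driver memory for the whole dataset.
    """
    return rdd.take(n)


# Usage inside the notebook (not executed here, needs a live Spark session):
# for text in preview(textRdd, 20):
#     print(text)
```

If the whole result is really needed, writing it out from the executors (for example to S3) rather than collecting it on the driver is usually the safer route.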
From this Stack Overflow question's answer, which worked for me:
Judging by the output, if your application is not finishing with a FAILED status, this sounds like a Livy timeout error: your application is likely taking longer than the timeout defined for a Livy session (which defaults to 1h), so even though the Spark app succeeds, your notebook will receive this error.
If that's the case, here's how to address it:
1. edit the /etc/livy/conf/livy.conf file (on the cluster's master node)
2. set livy.server.session.timeout to a higher value, like 8h (or larger, depending on your app)
3. restart Livy to apply the setting: run sudo restart livy-server on the cluster's master node
4. test your code again
Alternative way to edit this setting - https://allinonescript.com/questions/54220381/how-to-set-livy-server-session-timeout-on-emr-cluster-boostrap
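Steps 1 and 2 above could be scripted roughly like this. It is only a sketch: the function name set_livy_timeout is made up for illustration, and it assumes livy.conf uses plain "key = value" lines with "#" for comments.

```python
def set_livy_timeout(conf_text, timeout="8h"):
    """Return conf_text with livy.server.session.timeout set to `timeout`.

    Replaces an existing (or commented-out) entry for the key, or
    appends a new line if the key is not present at all.
    """
    key = "livy.server.session.timeout"
    new_line = f"{key} = {timeout}"
    out = []
    replaced = False
    for line in conf_text.splitlines():
        # Strip leading comment markers so "# key = 1h" is also matched.
        name = line.lstrip("# \t").split("=", 1)[0].strip()
        if name == key:
            if not replaced:
                out.append(new_line)
                replaced = True
            continue  # drop any duplicate entries for the same key
        out.append(line)
    if not replaced:
        out.append(new_line)
    return "\n".join(out) + "\n"


# Usage on the master node (needs root, not executed here):
# path = "/etc/livy/conf/livy.conf"
# with open(path) as f:
#     updated = set_livy_timeout(f.read(), "8h")
# with open(path, "w") as f:
#     f.write(updated)
```

Livy still has to be restarted afterwards (step 3) for the new value to take effect.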