 

running pyspark script on EMR


I currently automate my Apache Spark (PySpark) scripts on clusters of EC2 instances using Spark's preconfigured ./ec2 directory. For automation and scheduling purposes, I would like to use the Boto EMR module to send scripts up to the cluster.

I was able to bootstrap and install Spark on an EMR cluster. I am also able to launch a script on EMR by using my local machine's version of pyspark and setting the master like so:

$: MASTER=spark://<insert EMR master node of cluster here> ./bin/pyspark <myscriptname.py> 

However, this requires me to run that script locally, so I am not able to fully leverage Boto's ability to 1) start the cluster, 2) add the script steps, and 3) stop the cluster. I've found examples using script-runner.sh and EMR "step" commands for spark-shell (Scala), but I assume there is an easier way to do this with the Python module (pyspark). Thanks so much in advance!
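For reference, a minimal sketch of that start/run/stop workflow with boto3 (the successor to the boto library mentioned above) might look like the following; the key pair, bucket paths, region, and release label are placeholders, not anything from the original setup:

# Sketch using boto3; key pair, bucket paths, region and release label are placeholders.
import boto3

emr = boto3.client("emr", region_name="us-east-1")

response = emr.run_job_flow(
    Name="pyspark-automated-job",
    ReleaseLabel="emr-4.7.2",                      # any Spark-enabled release
    Applications=[{"Name": "Spark"}],
    Instances={
        "MasterInstanceType": "m3.xlarge",
        "SlaveInstanceType": "m3.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": False,      # 3) cluster shuts itself down after the steps finish
        "Ec2KeyName": "my-keypair",                # placeholder
    },
    Steps=[{
        "Name": "MyPySparkStep",                   # 2) the script step
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "--deploy-mode", "cluster",
                     "s3://my-bucket/code/myscriptname.py"],
        },
    }],
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    VisibleToAllUsers=True,
)
print("Started cluster:", response["JobFlowId"])   # 1) cluster created with the step queued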

Matt (asked Apr 25 '14)

People also ask

Can we run PySpark on EMR?

You can use AWS Step Functions to run PySpark applications as EMR Steps on an existing EMR cluster. Using Step Functions, we can also create the cluster, run multiple EMR Steps sequentially or in parallel, and finally, auto-terminate the cluster.

What is PySpark EMR?

Apache Spark is a distributed processing framework and programming model that helps you do machine learning, stream processing, or graph analytics using Amazon EMR clusters. Similar to Apache Hadoop, Spark is an open-source, distributed processing system commonly used for big data workloads.


2 Answers

Here is a great example of how it needs to be configured. Browse to "A quick example" for Python code.

However, in order to make things work on emr-4.7.2, a few tweaks had to be made, so here is an AWS CLI command that worked for me:

aws emr add-steps --cluster-id <Your EMR cluster id> --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,s3a://your-source-bucket/code/pythonjob.py,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE 
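Since the question asked about Boto specifically, a rough boto3 translation of that add-steps call might look like this (the cluster id is a placeholder; on emr-4.x+ the Spark step type is effectively command-runner.jar running spark-submit):

# Rough boto3 equivalent of the add-steps call above (cluster id and buckets are placeholders).
import boto3

emr = boto3.client("emr", region_name="us-east-1")

emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",                   # your EMR cluster id
    Steps=[{
        "Name": "TestJob",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",           # how emr-4.x+ runs spark-submit steps
            "Args": [
                "spark-submit", "--deploy-mode", "cluster", "--master", "yarn",
                "--conf", "spark.yarn.submit.waitAppCompletion=true",
                "s3a://your-source-bucket/code/pythonjob.py",
                "s3a://your-source-bucket/data/data.csv",
                "s3a://your-destination-bucket/test-output/",
            ],
        },
    }],
)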

And here are the contents of the pythonjob.py file:

from __future__ import print_function

import sys

from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: testjob <input path> <output path>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="MyTestJob")

    # Split each CSV row into (key, float value) pairs and sum the values per key
    dataTextAll = sc.textFile(sys.argv[1])
    dataRDD = (dataTextAll
               .map(lambda x: x.split(","))
               .map(lambda y: (str(y[0]), float(y[1])))
               .reduceByKey(lambda a, b: a + b))

    dataRDD.saveAsTextFile(sys.argv[2])
    sc.stop()

It reads the data.csv file from S3, splits every row, converts the first value to a string and the second to a float, groups by the first value, sums the values in the second column, and writes the result back to S3.
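As a quick sanity check of that logic without a cluster, something like this can be run against a local pyspark install (the sample rows are made up):

# Local smoke test of the same transformation; sample rows are made up.
from pyspark import SparkContext

sc = SparkContext("local[2]", "LocalTest")
rows = sc.parallelize(["a,1.5", "b,2.0", "a,3.5"])      # stands in for data.csv
summed = (rows.map(lambda x: x.split(","))
              .map(lambda y: (str(y[0]), float(y[1])))
              .reduceByKey(lambda a, b: a + b))
print(summed.collect())                                  # [('a', 5.0), ('b', 2.0)] (order may vary)
sc.stop()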

A few comments:

  • I've decided to leave spark.yarn.submit.waitAppCompletion=true so that I can monitor job execution in the console (a programmatic way to poll the step is sketched just after this list).
  • Input and output paths (sys.argv[1] and sys.argv[2] respectively) are passed to the script as part of the job submission (the Args section of the add-steps command).
  • Be aware that you must use s3a:// URI instead of s3n:// and s3:// for Hadoop 2.7+ when configuring your job.
  • If your cluster is in VPC, you need to create a VPC Endpoint for Amazon S3 if you intend to read/write from there in your EMR jobs.
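On the monitoring point, if you would rather poll from the machine that submitted the job (for example from Boto-based automation as in the question), a boto3 waiter can block until the step finishes; the cluster id below is a placeholder:

# Poll an EMR step until it finishes (cluster id is a placeholder).
import boto3

emr = boto3.client("emr", region_name="us-east-1")

cluster_id = "j-XXXXXXXXXXXXX"
step_id = emr.list_steps(ClusterId=cluster_id)["Steps"][0]["Id"]   # steps come back most recent first
emr.get_waiter("step_complete").wait(ClusterId=cluster_id, StepId=step_id)

state = emr.describe_step(ClusterId=cluster_id, StepId=step_id)["Step"]["Status"]["State"]
print("Step finished with state:", state)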
Dmitry Deryabin (answered Sep 30 '22)


This might be helpful though it does not use boto.

Use the AWS CLI to create the cluster and add steps (Spark jobs) to it.

1) Create the cluster:

aws emr create-cluster --name "Spark cluster" --ami-version 3.8 --applications Name=Spark --ec2-attributes KeyName=ir --log-uri s3://Path/logs --instance-type m3.xlarge  --instance-count 1 --use-default-roles  

2) Add a step (Spark job). Note that your Python script should be stored on the master node (in this case it is in /home/hadoop/spark).

aws emr add-steps --cluster-id j-xxxxxxx --steps Name=Spark,Jar=s3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/home/hadoop/spark/bin/spark-submit,--deploy-mode,client,/home/hadoop/spark/myscript.py],ActionOnFailure=CONTINUE 

You can also combine the two steps into one, so that a single command creates the cluster, runs the job, and terminates the cluster.
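If you drive this from the Boto side the question asks about instead, the same pattern is available: run_job_flow can carry the steps at creation time (as in the sketch near the top of this page), and an explicit shutdown afterwards is a single call; the cluster id below is a placeholder:

# Explicitly terminate the cluster once the job has finished (cluster id is a placeholder).
import boto3

emr = boto3.client("emr", region_name="us-east-1")
emr.terminate_job_flows(JobFlowIds=["j-XXXXXXXXXXXXX"])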

A few notes: 1) I tried multiple ways to read the script from S3, but no luck :( so I ended up copying it to the node using either boto or the AWS CLI (a boto3 sketch of the upload half is below). 2) Since I was testing on a single node in EMR, the deploy mode in the step is client; for a real cluster you should change that to cluster.
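For the copy workaround in note 1, a minimal boto3 sketch of the upload half might look like this (script name, bucket, and key are placeholders):

# Stage the script in S3 with boto3 (script name, bucket and key are placeholders).
import boto3

s3 = boto3.client("s3")
s3.upload_file("myscript.py", "my-bucket", "code/myscript.py")
# From there it can be pulled onto the master node with `aws s3 cp`, or referenced
# directly by spark-submit in cluster deploy mode as in the first answer.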

hamed (answered Sep 30 '22)