Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to execute spark submit on amazon EMR from Lambda function?

I want to execute spark submit job on AWS EMR cluster based on the file upload event on S3. I am using AWS Lambda function to capture the event but I have no idea how to submit spark submit job on EMR cluster from Lambda function.

Most of the answers that i searched talked about adding a step in the EMR cluster. But I do not know if I can add add any step to fire "spark submit --with args" in the added step.

like image 544
Satyam Avatar asked Aug 21 '17 11:08

Satyam


People also ask

Can we run Spark code in AWS Lambda?

The most common operations like data extraction and ingestion in the S3 data lake, loading processed data into the data stores and pushing down SQL workloads on AWS Redshift can be done easily using AWS lambda Spark.


1 Answers

You can, I had to same thing last week!

Using boto3 for Python (other languages would definitely have a similar solution) you can either start a cluster with the defined step, or attach a step to an already up cluster.

Defining the cluster with the step

def lambda_handler(event, context):
    conn = boto3.client("emr")        
    cluster_id = conn.run_job_flow(
        Name='ClusterName',
        ServiceRole='EMR_DefaultRole',
        JobFlowRole='EMR_EC2_DefaultRole',
        VisibleToAllUsers=True,
        LogUri='s3n://some-log-uri/elasticmapreduce/',
        ReleaseLabel='emr-5.8.0',
        Instances={
            'InstanceGroups': [
                {
                    'Name': 'Master nodes',
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'm3.xlarge',
                    'InstanceCount': 1,
                },
                {
                    'Name': 'Slave nodes',
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',
                    'InstanceType': 'm3.xlarge',
                    'InstanceCount': 2,
                }
            ],
            'Ec2KeyName': 'key-name',
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False
        },
        Applications=[{
            'Name': 'Spark'
        }],
        Configurations=[{
            "Classification":"spark-env",
            "Properties":{},
            "Configurations":[{
                "Classification":"export",
                "Properties":{
                    "PYSPARK_PYTHON":"python35",
                    "PYSPARK_DRIVER_PYTHON":"python35"
                }
            }]
        }],
        BootstrapActions=[{
            'Name': 'Install',
            'ScriptBootstrapAction': {
                'Path': 's3://path/to/bootstrap.script'
            }
        }],
        Steps=[{
            'Name': 'StepName',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 's3n://elasticmapreduce/libs/script-runner/script-runner.jar',
                'Args': [
                    "/usr/bin/spark-submit", "--deploy-mode", "cluster",
                    's3://path/to/code.file', '-i', 'input_arg', 
                    '-o', 'output_arg'
                ]
            }
        }],
    )
    return "Started cluster {}".format(cluster_id)

Attaching a step to an already running cluster

As per here

def lambda_handler(event, context):
    conn = boto3.client("emr")
    # chooses the first cluster which is Running or Waiting
    # possibly can also choose by name or already have the cluster id
    clusters = conn.list_clusters()
    # choose the correct cluster
    clusters = [c["Id"] for c in clusters["Clusters"] 
                if c["Status"]["State"] in ["RUNNING", "WAITING"]]
    if not clusters:
        sys.stderr.write("No valid clusters\n")
        sys.stderr.exit()
    # take the first relevant cluster
    cluster_id = clusters[0]
    # code location on your emr master node
    CODE_DIR = "/home/hadoop/code/"

    # spark configuration example
    step_args = ["/usr/bin/spark-submit", "--spark-conf", "your-configuration",
                 CODE_DIR + "your_file.py", '--your-parameters', 'parameters']

    step = {"Name": "what_you_do-" + time.strftime("%Y%m%d-%H:%M"),
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 's3n://elasticmapreduce/libs/script-runner/script-runner.jar',
                'Args': step_args
            }
        }
    action = conn.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
    return "Added step: %s"%(action)
like image 192
Osama Haggag Avatar answered Sep 22 '22 16:09

Osama Haggag