
How do you automate PySpark jobs on EMR using boto3 (or otherwise)?

I am creating a job to parse massive amounts of server data, and then upload it into a Redshift database.

My job flow is as follows:

  • Grab the log data from S3
  • Either use Spark DataFrames or Spark SQL to parse the data and write back out to S3 (a sketch follows this list)
  • Upload the data from S3 to Redshift.
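
A minimal sketch of the read/parse/write portion, assuming plain text logs; the bucket names, paths, column layout, and parsing logic are placeholders, not the actual job:

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession.builder.appName("parse-server-logs").getOrCreate()

# grab the raw log files from S3 (placeholder path)
logs = spark.read.text("s3://my-log-bucket/raw/*.log")

# split each line into columns -- replace with whatever parsing your logs actually need
parsed = logs.select(
    split(logs.value, " ").getItem(0).alias("timestamp"),
    split(logs.value, " ").getItem(1).alias("host"),
    split(logs.value, " ").getItem(2).alias("message"),
)

# write the parsed data back to S3 in a columnar format Redshift can COPY from
parsed.write.mode("overwrite").parquet("s3://my-log-bucket/parsed/")

The Redshift upload can then be a COPY from the parsed prefix, e.g. COPY my_table FROM 's3://my-log-bucket/parsed/' IAM_ROLE '...' FORMAT AS PARQUET; (again, the table name, bucket, and role are placeholders).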

I'm getting hung up on how to automate this, though, so that my process spins up an EMR cluster, installs the required programs via bootstrap actions, and runs the Python script that contains the parsing and writing code.

Does anyone have any examples, tutorials, or experience they could share with me to help me learn how to do this?

asked Apr 19 '16 by flybonzai


People also ask

Can we run PySpark on EMR?

You can use AWS Step Functions to run PySpark applications as EMR Steps on an existing EMR cluster. Using Step Functions, we can also create the cluster, run multiple EMR Steps sequentially or in parallel, and finally, auto-terminate the cluster.
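
The state machine definition for that pattern might look roughly like the following sketch, written here as a Python dict serialized into the Amazon States Language; the cluster ID, script path, state machine name, and IAM role ARN are placeholders, and the Step Functions role still needs permission to call EMR:

import json
import boto3

definition = {
    "Comment": "Run a PySpark script as an EMR step on an existing cluster",
    "StartAt": "RunSparkStep",
    "States": {
        "RunSparkStep": {
            "Type": "Task",
            "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
            "Parameters": {
                "ClusterId": "j-XXXXXXXXXXXXX",  # placeholder cluster ID
                "Step": {
                    "Name": "Run PySpark job",
                    "ActionOnFailure": "CONTINUE",
                    "HadoopJarStep": {
                        "Jar": "command-runner.jar",
                        "Args": ["spark-submit", "s3://MyS3Bucket/spark/main.py"]
                    }
                }
            },
            "End": True
        }
    }
}

sfn = boto3.client('stepfunctions', region_name='us-east-1')
response = sfn.create_state_machine(
    name='run-pyspark-on-emr',
    definition=json.dumps(definition),
    roleArn='arn:aws:iam::123456789012:role/MyStepFunctionsRole'  # placeholder role
)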


2 Answers

Take a look at the boto3 EMR docs to create the cluster. You essentially have to call run_job_flow and create steps that run the program you want.

import boto3    

client = boto3.client('emr', region_name='us-east-1')

S3_BUCKET = 'MyS3Bucket'
S3_KEY = 'spark/main.py'
S3_URI = 's3://{bucket}/{key}'.format(bucket=S3_BUCKET, key=S3_KEY)

# upload the local script to S3; "myfile.py" is stored under the key spark/main.py,
# which the steps below copy onto the cluster and run
s3 = boto3.resource('s3')
s3.meta.client.upload_file("myfile.py", S3_BUCKET, S3_KEY)

# start the cluster: install Spark, run the bootstrap action, and execute the steps below in order
response = client.run_job_flow(
    Name="My Spark Cluster",
    ReleaseLabel='emr-4.6.0',
    Instances={
        'MasterInstanceType': 'm4.xlarge',
        'SlaveInstanceType': 'm4.xlarge',
        'InstanceCount': 4,
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
    Applications=[
        {
            'Name': 'Spark'
        }
    ],
    BootstrapActions=[
        {
            'Name': 'Maximize Spark Default Config',
            'ScriptBootstrapAction': {
                'Path': 's3://support.elasticmapreduce/spark/maximize-spark-default-config',
            }
        },
    ],
    Steps=[
    {
        'Name': 'Setup Debugging',
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['state-pusher-script']
        }
    },
    {
        'Name': 'setup - copy files',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['aws', 's3', 'cp', S3_URI, '/home/hadoop/']
        }
    },
    {
        'Name': 'Run Spark',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', '/home/hadoop/main.py']
        }
    }
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole'
)

You can also add steps to a running cluster if you know the job flow id:

job_flow_id = response['JobFlowId']
print("Job flow ID:", job_flow_id)

# SomeMoreSteps is a list of step dicts in the same format as the Steps above
step_response = client.add_job_flow_steps(JobFlowId=job_flow_id, Steps=SomeMoreSteps)

step_ids = step_response['StepIds']

print("Step IDs:", step_ids)

For more configurations, check out sparksteps.

answered Oct 09 '22 by Kamil Sindi


You can also do this with AWS Data Pipeline. Set up your S3 bucket to trigger a Lambda function every time a new file lands in the bucket (https://docs.aws.amazon.com/lambda/latest/dg/with-s3-example.html). The Lambda function then activates your Data Pipeline (https://aws.amazon.com/blogs/big-data/using-aws-lambda-for-event-driven-data-processing-pipelines/). The pipeline spins up a new EMR cluster using EmrCluster, where you can specify your bootstrap options, runs your EMR commands using EmrActivity, and when everything finishes it terminates the EMR cluster and deactivates the pipeline.
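
The Lambda piece of that chain might look roughly like this minimal sketch, assuming the Data Pipeline has already been defined and its ID is supplied through an environment variable (the variable name and pipeline ID are hypothetical):

import os
import boto3

datapipeline = boto3.client('datapipeline')

def lambda_handler(event, context):
    # Triggered by the S3 ObjectCreated event; activate the pre-defined pipeline,
    # which in turn launches the EMR cluster (EmrCluster) and runs the job (EmrActivity).
    pipeline_id = os.environ['PIPELINE_ID']  # hypothetical env var, e.g. 'df-0123456789ABCDEF'
    return datapipeline.activate_pipeline(pipelineId=pipeline_id)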

answered Oct 09 '22 by Kyle Bridenstine