 

How can I wait for completion of an Elastic MapReduce job flow in a Java application?

Recently I've been working with Amazon Web Services (AWS), and I noticed there isn't much documentation on this subject, so I'm adding my solution.

I was writing an application using Amazon Elastic MapReduce (Amazon EMR). After the computation finished, I needed to do some work on the files it produced, so I needed to know when the job flow had completed.

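This is how you can list the job flows in the COMPLETED state and get the id of one of them:

import java.util.List;

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.JobFlowDetail;

AmazonElasticMapReduce mapReduce = new AmazonElasticMapReduceClient(credentials);

// Ask only for job flows that are in the COMPLETED state.
DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowsRequest()
    .withJobFlowStates("COMPLETED");

List<JobFlowDetail> jobs = mapReduce.describeJobFlows(jobAttributes).getJobFlows();
if (!jobs.isEmpty()) { // jobs.get(0) would throw if nothing has completed yet
    String completedJobFlowId = jobs.get(0).getJobFlowId(); // id of one completed job flow
}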

You can also request a specific job flow id in DescribeJobFlowsRequest and then check whether that job flow has finished or failed.
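For that per-job-flow check, the v1 AWS SDK for Java exposes the state of a single job flow as a string: request it with new DescribeJobFlowsRequest().withJobFlowIds(jobFlowId) and read getExecutionStatusDetail().getState() from the returned JobFlowDetail. A minimal sketch of interpreting that string, assuming COMPLETED, FAILED and TERMINATED are the final job flow states and treating only COMPLETED as success; JobFlowStateCheck is a hypothetical helper, not part of the SDK:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper. The input is assumed to be the value of
// detail.getExecutionStatusDetail().getState() for one JobFlowDetail.
final class JobFlowStateCheck {
    // Assumed final job flow states: once reached, the job flow will not run again.
    private static final Set<String> FINAL_STATES = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList("COMPLETED", "FAILED", "TERMINATED")));

    private JobFlowStateCheck() {}

    /** True once the job flow has stopped, whether it succeeded or not. */
    static boolean isFinished(String state) {
        return FINAL_STATES.contains(state);
    }

    /** True only if the job flow reached COMPLETED (assumed to mean success). */
    static boolean succeeded(String state) {
        return "COMPLETED".equals(state);
    }
}
```

A caller would poll until isFinished returns true and then use succeeded to decide whether the output files are safe to process.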

I hope it will help others.

asked May 25 '12 at 16:05 by siditom


1 Answer

I also ran into this problem, and here's the solution I came up with for now. It's not perfect, but hopefully it'll be helpful. For reference, I'm using Java 1.7 and AWS Java SDK version 1.9.13.

Note that, strictly speaking, this code waits for the cluster to terminate rather than for the steps to finish. If your cluster terminates when all its steps are done, that's fine; but if you're using clusters that stay alive after step completion, this won't help you much.
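For clusters that stay alive after their steps finish, one option is to poll step states instead of the cluster state. In the v1 Java SDK a step's state can be read with emr.describeStep(new DescribeStepRequest().withClusterId(clusterId).withStepId(stepId)).getStep().getStatus().getState(). A minimal sketch of deciding, from a list of such state strings, whether the wait is over and whether everything succeeded; StepStates is a hypothetical helper, and it assumes COMPLETED, CANCELLED, FAILED and INTERRUPTED are the final step states:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: each input string is assumed to be a step state as
// returned by getStep().getStatus().getState().
final class StepStates {
    // Assumed final step states; PENDING, CANCEL_PENDING and RUNNING are still in progress.
    private static final Set<String> FINAL = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList("COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED")));

    private StepStates() {}

    /** True when every step has reached a final state, so polling can stop. */
    static boolean allFinished(List<String> stepStates) {
        for (String s : stepStates) {
            if (!FINAL.contains(s)) {
                return false;
            }
        }
        return true;
    }

    /** True when every step reached COMPLETED. */
    static boolean allSucceeded(List<String> stepStates) {
        for (String s : stepStates) {
            if (!"COMPLETED".equals(s)) {
                return false;
            }
        }
        return true;
    }
}
```

Because this never looks at the cluster state, it works the same whether the cluster auto-terminates or stays in WAITING for more steps.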

In addition, the code below logs every cluster state change and, once polling stops, checks whether the cluster terminated with errors, throwing an exception if it did.

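import java.util.concurrent.TimeUnit;

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.ClusterStateChangeReason;
import com.amazonaws.services.elasticmapreduce.model.ClusterStateChangeReasonCode;
import com.amazonaws.services.elasticmapreduce.model.ClusterStatus;
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterResult;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;

// Assumes fields `emr` (an AmazonElasticMapReduceClient) and `log` (an SLF4J-style logger).
// waitForCompletion sleeps between polls, so InterruptedException must be declared here.
private void yourMainMethod() throws InterruptedException {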
    RunJobFlowRequest request = ...;

    try {
        RunJobFlowResult submission = emr.runJobFlow(request);
        String jobFlowId = submission.getJobFlowId();
        log.info("Submitted EMR job as job flow id {}", jobFlowId);

        DescribeClusterResult result = 
            waitForCompletion(emr, jobFlowId, 90, TimeUnit.SECONDS);
        diagnoseClusterResult(result, jobFlowId);
    } finally {
        emr.shutdown();
    }
}

private DescribeClusterResult waitForCompletion(
             AmazonElasticMapReduceClient emr, String jobFlowId,
             long sleepTime, TimeUnit timeUnit)
        throws InterruptedException {
    // Assumed initial state; every state EMR reports that differs from it is logged.
    String state = "STARTING";
    while (true) {
        DescribeClusterResult result = emr.describeCluster(
                new DescribeClusterRequest().withClusterId(jobFlowId)
        );
        ClusterStatus status = result.getCluster().getStatus();
        String newState = status.getState();
        if (!state.equals(newState)) {
            log.info("Cluster id {} switched from {} to {}.  Reason: {}.",
                     jobFlowId, state, newState, status.getStateChangeReason());
            state = newState;
        }

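        // Stop polling once the cluster has terminated (with or without errors)
        // or is WAITING, i.e. running but idle with no step in progress.
        switch (state) {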
            case "TERMINATED":
            case "TERMINATED_WITH_ERRORS":
            case "WAITING":
                return result;
        }

        timeUnit.sleep(sleepTime);
    }
}

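private void diagnoseClusterResult(DescribeClusterResult result, String jobFlowId) {
    ClusterStatus status = result.getCluster().getStatus();
    ClusterStateChangeReason reason = status.getStateChangeReason();
    // A WAITING cluster may carry no reason code, and fromValue(null) throws
    // IllegalArgumentException, so a missing code is logged instead of diagnosed.
    if (reason == null || reason.getCode() == null) {
        log.info("EMR cluster {} is {} with no state change reason code",
                 jobFlowId, status.getState());
        return;
    }
    ClusterStateChangeReasonCode code = 
        ClusterStateChangeReasonCode.fromValue(reason.getCode());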
    switch (code) {
    case ALL_STEPS_COMPLETED:
        log.info("Completed EMR job {}", jobFlowId);
        break;
    default:
        failEMR(jobFlowId, status);
    }
}

private static void failEMR(String jobFlowId, ClusterStatus status) {
    String msg = "EMR cluster run %s terminated with errors.  ClusterStatus = %s";
    throw new RuntimeException(String.format(msg, jobFlowId, status));
}
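The waitForCompletion loop above polls forever if the cluster never reaches TERMINATED, TERMINATED_WITH_ERRORS or WAITING. A minimal sketch of the same polling idea with an upper bound on the total wait; StateSource and BoundedPoller are hypothetical names, and StateSource is assumed to wrap something like emr.describeCluster(new DescribeClusterRequest().withClusterId(id)).getCluster().getStatus().getState():

```java
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical abstraction over "ask EMR for the current cluster state".
interface StateSource {
    String currentState();
}

final class BoundedPoller {
    private BoundedPoller() {}

    /**
     * Polls until the state is one of finalStates and returns it, or throws
     * TimeoutException once maxWait has elapsed without reaching one.
     */
    static String waitForState(StateSource source, Set<String> finalStates,
                               long pollInterval, long maxWait, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(maxWait);
        while (true) {
            String state = source.currentState();
            if (finalStates.contains(state)) {
                return state;
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException(
                        "Still in state " + state + " after " + maxWait + " " + unit);
            }
            unit.sleep(pollInterval);
        }
    }
}
```

In the answer's setting, finalStates would hold TERMINATED, TERMINATED_WITH_ERRORS and WAITING; the TimeoutException lets the caller terminate the cluster or raise an alert instead of blocking indefinitely.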
answered Oct 13 '22 at 01:10 by Luis Casillas