
How do you use the Google DataProc Java Client to submit spark jobs using jar files and classes in associated GS bucket?

I need to trigger Spark jobs that aggregate data from a JSON file via an API call. I use Spring Boot to create the resources. The steps of the solution are as follows:

  1. The user makes a POST request with a JSON file as the input.
  2. The JSON file is stored in a Google Cloud Storage bucket associated with the Dataproc cluster.
  3. An aggregating Spark job is triggered from within the REST method, with the specified jars and classes, and the link to the JSON file as the argument.

I want to trigger the job using Dataproc's Java client rather than the console or command line. How do you do it?
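For context, a rough sketch of the kind of endpoint I have in mind (all names here are hypothetical):

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class AggregationController {

  // Hypothetical endpoint: receives the JSON payload, stores it in the
  // cluster's GCS bucket, then submits the aggregating Spark job.
  @RequestMapping(value = "/aggregate", method = RequestMethod.POST)
  public ResponseEntity<String> aggregate(@RequestBody String json) {
    String gcsUri = storeInBucket(json);  // hypothetical helper: upload to GCS
    submitSparkJob(gcsUri);               // hypothetical helper: what this question is about
    return ResponseEntity.ok("Job submitted for " + gcsUri);
  }

  private String storeInBucket(String json) { /* upload to GCS, return gs:// URI */ return null; }

  private void submitSparkJob(String gcsUri) { /* trigger the Spark job via the Java client */ }
}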

asked Feb 24 '16 by pashupati

1 Answer

We're hoping to have a more thorough guide shortly in the official documentation, but to get started, see the following API overview: https://developers.google.com/api-client-library/java/apis/dataproc/v1

It includes links to the Dataproc Javadocs. If your server makes calls on behalf of your own project, rather than on behalf of your end users' Google projects, then you probably want the keyfile-based service-account auth explained here to create the Credential object you use to initialize the Dataproc client stub.
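For example, with a downloaded service-account JSON keyfile, building the Credential might look roughly like this (sketch only; the keyfile path is a placeholder):

import java.io.FileInputStream;
import java.util.Collections;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;

// Load the service-account keyfile and scope it for the Cloud Platform APIs.
GoogleCredential credential = GoogleCredential
    .fromStream(new FileInputStream("/path/to/keyfile.json"))
    .createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform"));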

As for the Dataproc-specific parts, this just means adding the following dependency to your pom.xml if you're using Maven:

<project>
  <dependencies>
    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-dataproc</artifactId>
      <version>v1-rev4-1.21.0</version>
    </dependency>
  </dependencies>
</project>

And then you'll have code like:

import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataproc.Dataproc;
import com.google.api.services.dataproc.model.Job;
import com.google.api.services.dataproc.model.JobPlacement;
import com.google.api.services.dataproc.model.SparkJob;
import com.google.api.services.dataproc.model.SubmitJobRequest;
import com.google.common.collect.ImmutableList;

// Build the Dataproc client stub using the Credential created above.
Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)
    .setApplicationName("my-webapp/1.0")
    .build();

// Submit a Spark job to the cluster in the "global" region.
dataproc.projects().regions().jobs().submit(
    projectId, "global", new SubmitJobRequest()
        .setJob(new Job()
            .setPlacement(new JobPlacement()
                .setClusterName("my-spark-cluster"))
            .setSparkJob(new SparkJob()
                .setMainClass("FooSparkJobMain")
                .setJarFileUris(ImmutableList.of("gs://bucket/path/to/your/spark-job.jar"))
                .setArgs(ImmutableList.of(
                    "arg1", "arg2", "arg3")))))
    .execute();

Since intermediary servers may perform low-level retries, and since your request may throw an IOException without telling you whether the job submission succeeded, an additional step you may want to take is to generate your own jobId. Then you know which jobId to poll to find out whether the job was submitted, even if your request times out or throws an unknown exception:

import java.io.IOException;
import java.util.UUID;

import com.google.api.services.dataproc.model.JobReference;

...

Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)
    .setApplicationName("my-webapp/1.0")
    .build();

String curJobId = "json-agg-job-" + UUID.randomUUID().toString();
Job jobSnapshot = null;
try {
  jobSnapshot = dataproc.projects().regions().jobs().submit(
      projectId, "global", new SubmitJobRequest()
          .setJob(new Job()
              .setReference(new JobReference()
                   .setJobId(curJobId))
              .setPlacement(new JobPlacement()
                  .setClusterName("my-spark-cluster"))
              .setSparkJob(new SparkJob()
                  .setMainClass("FooSparkJobMain")
                  .setJarFileUris(ImmutableList.of("gs://bucket/path/to/your/spark-job.jar"))
                  .setArgs(ImmutableList.of(
                      "arg1", "arg2", "arg3")))))
      .execute();
} catch (IOException ioe) {
  try {
    jobSnapshot = dataproc.projects().regions().jobs().get(
        projectId, "global", curJobId).execute();
    logger.info("Despite exception, job was verified submitted", ioe);
  } catch (IOException ioe2) {
    // Handle differently; if it's a GoogleJsonResponseException you can inspect the error
    // code, and if it's a 404, then it means the job didn't get submitted; you can add retry
    // logic in that case.
  }
}

// We can poll on dataproc.projects().regions().jobs().get(...) until the job reports being
// completed or failed now.
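
From there, a rough polling loop might look like the following (sketch only; the terminal state strings are assumed from the v1 API, and exception handling is elided):

while (true) {
  // Re-fetch the job by the jobId we generated above.
  Job polled = dataproc.projects().regions().jobs().get(
      projectId, "global", curJobId).execute();
  String state = polled.getStatus().getState();
  if ("DONE".equals(state) || "ERROR".equals(state) || "CANCELLED".equals(state)) {
    break;
  }
  Thread.sleep(5000);  // Fixed interval for simplicity; consider exponential backoff.
}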
answered Oct 02 '22 by Dennis Huo