Launching Cloud Dataflow from Cloud Functions

How do I launch a Cloud Dataflow job from a Google Cloud Function? I'd like to use Google Cloud Functions as a mechanism to enable cross-service composition.

asked Feb 15 '16 by Sam McVeety

2 Answers

I've included a very basic example of the WordCount sample below. Please note that you'll need to include a copy of the Java binary in your Cloud Function deployment, since it is not present in the default environment. Likewise, you'll need to package your deploy jar with your Cloud Function as well.

module.exports = {
  wordcount: function (context, data) {
    const spawn = require('child_process').spawn;
    // Launch the bundled JRE to run the packaged Dataflow WordCount pipeline;
    // neither the Java runtime nor the jar is in the default environment.
    const child = spawn(
            'jre1.8.0_73/bin/java',
            ['-cp',
             'MY_JAR.jar',
             'com.google.cloud.dataflow.examples.WordCount',
             '--jobName=fromACloudFunction',
             '--project=MY_PROJECT',
             '--runner=BlockingDataflowPipelineRunner',
             '--stagingLocation=gs://STAGING_LOCATION',
             '--inputFile=gs://dataflow-samples/shakespeare/*',
             '--output=gs://OUTPUT_LOCATION'
            ],
            { cwd: __dirname });

    child.stdout.on('data', function(data) {
      console.log('stdout: ' + data);
    });
    child.stderr.on('data', function(data) {
      console.log('error: ' + data);
    });
    child.on('close', function(code) {
      console.log('closing code: ' + code);
      // With the blocking runner, signal completion only after the job finishes.
      context.success();
    });
  }
}
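For reference, one possible (hypothetical) layout of the deployed function directory that would match the paths used above:

index.js          // the Cloud Function code above
MY_JAR.jar        // the packaged Dataflow pipeline jar
jre1.8.0_73/      // a bundled copy of the Java runtime, including bin/java
package.json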

You could further enhance this example by using the non-blocking runner and having the function return the Job ID, so that you can poll for job completion separately. This pattern should be valid for other SDKs as well, so long as their dependencies can be packaged into the Cloud Function.
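A minimal sketch of that enhancement follows. It assumes the non-blocking DataflowPipelineRunner and that the job ID can be scraped from the runner's output; the wordcountAsync name and the regular expression below are hypothetical, and the exact log line to match depends on your SDK version.

module.exports = {
  wordcountAsync: function (context, data) {
    const spawn = require('child_process').spawn;
    // Same invocation as above, but with the non-blocking runner so the
    // pipeline is submitted and the process exits without waiting.
    const child = spawn(
            'jre1.8.0_73/bin/java',
            ['-cp',
             'MY_JAR.jar',
             'com.google.cloud.dataflow.examples.WordCount',
             '--jobName=fromACloudFunction',
             '--project=MY_PROJECT',
             '--runner=DataflowPipelineRunner',
             '--stagingLocation=gs://STAGING_LOCATION',
             '--inputFile=gs://dataflow-samples/shakespeare/*',
             '--output=gs://OUTPUT_LOCATION'
            ],
            { cwd: __dirname });

    var output = '';
    child.stdout.on('data', function(chunk) { output += chunk; });
    child.stderr.on('data', function(chunk) { output += chunk; });

    child.on('close', function(code) {
      // Assumption: the runner prints a line containing the job ID; adjust
      // this pattern to whatever your SDK version actually logs.
      var match = output.match(/jobId[:=]\s*(\S+)/i);
      if (code === 0 && match) {
        // Return the job ID so the caller can poll for completion separately.
        context.success({ jobId: match[1] });
      } else {
        context.failure('Could not submit Dataflow job (exit code ' + code + ')');
      }
    });
  }
};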

answered Oct 24 '22 by Sam McVeety


The best way to launch the job is via a Cloud Function, but be careful: if the function is triggered by Google Cloud Storage, a Dataflow job will be launched for every file that is uploaded (a filtering sketch follows the code below).

const { google } = require('googleapis');

const templatePath = "gs://template_dir/df_template";
const project = "<project_id>";
const tempLoc = "gs://tempLocation/";
const jobNamePrefix = "<job_name_prefix>";
// Scope needed for the Dataflow API via application default credentials.
const authScope = ['https://www.googleapis.com/auth/cloud-platform'];

exports.PMKafka = (data, context, callback) => {
    const file = data;

    console.log(`Event ${context.eventId}`);
    console.log(`Event Type: ${context.eventType}`);
    console.log(`Bucket Name: ${file.bucket}`);
    console.log(`File Name: ${file.name}`);
    console.log(`Metageneration: ${file.metageneration}`);
    console.log(`Created: ${file.timeCreated}`);
    console.log(`Updated: ${file.updated}`);
    console.log(`Uploaded File Name - gs://${file.bucket}/${file.name}`);

    // Obtain application default credentials for the Dataflow REST API.
    google.auth.getApplicationDefault(function (err, authClient, projectId) {
        if (err) {
            throw err;
        }

        if (authClient.createScopedRequired && authClient.createScopedRequired()) {
            authClient = authClient.createScoped(authScope);
        }

        const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });

        var inputDict= {
            inputFile: `gs://${file.bucket}/${file.name}`,
            ...
            ...
            <other_runtime_parameters>
        };

        var env = {
            tempLocation: tempLoc
        };

        var resource_opts = {
            parameters: inputDict,
            environment: env,
            // Strip ':' and '.' from the timestamp so the job name is valid.
            jobName: jobNamePrefix + "-" + new Date().toISOString().toLowerCase().replace(/[:.]/g, "-")
        };

        var opts = {
            gcsPath: templatePath,
            projectId: project,
            resource: resource_opts
        };

        console.log(`Dataflow Run Time Options - ${JSON.stringify(opts)}`)

        dataflow.projects.templates.launch(opts, function (err, response) {
            if (err) {
                console.error("problem running dataflow template, error was: ", err);
                // Assumes a Slack helper module is in scope; drop this if you don't use one.
                slack.publishMessage(null, null, false, err);
                callback(err);
                return;
            }
            console.log("Dataflow template response: ", response);
            var jobid = response["data"]["job"]["id"];
            console.log("Dataflow Job ID: ", jobid);
            // Signal completion only after the launch request has returned.
            callback();
        });
    });
};
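Because of that one-job-per-upload behavior, it can help to guard the launch on the uploaded object's name before calling the Dataflow API. A minimal sketch of such a guard (the input/ prefix and .csv suffix are hypothetical, purely for illustration):

// Hypothetical filter: only objects under input/ ending in .csv launch a job.
function shouldLaunchDataflow(file) {
    return file.name.startsWith('input/') && file.name.endsWith('.csv');
}

// At the top of PMKafka, before touching the Dataflow API:
//   if (!shouldLaunchDataflow(file)) { callback(); return; }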
answered Oct 24 '22 by bigbounty