 

Azkaban: pass parameters to underlying job code

Tags:

azkaban

Is it possible to pass options from an Azkaban workflow to the underlying job code?

I have something like this. It kind of works for hard-coded/pre-known dates, but I would like to be able to specify the date when I execute the flow:

from azkaban import Job, Project
from datetime import datetime, timezone

today = datetime.now(timezone.utc)

options = {
    'start.date': today.strftime('%Y-%m-%d'),  # Can we pass this as an argument to the underlying code?
    'day.offset': 1
}

project = Project('my_project', root=__file__)
project.add_file('my_shell_script.sh', 'my_shell_script.sh')
project.add_job('my_shell_job', Job(options, {'type': 'command', 'command': 'bash my_shell_script.sh <pass date here?>'}))
project.add_job('my_java_job', Job(options, {'type': 'command', 'command': 'java -jar test.jar <pass date here?>'}))

Thanks, Sharath

asked May 07 '15 by sharath


3 Answers

Big Picture: make the parameter persistent by writing it to disk

One way to pass parameters between non-adjacent jobs in an Azkaban flow is to operate on the JOB_OUTPUT_PROP_FILE just before the parameter is needed. This has to be done from a shell script because the JOB_OUTPUT_PROP_FILE variable is not directly available to a given job. The approach writes the relevant information to a file and reads it back, via a helper script, just before it is needed. Parameters can be passed to adjacent jobs by writing to the JOB_OUTPUT_PROP_FILE at each step.

[Diagram: passing parameters between jobs in an Azkaban flow]


Concept

  1. Write a file to disk that contains the information you wish to pass.
  2. Create a helper/preparation shell script that runs just before the job that needs the parameter.
  3. Use the parameter in your job.

Example

In a scenario where the date on which the first job in a flow was run needs to be used by a later job, first write the relevant data to a file. In this example, the current date in YYYY-MM-DD format is written to a local file called rundate.text:

#step_1.job
type=command
dependencies=initialize_jobs
command=whoami
command.1=/bin/sh -c "date '+%Y-%m-%d' > rundate.text"

Then, just before the parameter is needed, run a preparation script to make it available:

#step_4_preparation.job
type=command
dependencies=step_3
command=whoami
command.1=/bin/bash rd.sh

The step_4_preparation job executes the following shell script (rd.sh):

#!/bin/sh
# this script takes the run_date value from the text file and passes it to Azkaban
# Now, the variable can be used in the next step of the job

RD=$(cat rundate.text)

echo "Now setting Job Output Property File with RD (run date) variable"
echo $RD


#This is the point where the parameter is written in JSON format
#to the JOB_OUTPUT_PROP_FILE, which allows it to be used in the next step
echo '{"RD" : "'"$RD"'"}' > "${JOB_OUTPUT_PROP_FILE}"

Then, in the following step, the parameter (${RD} in this example) can be used:

# step_4.job
type=command
dependencies=step_4_preparation
command=whoami
command.1=bash -c "echo ${RD} is the run date"
answered Nov 18 '22 by eeasterly


Well,

As per the Azkaban docs, only global flow properties can be overridden. In Python, we can set global properties this way:

project = Project('emr-cluster-creation', root=__file__)

project.properties = {
    'emr.cluster.name': 'default-clustername',
    'emr.master.instance.type': 'r3.xlarge',
    'emr.core.instance.type': 'r3.xlarge',
    'emr.task.instance.type': 'r3.xlarge',
    'emr.instance.count': 11,
    'emr.task.instance.count': 5,
    'emr.hadoop.conf.local.path': 's3n://bucket/hadoop-configuration.json',
    'emr.hive.site.s3.path': 's3n://bucket/hive-site.xml',
    'emr.spark.version': '1.4.0.b',
#   'emr.service.role': 'some-role',  # amazon IAM role
    'emr.service.role': '\'\'',  # amazon IAM role
    'emr.cluster.output': '\'\''
}

# do something...

These parameters can then be passed to the underlying apps/scripts as ${emr.cluster.name}. This supports both passing default property values and overriding the flow parameters, either in the Azkaban server web UI or through the Azkaban AJAX API.
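For illustration, here is a minimal sketch of how such a property might be consumed and overridden; the job name, flow name, host, and session id are hypothetical, and it assumes the standard executeFlow endpoint of the Azkaban AJAX API:

# create_cluster.job (hypothetical) - the property is substituted into the command
type=command
command=bash create_cluster.sh ${emr.cluster.name} ${emr.instance.count}

# Override the default at execution time via the AJAX API (hypothetical values)
curl -k -X POST "https://azkaban-host:8443/executor" \
     --data "session.id=<session-id>&ajax=executeFlow&project=emr-cluster-creation&flow=create_cluster&flowOverride[emr.cluster.name]=prod-cluster"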

answered Nov 18 '22 by sharath


As eeasterly said, the correct way is to use the JOB_OUTPUT_PROP_FILE, but instead of persisting the value to the file system, I believe it is better to use the fact that exported properties are passed along to dependent jobs (Creating Flows > Job configuration > Parameter Output > "Properties can be exported to be passed to its dependencies").

To leverage this feature, just make the jobs that need the exported parameters depend on the job that exports them. In eeasterly's case, discard the intermediate step_4_preparation job and make step_4 depend directly on step_1, as sketched below.
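A minimal sketch of that simplification, reusing the job names from eeasterly's answer (the export_rd.sh helper name is made up for this example): step_1 writes the run date straight to the JOB_OUTPUT_PROP_FILE, and step_4 depends on it directly.

#!/bin/sh
# export_rd.sh (hypothetical): compute the run date and export it for dependent jobs
RD=$(date '+%Y-%m-%d')
echo '{"RD" : "'"$RD"'"}' > "${JOB_OUTPUT_PROP_FILE}"

# step_1.job
type=command
command=/bin/bash export_rd.sh

# step_4.job
type=command
dependencies=step_1
command=bash -c "echo ${RD} is the run date"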

answered Nov 18 '22 by Péťa Poliak