 

Airflow pass parameters to dependent task

Tags: bash, airflow

What is the way to pass parameters between dependent tasks in Airflow? I have a lot of bash scripts, and I'm trying to migrate this approach to Airflow, but I don't know how to pass some properties between tasks.

This is a real example:

from datetime import datetime
from airflow.operators.bash_operator import BashOperator

# sqoop bash template
sqoop_template = """
    sqoop job --exec {{ params.job }} -- --target-dir {{ params.dir }} --outdir /src/
"""

s3_template = """
    s3-dist-cp --src={{ params.dir }} --dest={{ params.s3 }}
"""

# Task of extraction in EMR
t1 = BashOperator(
    task_id='extract_account',
    bash_command=sqoop_template,
    params={'job': 'job', 'dir': 'hdfs:///account/' + datetime.now().strftime("%Y-%m-%d-%H-%M-%S")},
    dag=dag)

# Task to upload the backup to S3.
t2 = BashOperator(
    task_id='s3_upload',
    bash_command=s3_template,
    params={},  # here I need the dir name created in t1
    depends_on_past=True,
    dag=dag)

t2.set_upstream(t1)

In t2 I need to access the dir name created in t1.

Solution

import boto3

# Execute a valid sqoop job; the returned dict is pushed to XCom automatically.
def sqoop_import(table_name, job_name):
    s3, hdfs = dirpath(table_name)
    sqoop_job = job_default_config(job_name, hdfs)
    # call(sqoop_job)
    return {'hdfs_dir': hdfs, 's3_dir': s3}

# Pull the directories produced by sqoop_import from XCom and copy them to S3.
def s3_upload(**context):
    hdfs = context['task_instance'].xcom_pull(task_ids='sqoop_import')['hdfs_dir']
    s3 = context['task_instance'].xcom_pull(task_ids='sqoop_import')['s3_dir']
    s3_cpdist_job = ["s3-dist-cp", "--src=%s" % (hdfs), "--dest=%s" % (s3)]
    # call(s3_cpdist_job)
    return {'s3_dir': s3}

# Notify via SNS once the copy task (distcp_s3) has published its S3 path.
def sns_notify(**context):
    s3 = context['task_instance'].xcom_pull(task_ids='distcp_s3')['s3_dir']
    client = boto3.client('sns')
    arn = 'arn:aws:sns:us-east-1:744617668409:pipeline-notification-stg'
    response = client.publish(TargetArn=arn, Message=s3)
    return response
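A minimal sketch of how these functions might be wired into the DAG with PythonOperator (the dag object, and the dirpath/job_default_config helpers, are assumed to exist elsewhere); the returned dicts travel between tasks via XCom:

from airflow.operators.python_operator import PythonOperator

t1 = PythonOperator(
    task_id='sqoop_import',
    python_callable=sqoop_import,
    op_kwargs={'table_name': 'account', 'job_name': 'job'},  # hypothetical values
    dag=dag)

t2 = PythonOperator(
    task_id='distcp_s3',
    python_callable=s3_upload,
    provide_context=True,  # Airflow 1.x: passes task_instance etc. in **context
    dag=dag)

t3 = PythonOperator(
    task_id='sns_notify',
    python_callable=sns_notify,
    provide_context=True,
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)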

This is not the definitive solution, so improvements are welcome. Thanks.

Carleto asked Jul 25 '16


People also ask

How do you pass values from one task to another in Airflow?

The best way to manage this use case is to use intermediary data storage. This means saving your data to some system external to Airflow at the end of one task, then reading it in from that system in the next task.
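A hedged sketch of that pattern, using S3 via boto3 as the external store (the bucket, key, and values below are made up for illustration):

import json
import boto3

def save_state(**context):
    # upstream task: write the value to share into an external store (S3 here)
    boto3.client('s3').put_object(
        Bucket='my-pipeline-state',              # hypothetical bucket
        Key='extract_account/latest.json',
        Body=json.dumps({'hdfs_dir': 'hdfs:///account/2016-07-25'}))

def load_state(**context):
    # downstream task: read the same value back from the external store
    obj = boto3.client('s3').get_object(
        Bucket='my-pipeline-state',
        Key='extract_account/latest.json')
    return json.loads(obj['Body'].read())['hdfs_dir']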

How do you set dependency between Airflow DAGs?

You can create those dependencies even if you don't control the upstream DAGs: add a new DAG that relies on the ExternalTaskSensor (one sensor per upstream DAG) and encode the dependencies between the DAGs as dependencies between the sensor tasks.
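A minimal sketch of one such sensor (import path shown for Airflow 2; the DAG id, task ids, and the downstream s3_upload_task are hypothetical):

from airflow.sensors.external_task import ExternalTaskSensor

wait_for_extract = ExternalTaskSensor(
    task_id='wait_for_extract_account',
    external_dag_id='extraction_dag',      # upstream DAG to wait on
    external_task_id='extract_account',    # specific task in that DAG
    dag=dag)

wait_for_extract >> s3_upload_task         # then run this DAG's own work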

How do I create a dynamic task in Airflow?

Airflow's dynamic task mapping feature is built off of the MapReduce programming model. The map procedure takes a set of inputs and creates a single task for each one. The reduce procedure, which is optional, allows a task to operate on the collected output of a mapped task.
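A minimal sketch of that map/reduce pattern with the TaskFlow API (Airflow 2.3+), assuming it sits inside a DAG definition; the table names are made up:

from airflow.decorators import task

@task
def list_tables():
    return ['account', 'orders', 'customers']   # map inputs (hypothetical)

@task
def export_table(table_name):
    return 'hdfs:///%s/' % table_name           # one mapped task instance per table

@task
def collect(paths):
    print(list(paths))                          # reduce step over all mapped outputs

collect(export_table.expand(table_name=list_tables()))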

What does BashOperator mean in Airflow DAG?

The Airflow BashOperator does exactly what you are looking for. It is a very simple but powerful operator, allowing you to execute either a bash script, a command or a set of commands from your DAGs.
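For example (a hedged sketch; the dag object is assumed to exist, and the import path is the Airflow 1.x one used elsewhere on this page):

from airflow.operators.bash_operator import BashOperator

backup = BashOperator(
    task_id='backup_logs',
    bash_command='tar czf /tmp/logs_{{ ds_nodash }}.tar.gz /var/log/myapp',  # templated command
    dag=dag)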


1 Answer

Check out XComs - http://airflow.incubator.apache.org/concepts.html#xcoms. These are used for communicating state between tasks.
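Applied to the question's two BashOperators, a hedged sketch could look like this: t1 echoes the directory it wrote to (when XCom pushing is enabled, the BashOperator pushes the last line of stdout), and t2 pulls it back through the ti.xcom_pull template; the S3 bucket below is hypothetical:

t1 = BashOperator(
    task_id='extract_account',
    bash_command='sqoop job --exec {{ params.job }} '
                 '-- --target-dir hdfs:///account/{{ ts_nodash }} --outdir /src/ '
                 '&& echo hdfs:///account/{{ ts_nodash }}',
    params={'job': 'job'},
    xcom_push=True,   # Airflow 1.x flag; Airflow 2's BashOperator pushes by default
    dag=dag)

t2 = BashOperator(
    task_id='s3_upload',
    bash_command='s3-dist-cp --src={{ ti.xcom_pull(task_ids="extract_account") }} '
                 '--dest={{ params.s3 }}',
    params={'s3': 's3://my-backup-bucket/account/'},   # hypothetical bucket
    dag=dag)

t2.set_upstream(t1)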

Vineet Goel answered Oct 02 '22