What is the way to pass parameters between dependent tasks in Airflow? I have a lot of bash scripts and I'm trying to migrate this approach to Airflow, but I don't know how to pass some properties between tasks.
This is a real example:
from datetime import datetime
from airflow.operators.bash_operator import BashOperator

# sqoop bash template
sqoop_template = """sqoop job --exec {{ params.job }} -- --target-dir {{ params.dir }} --outdir /src/"""

s3_template = """s3-dist-cp --src={{ params.dir }} --dest={{ params.s3 }}"""

# Task of extraction in EMR
t1 = BashOperator(
    task_id='extract_account',
    bash_command=sqoop_template,
    params={'job': 'job',
            'dir': 'hdfs:///account/' + datetime.now().strftime("%Y-%m-%d-%H-%M-%S")},
    dag=dag)

# Task to upload to the s3 backup.
t2 = BashOperator(
    task_id='s3_upload',
    bash_command=s3_template,
    params={},  # here I need the dir name created in t1
    depends_on_past=True,
    dag=dag)

t2.set_upstream(t1)
In t2 i need to access the dir name created in t1.
This is what I have tried so far, moving the logic into Python callables and pulling values with XComs:

import boto3

# Execute a valid sqoop job
def sqoop_import(table_name, job_name):
    s3, hdfs = dirpath(table_name)
    sqoop_job = job_default_config(job_name, hdfs)
    # call(sqoop_job)
    return {'hdfs_dir': hdfs, 's3_dir': s3}

def s3_upload(**context):
    hdfs = context['task_instance'].xcom_pull(task_ids='sqoop_import')['hdfs_dir']
    s3 = context['task_instance'].xcom_pull(task_ids='sqoop_import')['s3_dir']
    s3_cpdist_job = ["s3-dist-cp", "--src=%s" % (hdfs), "--dest=%s" % (s3)]
    # call(s3_cpdist_job)
    return {'s3_dir': s3}

def sns_notify(**context):
    s3 = context['task_instance'].xcom_pull(task_ids='distcp_s3')['s3_dir']
    client = boto3.client('sns')
    arn = 'arn:aws:sns:us-east-1:744617668409:pipeline-notification-stg'
    response = client.publish(TargetArn=arn, Message=s3)
    return response
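For reference, this is roughly how those callables could be wired into PythonOperators so the task_ids line up with the xcom_pull calls. The DAG name, schedule, and op_kwargs values below are placeholders, and provide_context=True is the Airflow 1.x way to get **context passed into the callables; return values are pushed to XCom automatically.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

dag = DAG('sqoop_to_s3', start_date=datetime(2017, 1, 1), schedule_interval='@daily')

t1 = PythonOperator(
    task_id='sqoop_import',          # matches xcom_pull(task_ids='sqoop_import')
    python_callable=sqoop_import,
    op_kwargs={'table_name': 'account', 'job_name': 'job'},   # placeholder values
    dag=dag)

t2 = PythonOperator(
    task_id='distcp_s3',             # matches xcom_pull(task_ids='distcp_s3') in sns_notify
    python_callable=s3_upload,
    provide_context=True,
    dag=dag)

t3 = PythonOperator(
    task_id='sns_notify',
    python_callable=sns_notify,
    provide_context=True,
    dag=dag)

t1 >> t2 >> t3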
That's not the definitive solution, so improvements are welcome. Thanks.
The best way to manage this use case is to use intermediary data storage. This means saving your data to some system external to Airflow at the end of one task, then reading it in from that system in the next task.
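As a minimal sketch of that idea (the bucket and key names here are just placeholders), the first task writes what it produced to S3 and the next task reads it back:

import json
import boto3

def produce(**context):
    # whatever the first task computed, e.g. the HDFS target directory
    hdfs_dir = 'hdfs:///account/2017-01-01-00-00-00'
    boto3.client('s3').put_object(
        Bucket='my-pipeline-state',            # placeholder bucket
        Key='runs/latest.json',
        Body=json.dumps({'hdfs_dir': hdfs_dir}))

def consume(**context):
    obj = boto3.client('s3').get_object(
        Bucket='my-pipeline-state', Key='runs/latest.json')
    hdfs_dir = json.loads(obj['Body'].read())['hdfs_dir']
    # use hdfs_dir to build the s3-dist-cp command here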
This post has shown how to create those dependencies even if you don't control the upstream DAGs: add a new DAG that relies on using the ExternalTaskSensor (one sensor per upstream DAG), encode the dependencies between the DAGs as dependencies between the sensor tasks, run the DAG encoding the dependencies in the same ...
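A rough sketch of that pattern, with placeholder DAG and task ids (the import path shown is the older one; newer Airflow versions expose the sensor under airflow.sensors.external_task):

from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

dag = DAG('downstream', start_date=datetime(2017, 1, 1), schedule_interval='@daily')

# one sensor per upstream DAG you depend on
wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_upstream_dag',
    external_dag_id='upstream_dag',       # placeholder upstream DAG id
    external_task_id='final_task',        # placeholder task in that DAG
    dag=dag)

start = DummyOperator(task_id='start', dag=dag)
wait_for_upstream >> start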
Airflow's dynamic task mapping feature is built off of the MapReduce programming model. The map procedure takes a set of inputs and creates a single task for each one. The reduce procedure, which is optional, allows a task to operate on the collected output of a mapped task.
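For illustration, a small mapped/reduced pipeline (Airflow 2.3+; the task names and values below are made up):

from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2023, 1, 1), schedule_interval=None, catchup=False)
def mapped_example():

    @task
    def list_dirs():
        return ['hdfs:///account/a', 'hdfs:///account/b']   # inputs to map over

    @task
    def copy_to_s3(src):
        return "copied %s" % src            # one mapped task instance per src

    @task
    def summarize(results):
        print(list(results))                # optional reduce over all mapped outputs

    summarize(copy_to_s3.expand(src=list_dirs()))

mapped_example()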
The Airflow BashOperator does exactly what you are looking for. It is a very simple but powerful operator, allowing you to execute either a bash script, a command or a set of commands from your DAGs.
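A minimal example (the DAG name and schedule are placeholders):

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

dag = DAG('bash_example', start_date=datetime(2017, 1, 1), schedule_interval='@daily')

run_command = BashOperator(
    task_id='print_date',
    bash_command='date',                       # a single command
    dag=dag)

run_templated = BashOperator(
    task_id='templated',
    bash_command='echo "run for {{ ds }}"',    # bash_command is Jinja-templated
    dag=dag)

run_command >> run_templated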
Check out XComs - http://airflow.incubator.apache.org/concepts.html#xcoms. These are used for communicating state between tasks.
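Applied to the example in the question, one sketch of the XCom approach is to have t1 echo the directory as the last line of its output so the BashOperator pushes it to XCom, then pull it into t2's command via templating. The s3 destination below is a placeholder, and newer Airflow versions call the flag do_xcom_push:

sqoop_template = """
sqoop job --exec {{ params.job }} -- --target-dir {{ params.dir }} --outdir /src/
echo {{ params.dir }}
"""

s3_template = """
s3-dist-cp --src={{ ti.xcom_pull(task_ids='extract_account') }} --dest={{ params.s3 }}
"""

t1 = BashOperator(
    task_id='extract_account',
    bash_command=sqoop_template,
    params={'job': 'job',
            'dir': 'hdfs:///account/' + datetime.now().strftime("%Y-%m-%d-%H-%M-%S")},
    xcom_push=True,          # push the last line of stdout to XCom
    dag=dag)

t2 = BashOperator(
    task_id='s3_upload',
    bash_command=s3_template,
    params={'s3': 's3://my-bucket/account/'},   # placeholder destination
    dag=dag)

t2.set_upstream(t1)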