I need to find a specific folder in S3 and pass it on to a PythonOperator in an Airflow script. I do this with another PythonOperator that finds the correct directory. I can successfully xcom_push() or Variable.set() the value and read it back within that PythonOperator. The problem is that I need to pass this variable on to a separate PythonOperator whose callable lives in a Python library. Therefore, I need to Variable.get() or xcom_pull() this variable within the main part of the Airflow script. I have searched quite a bit and can't figure out whether this is possible. Below is some code for reference:
def check_for_done_file(**kwargs):
    ### This function does a bunch of stuff to find the correct S3 path to
    ### populate target_dir, this has been verified and works
    Variable.set("target_dir", done_file_list.pop())
    test = Variable.get("target_dir")
    print("TEST: ", test)

#### END OF METHOD, BEGIN MAIN
with my_dag:
    ### CALLING METHOD FROM MAIN, POPULATING VARIABLE
    check_for_done_file_task = PythonOperator(
        task_id = 'check_for_done_file',
        python_callable = check_for_done_file,
        dag = my_dag,
        op_kwargs = {
            "source_bucket" : "my_source_bucket",
            "source_path" : "path/to/the/s3/folder/I/need"
        }
    )
    target_dir = Variable.get("target_dir") # I NEED THIS VAR HERE.
    move_data_to_in_progress_task = PythonOperator(
        task_id = 'move-from-incoming-to-in-progress',
        python_callable = FileOps.move, # <--- PYTHON LIBRARY THAT COPIES FILES FROM SRC TO DEST
        dag = my_dag,
        op_kwargs = {
            "source_bucket" : "source_bucket",
            "source_path" : "path/to/my/s3/folder/" + target_dir,
            "destination_bucket" : "destination_bucket",
            "destination_path" : "path/to/my/s3/folder/" + target_dir,
            "recurse" : True
        }
    )
So, is the only way to accomplish this to augment the library to look for the "target_dir" variable? I don't think Airflow main has a context, and therefore what I want to do may not be possible. Any Airflow experts, please weigh in to let me know what my options might be.
op_kwargs is a templated field, so you can use xcom_push:
def check_for_done_file(**kwargs):
    ...
    # xcom_push needs a key; "return_value" is the key xcom_pull reads by default
    kwargs['ti'].xcom_push(key='return_value', value=y)
and use a Jinja template in op_kwargs:
move_data_to_in_progress_task = PythonOperator(
    task_id = 'move-from-incoming-to-in-progress',
    python_callable = FileOps.move, # <--- PYTHON LIBRARY THAT COPIES FILES FROM SRC TO DEST
    dag = my_dag,
    op_kwargs = {
        "source_bucket" : "source_bucket",
        "source_path" : "path/to/my/s3/folder/{{ ti.xcom_pull(task_ids='check_for_done_file') }}",
        "destination_bucket" : "destination_bucket",
        "destination_path" : "path/to/my/s3/folder/{{ ti.xcom_pull(task_ids='check_for_done_file') }}",
        "recurse" : True
    }
)
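For reference, here is a minimal end-to-end sketch of the same idea. It rests on a few assumptions: the Airflow 2.x import path for PythonOperator, a hypothetical dag_id, a placeholder list in place of the real S3 lookup, and a hypothetical move_files function standing in for FileOps.move. Instead of calling xcom_push, the upstream callable simply returns the value, which Airflow stores as an XCom under the default key that xcom_pull reads.

```python
from datetime import datetime

# Jinja expression that Airflow renders when the task runs,
# not when the DAG file is parsed.
TARGET_DIR_TEMPLATE = "{{ ti.xcom_pull(task_ids='check_for_done_file') }}"


def check_for_done_file(source_bucket, source_path, **context):
    """Return the target directory; Airflow stores the return value as an XCom."""
    # Hypothetical placeholder for the real S3 lookup from the question.
    done_file_list = ["example_dir"]
    return done_file_list.pop()


def move_files(source_bucket, source_path, destination_bucket,
               destination_path, recurse):
    """Hypothetical stand-in for FileOps.move; it only reports its arguments."""
    print(f"move {source_bucket}/{source_path} -> "
          f"{destination_bucket}/{destination_path} (recurse={recurse})")


def build_dag():
    """Build the DAG. Airflow is imported here so that the callables above
    can be imported and tested without Airflow installed."""
    from airflow import DAG
    from airflow.operators.python import PythonOperator  # Airflow 2.x path (assumed)

    with DAG(
        dag_id="move_done_files",  # hypothetical name
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        check = PythonOperator(
            task_id="check_for_done_file",
            python_callable=check_for_done_file,
            op_kwargs={
                "source_bucket": "my_source_bucket",
                "source_path": "path/to/the/s3/folder/I/need",
            },
        )
        move = PythonOperator(
            task_id="move-from-incoming-to-in-progress",
            python_callable=move_files,
            op_kwargs={
                "source_bucket": "source_bucket",
                "source_path": "path/to/my/s3/folder/" + TARGET_DIR_TEMPLATE,
                "destination_bucket": "destination_bucket",
                "destination_path": "path/to/my/s3/folder/" + TARGET_DIR_TEMPLATE,
                "recurse": True,
            },
        )
        # The move task must run after the check task, otherwise the
        # XCom does not exist yet and the template renders as None.
        check >> move
    return dag
```

In a real DAG file you would call build_dag() at module level and assign the result to a global (for example my_dag = build_dag()) so the scheduler can discover it. Note the explicit check >> move dependency, which the code in the question does not set.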
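Also, add provide_context=True to your check_for_done_file_task task so that the context dictionary is passed to the callable. This is only needed on Airflow 1.x; in Airflow 2 the context is passed automatically and provide_context is deprecated.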
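If templating op_kwargs is not an option, another possibility is to wrap the library function in a thin callable that pulls the XCom from the context itself. The sketch below is only an illustration: make_move_callable is a hypothetical helper, and it assumes that the library function accepts the keyword arguments shown in the question (source_bucket, source_path, destination_bucket, destination_path, recurse).

```python
def make_move_callable(move_fn, upstream_task_id="check_for_done_file"):
    """Wrap a library function so the directory is pulled from XCom at run time.

    move_fn is assumed to accept the keyword arguments used in the question.
    """

    def _move(source_bucket, destination_bucket, base_path, recurse=True, **context):
        # The task instance is part of the context Airflow passes to the callable.
        target_dir = context["ti"].xcom_pull(task_ids=upstream_task_id)
        if target_dir is None:
            raise ValueError(f"no XCom value found from task {upstream_task_id!r}")
        path = base_path + target_dir
        # Delegate to the unchanged library function with the resolved path.
        return move_fn(
            source_bucket=source_bucket,
            source_path=path,
            destination_bucket=destination_bucket,
            destination_path=path,
            recurse=recurse,
        )

    return _move
```

You would then pass python_callable=make_move_callable(FileOps.move) and put source_bucket, destination_bucket and base_path in op_kwargs. This keeps the library untouched, at the cost of one extra function in the DAG file.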