
Can I get() or xcom.pull() a variable in the MAIN part of an Airflow script (outside a PythonOperator)?

Tags: python, airflow

I have a situation where I need to find a specific folder in S3 and pass it on to a PythonOperator in an Airflow script. I am doing this with another PythonOperator that finds the correct directory. I can successfully xcom.push() or Variable.set() the value and read it back within that PythonOperator. The problem is that I need to pass this variable on to a separate PythonOperator that uses code in a Python library, so I need to Variable.get() or xcom.pull() it in the main part of the Airflow script. I have searched quite a bit and can't figure out whether this is possible. Below is some code for reference:

    def check_for_done_file(**kwargs):

        ### This function does a bunch of stuff to find the correct S3 path to
        ### populate target_dir; this has been verified and works

        Variable.set("target_dir", done_file_list.pop())
        test = Variable.get("target_dir")
        print("TEST: ", test)

    #### END OF METHOD, BEGIN MAIN

    with my_dag:

        ### CALLING METHOD FROM MAIN, POPULATING VARIABLE

        check_for_done_file_task = PythonOperator(
            task_id = 'check_for_done_file',
            python_callable = check_for_done_file,
            dag = my_dag,
            op_kwargs = {
                "source_bucket" : "my_source_bucket",
                "source_path" : "path/to/the/s3/folder/I/need"
            }
        )

        target_dir = Variable.get("target_dir") # I NEED THIS VAR HERE.

        move_data_to_in_progress_task = PythonOperator(
            task_id = 'move-from-incoming-to-in-progress',
            python_callable = FileOps.move, # <--- PYTHON LIBRARY THAT COPIES FILES FROM SRC TO DEST
            dag = my_dag,
            op_kwargs = {
                "source_bucket" : "source_bucket",
                "source_path" : "path/to/my/s3/folder/" + target_dir,
                "destination_bucket" : "destination_bucket",
                "destination_path" : "path/to/my/s3/folder/" + target_dir,
                "recurse" : True
            }
        )

So, is the only way to accomplish this to augment the library to look for the "target_dir" variable? I don't think the main part of an Airflow script has a task context, so what I want to do may not be possible. Any Airflow experts, please weigh in and let me know what my options might be.

asked Nov 11 '19 by BPS

1 Answer

op_kwargs is a templated field, so you can push the value from your callable with xcom_push:

    def check_for_done_file(**kwargs):
        ...
        # push the directory to XCom under an explicit key so downstream tasks can pull it
        kwargs['ti'].xcom_push(key='target_dir', value=done_file_list.pop())

and pull it back with the same key via a Jinja template in op_kwargs:

    move_data_to_in_progress_task = PythonOperator(
        task_id = 'move-from-incoming-to-in-progress',
        python_callable = FileOps.move, # <--- PYTHON LIBRARY THAT COPIES FILES FROM SRC TO DEST
        dag = my_dag,
        op_kwargs = {
            "source_bucket" : "source_bucket",
            "source_path" : "path/to/my/s3/folder/{{ ti.xcom_pull(task_ids='check_for_done_file', key='target_dir') }}",
            "destination_bucket" : "destination_bucket",
            "destination_path" : "path/to/my/s3/folder/{{ ti.xcom_pull(task_ids='check_for_done_file', key='target_dir') }}",
            "recurse" : True
        }
    )

Also, add provide_context=True to your check_for_done_file_task so the context dictionary (which includes ti) is passed to the callable.
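For reference, here is a minimal sketch of the upstream task with that flag set, reusing the task and callable names from the question (this applies to Airflow 1.x; in Airflow 2 the argument was removed and the context is passed automatically):

    check_for_done_file_task = PythonOperator(
        task_id = 'check_for_done_file',
        python_callable = check_for_done_file,
        provide_context = True,  # pass the context dict (including ti) into the callable
        dag = my_dag,
        op_kwargs = {
            "source_bucket" : "my_source_bucket",
            "source_path" : "path/to/the/s3/folder/I/need"
        }
    )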

answered by kaxil