 

Airflow: making a Python callable function reusable

Tags:

python

airflow

airflow_version = 1.10.2; python_version = 3.6.8

I am having trouble understanding how to make a Python callable for Airflow's PythonOperator reusable: the function works when it is declared in the DAG file itself, but importing the same function from a helper lib fails.

So, the following works:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def my_function(temp_file, task_id, **kwargs):
    # Pull the XCom value(s) pushed by the upstream task
    xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)

    if not xcom_vals:
        return 'Xcom message not retrieved'

    ack_messages = []

    for item in xcom_vals:
        ack_messages += <do stuff>

    return ack_messages


with DAG(<dag args>):

    process_messages = PythonOperator(
        task_id='get_messages',
        python_callable=my_function,
        op_kwargs={'task_id': 'previous_task_id',
                   'temp_file': temp_file},
        # Airflow 1.10: pass the task context (including 'ti') into **kwargs
        provide_context=True,
    )

But moving my_function to a module lib/helpers.py and importing it from there fails with this error:

Broken DAG: [path to dag] cannot import my_function

NOTE: lib/helpers.py contains other (simpler) functions that are successfully imported and used in this DAG and in other DAGs.

How should my_function be implemented so that it can be called from other DAGs?
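As written, my_function has no dependency on the DAG file: it only reads the task instance from kwargs['ti'] and calls xcom_pull(task_ids=...), so in principle it can live in any importable module. A rough way to check that is to exercise it without Airflow at all, passing a stand-in for the task instance. FakeTaskInstance and the "ack:" processing step are hypothetical placeholders (the latter replaces the question's <do stuff>), not Airflow APIs.

```python
from typing import Any, Dict, List, Optional, Union


def my_function(temp_file: str, task_id: str, **kwargs: Any) -> Union[str, List[str]]:
    """Same shape as the question's callable; only kwargs['ti'] is read from the context."""
    xcom_vals = kwargs["ti"].xcom_pull(task_ids=task_id)

    if not xcom_vals:
        return "Xcom message not retrieved"

    ack_messages: List[str] = []
    for item in xcom_vals:
        # Hypothetical stand-in for the question's <do stuff> step.
        ack_messages += [f"ack:{item}"]
    return ack_messages


class FakeTaskInstance:
    """Hypothetical stub that only mimics the xcom_pull(task_ids=...) call used above."""

    def __init__(self, xcoms: Dict[str, Any]) -> None:
        self._xcoms = xcoms

    def xcom_pull(self, task_ids: Optional[str] = None) -> Any:
        # Return whatever was "pushed" under that task id, or None if nothing was.
        return self._xcoms.get(task_ids)


if __name__ == "__main__":
    ti = FakeTaskInstance({"previous_task_id": ["m1", "m2"]})
    print(my_function("unused_temp_file", "previous_task_id", ti=ti))
    print(my_function("unused_temp_file", "missing_task", ti=ti))
```

If the function behaves correctly like this in isolation, an import failure is more likely about which version of the module the Airflow process sees than about the function itself.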

asked May 29 '26 13:05 by Miguel


1 Answer

I figured this one out: it came down to the Airflow UI and scheduler not picking up the lib folder correctly, possibly some kind of lag after a git sync. Both the UI and the scheduler were parsing the DAG file correctly, but not the lib folder.

What solved it in the end was restarting both the UI pod and the scheduler pod (we're running Airflow on Kubernetes), and that was it.
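Why would a restart help? One plausible mechanism, offered as an assumption rather than something the answer confirms: Python caches every imported module in sys.modules, so a long-running process that already imported an older lib/helpers.py keeps that old module object even after a git sync rewrites the file, and `from ... import my_function` then fails with an ImportError much like the one in the question. The sketch simulates this with a throwaway module called hypothetical_helpers (a made-up name) in a temporary directory, using importlib.reload as a stand-in for the restart.

```python
import importlib
import sys
import tempfile
from pathlib import Path

MODULE = "hypothetical_helpers"  # made-up name standing in for lib/helpers.py

OLD_SOURCE = "def other_function():\n    return 'simple'\n"
NEW_SOURCE = OLD_SOURCE + "\n\ndef my_function():\n    return 'new'\n"


def simulate_stale_import():
    """Return (found_before_reload, found_after_reload) for my_function."""
    saved_flag = sys.dont_write_bytecode
    sys.dont_write_bytecode = True  # no .pyc written, so reload always recompiles the source
    with tempfile.TemporaryDirectory() as tmp:
        sys.path.insert(0, tmp)
        try:
            module_file = Path(tmp) / f"{MODULE}.py"
            module_file.write_text(OLD_SOURCE)
            helpers = importlib.import_module(MODULE)  # now cached in sys.modules

            # Simulate a git sync rewriting the helper file while the process keeps running.
            module_file.write_text(NEW_SOURCE)

            # The import statement reuses the cached (old) module, so the new name is missing.
            try:
                from hypothetical_helpers import my_function  # noqa: F401
                found_before = True
            except ImportError:
                found_before = False

            # Re-executing the module source, roughly what a process restart achieves.
            importlib.reload(helpers)
            from hypothetical_helpers import my_function
            found_after = my_function() == "new"
            return found_before, found_after
        finally:
            # Undo every global change so the demo leaves no trace behind.
            sys.path.remove(tmp)
            sys.path_importer_cache.pop(tmp, None)
            sys.modules.pop(MODULE, None)
            sys.dont_write_bytecode = saved_flag


if __name__ == "__main__":
    print(simulate_stale_import())
```

This only illustrates generic Python import caching; it is not a claim about exactly how Airflow 1.10.2's webserver and scheduler re-parse the DAG and lib folders, which is why restarting the pods remains the practical fix described above.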

answered Jun 01 '26 01:06 by Miguel


