airflow_version = 1.10.2; python_version = 3.6.8
I am having trouble understanding how to make a Python callable reusable across DAGs with Airflow's PythonOperator. The same function works when declared within the DAG file itself, but importing it from a helper lib fails.
So, the following works:
def my_function(temp_file, task_id, **kwargs):
    xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)
    if not xcom_vals:
        return 'Xcom message not retrieved'
    ack_messages = []
    for item in xcom_vals:
        ack_messages += <do stuff>
    return ack_messages

with DAG(<dag args>):
    process_messages = PythonOperator(
        task_id='get_messages',
        python_callable=my_function,
        op_kwargs={'task_id': 'previous_task_id',
                   'temp_file': temp_file},
        provide_context=True,
    )
But moving my_function to a module lib/helpers.py and then importing it into the DAG file fails with the following error:
Broken DAG: [path to dag] cannot import my_function
NOTE: lib/helpers.py contains other (simpler) functions that are successfully imported and used in this and other DAGs.
How should my_function be implemented so that it is callable by other dags?
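For reference, here is a minimal sketch of how the function could sit in lib/helpers.py and be exercised outside Airflow. It assumes the per-item processing (elided as <do stuff> above) can be stood in for by simply collecting each item, and FakeTaskInstance is a hypothetical stub for local testing, not part of Airflow. The function itself needs no Airflow imports, which suggests that nothing in its implementation prevents it from being imported.

```python
# lib/helpers.py (hypothetical layout; depending on the setup, the lib/
# folder may also need an __init__.py to be importable as a package)


def my_function(temp_file, task_id, **kwargs):
    """Pull the XCom pushed by `task_id` and build a list from its items."""
    xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)
    if not xcom_vals:
        return 'Xcom message not retrieved'
    ack_messages = []
    for item in xcom_vals:
        # Placeholder: the original per-item processing is not shown,
        # so this sketch just collects the item unchanged.
        ack_messages.append(item)
    return ack_messages


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for local tests only."""

    def __init__(self, values):
        self._values = values

    def xcom_pull(self, task_ids=None):
        # Return the canned values regardless of which task is asked for.
        return self._values
```

In the DAG file, the import would then presumably be something like `from lib.helpers import my_function`, assuming lib/ lives inside the DAGs folder so that it is on the Python path.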
Figured this one out. It came down to the Airflow UI and scheduler not picking up the lib folder correctly, possibly some kind of lag after a git sync. Both the UI and the scheduler were parsing the DAG file correctly, but not the updated lib folder.
What solved it in the end was restarting both the UI and the scheduler pods (we're running Airflow on Kubernetes), and that was it.
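Before restarting, one way to confirm this kind of problem is to check, from inside the affected pod, whether the Python process can actually see the function. The following is a rough sketch using only the standard library; check_helper is a hypothetical name, and the module path lib.helpers is assumed from the layout described in the question.

```python
import importlib
import sys


def check_helper(module_name, attr_name, search_path=None):
    """Return (ok, detail) saying whether module_name exposes attr_name."""
    # Optionally add a folder (e.g. the DAGs folder) to the import path.
    if search_path and search_path not in sys.path:
        sys.path.insert(0, search_path)
    # Drop cached directory listings so newly synced files are noticed.
    importlib.invalidate_caches()
    try:
        module = importlib.import_module(module_name)
    except ImportError as exc:
        return False, "module not importable: {}".format(exc)
    location = getattr(module, "__file__", "<unknown>")
    if not hasattr(module, attr_name):
        return False, "{} loaded from {} has no attribute {}".format(
            module_name, location, attr_name)
    return True, location


# Example call (path and names are assumptions):
# check_helper("lib.helpers", "my_function", search_path="/path/to/dags")
```

If the check succeeds in a fresh Python shell in the pod while the UI still reports a broken DAG, that would point to the long-running process holding a stale copy of the module rather than to a bug in the function.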