I don't seem to understand how to import modules into an Apache Airflow DAG definition file. I want to do this so that I can, for instance, create a library that makes declaring tasks with similar settings less verbose.
Here is the simplest example I can think of that replicates the issue: I modified the Airflow tutorial (https://airflow.apache.org/tutorial.html#recap) to simply import a module and call a function from that module, like so:
Directory structure:
- dags/
  - __init__.py
  - lib.py
  - tutorial.py
tutorial.py:
""" Code that goes along with the Airflow located at: http://airflow.readthedocs.org/en/latest/tutorial.html """ from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta # Here is my added import from lib import print_double # And my usage of the imported def print_double(2) ## -- snip, because this is just the tutorial code, ## i.e., some standard DAG defintion stuff --
`print_double` is just a simple function that multiplies whatever input you give it by 2 and prints the result, but that doesn't really matter here, because this is an import issue.
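For reference, `lib.py` is roughly the following (a minimal sketch matching the description above; the exact body doesn't matter):

```python
def print_double(x):
    """Multiply the input by 2 and print the result."""
    print(2 * x)
```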
I am able to run `airflow test tutorial print_date 2015-06-01` as per the tutorial docs successfully. The DAG runs, and moreover `print_double` succeeds: `4` is printed to the console, as expected. All appears well.
Then I go to the web UI and am greeted by `Broken DAG: [/home/airflow/airflow/dags/tutorial.py] No module named 'lib'`. Unpausing the DAG and attempting a manual run from the UI produces a "running" status, but it never succeeds or fails; it just sits on "running" forever. I can queue up as many runs as I'd like, but they all just sit there.
I've checked the airflow logs, and don't see any useful debug information there.
So what am I missing?
You can do it in one of these ways:

- add your modules to one of the folders that Airflow automatically adds to `PYTHONPATH` (the `dags/` folder itself is one of them);
- add the extra folders where you keep your code to `PYTHONPATH`;
- package your code into a Python package and install it together with Airflow (see the sketch below).
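If you go the packaging route, a minimal `setup.py` could look roughly like this (the package name `my_airflow_lib` is just a placeholder for whatever holds your shared DAG helpers):

```python
# setup.py -- minimal packaging sketch; "my_airflow_lib" is a placeholder name
from setuptools import setup, find_packages

setup(
    name="my_airflow_lib",
    version="0.1.0",
    packages=find_packages(),
)
```

Install it into the same environment that runs the Airflow scheduler and webserver (e.g. `pip install -e .`), and your DAG files can then import it like any other installed library.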
When creating a new DAG, you probably want to set a global start_date for your tasks. This can be done by declaring your start_date directly in the DAG() object. The first DagRun to be created will be based on the min(start_date) for all your tasks.
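For example, a minimal sketch with an illustrative `dag_id`, date, and task:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# start_date declared on the DAG itself becomes the default for all of its tasks
dag = DAG(
    dag_id="tutorial",
    start_date=datetime(2015, 6, 1),
    schedule_interval="@daily",
)

print_date = BashOperator(
    task_id="print_date",
    bash_command="date",
    dag=dag,
)
```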
Explicitly adding the DAG file's own directory to `sys.path` worked for me:
```python
import os
import sys

# Put the directory containing this DAG file on sys.path
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
```
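Placed above the `from lib import print_double` line in `tutorial.py`, this makes the DAG's own folder importable for whichever Airflow component parses the file, so the local import from the question should resolve:

```python
# assuming the sys.path lines above sit at the top of tutorial.py
from lib import print_double  # no longer raises "No module named 'lib'"

print_double(2)
```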