Airflow webserver gives cron error for dags with None as schedule interval

I'm running Airflow 1.9.0 with the LocalExecutor and a PostgreSQL database on a Linux AMI. I want to trigger DAGs manually, but whenever I create a DAG with schedule_interval set to None or to '@once', the webserver's tree view crashes with the following error (only the last call is shown):

File "/usr/local/lib/python2.7/site-packages/croniter/croniter.py", line 467, in expand 
    raise CroniterBadCronError(cls.bad_length)
CroniterBadCronError: Exactly 5 or 6 columns has to be specified for iteratorexpression.

Furthermore, when I manually trigger the DAG, a DAG run starts but the tasks themselves are never scheduled. I've looked around, but it seems that I'm the only one with this type of error. Has anyone encountered this error before and found a fix?

Minimal example triggering the problem:

import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'me'
}

bash_command = """
    echo "this is a test task"
"""

with DAG('schedule_test',
        default_args=default_args,
        start_date = dt.datetime(2018, 7, 24),
        schedule_interval='None',
        catchup=False
        ) as dag:

    first_task = BashOperator(task_id = "first_task", bash_command = bash_command)
asked Jul 25 '18 by T. van Hees


1 Answer

Try this:

  • Set schedule_interval to None without the quotes, i.e. pass the Python None object rather than the string 'None'. When a string is passed, Airflow treats it as a cron expression and hands it to croniter, which is what raises the CroniterBadCronError you are seeing. More information on that here: airflow docs -- search for schedule_interval.
  • Set the orchestration (task dependencies) for your tasks at the bottom of the DAG, as in the example below.

Like so:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'me'
}

bash_command = """
    echo "this is a test task"
"""

with DAG('schedule_test',
         default_args=default_args,
         start_date=datetime(2018, 7, 24),
         schedule_interval=None,  # the None object, not the string 'None'
         catchup=False
         ) as dag:

    t1 = DummyOperator(
        task_id='extract_data'
    )

    t2 = BashOperator(
        task_id='first_task',
        bash_command=bash_command
    )

    ##### ORCHESTRATION #####
    # t1 must finish before t2 can run.
    t2.set_upstream(t1)
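
For reference, with schedule_interval=None the DAG only runs when you trigger it yourself, for example via the Airflow 1.x CLI (using the dag id from the example above):

    airflow trigger_dag schedule_test

The same dependency can also be expressed with Airflow's bit-shift syntax, which is equivalent to set_upstream:

    t1 >> t2  # same as t2.set_upstream(t1)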
answered Sep 23 '22 by Zack