
dag.py raises: "airflow.exceptions.AirflowException: Task is missing the start_date parameter", but it's given in the code

Today I tried to create my first Airflow DAG:

from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'default_user',
    'start_date': days_ago(2),
    'depends_on_past': True,
    # With this set to true, the pipeline won't run if the previous day failed
    'email': ['[email protected]'],
    'email_on_failure': True,
    # upon failure this pipeline will send an email to your email set above
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=30),
}

dag = DAG(
    'basic_dag_2',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)


def my_func():
    print('Hello from my_func')


bashtask = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

dummy_task = DummyOperator(task_id='dummy_task', retries=3)

python_task = PythonOperator(task_id='python_task', python_callable=my_func)

dummy_task.set_downstream(bashtask)
python_task.set_downstream(bashtask)

My Airflow is running properly on Python 3.6.8, but when the scheduler tries to fill the DagBag from the file it throws this exception, and I really don't know why:

[2020-05-11 17:11:15,601] {scheduler_job.py:1576} WARNING - No viable dags retrieved from /root/airflow/dags/first_dag.py
[2020-05-11 17:11:15,616] {scheduler_job.py:162} INFO - Processing /root/airflow/dags/first_dag.py took 0.031 seconds
[2020-05-11 17:12:05,647] {scheduler_job.py:154} INFO - Started process (PID=26569) to work on /root/airflow/dags/first_dag.py
[2020-05-11 17:12:05,653] {scheduler_job.py:1562} INFO - Processing file /root/airflow/dags/first_dag.py for tasks to queue
[2020-05-11 17:12:05,654] {logging_mixin.py:112} INFO - [2020-05-11 17:12:05,654] {dagbag.py:396} INFO - Filling up the DagBag from /root/airflow/dags/first_dag.py
[2020-05-11 17:12:05,666] {logging_mixin.py:112} INFO - [2020-05-11 17:12:05,662] {dagbag.py:239} ERROR - Failed to import: /root/airflow/dags/first_dag.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dagbag.py", line 236, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/lib64/python3.6/imp.py", line 172, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 684, in _load
  File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/root/airflow/dags/first_dag.py", line 34, in <module>
    dag=dag,
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 70, in __init__
    super(BashOperator, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 422, in __init__
    self.dag = dag
  File "/usr/local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 548, in dag
    dag.add_task(self)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1301, in add_task
    raise AirflowException("Task is missing the start_date parameter")
airflow.exceptions.AirflowException: Task is missing the start_date parameter

I thought about giving my Operators their own start_date, but I assumed they would inherit it from their DAG.

asked May 12 '20 by Luigi Drago


People also ask

What is Start_date in Airflow DAG?

An Airflow DAG defined with a start_date, possibly an end_date, and a non-dataset schedule defines a series of intervals, which the scheduler turns into individual DAG runs and executes.
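
For illustration (a minimal sketch, not part of the original page; the dates are hypothetical), a daily schedule produces one run per completed interval, each triggered once its interval closes:

from datetime import datetime, timedelta

start_date = datetime(2020, 5, 10)   # hypothetical start_date
schedule = timedelta(days=1)         # daily schedule_interval

# The scheduler creates one DAG run per completed interval: the first run
# covers 2020-05-10 -> 2020-05-11 and is triggered at the end of that
# interval; its execution_date is labelled with the interval start.
first_execution_date = start_date
first_trigger_time = start_date + schedule
print(first_execution_date, first_trigger_time)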

How do I reset my DAG Airflow?

Go to Browse > Task Instances, filter for and select the failed task(s), and Delete. This is essentially the same as clearing individual tasks in the DAG graph or tree view. Go to Browse > DAG Runs, filter for and select the failed DAG(s), and Set state to 'running'.
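
The same can be done from the command line; a hedged sketch, assuming the Airflow 1.10.x CLI and a hypothetical DAG id and date range:

airflow clear basic_dag_2 -s 2020-05-01 -e 2020-05-11 --only_failed

This clears the failed task instances of basic_dag_2 in the given range, which lets the scheduler re-run them.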

How do I add Airflow to DAG?

To create a DAG in Airflow, you always have to import the DAG class. After the DAG class come the imports of Operators. Basically, for each Operator you want to use, you have to make the corresponding import. For example, if you want to execute a Python function, you have to import the PythonOperator.
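
A minimal, self-contained sketch of those imports (the names here are illustrative, assuming Airflow 1.10.x import paths):

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


def greet():
    print('hello')


dag = DAG('example_imports', start_date=days_ago(1), schedule_interval='@daily')
greet_task = PythonOperator(task_id='greet', python_callable=greet, dag=dag)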


2 Answers

That is because two of your tasks have not been assigned to the DAG that carries the start_date in its default_args. Pass dag=dag explicitly:

dummy_task = DummyOperator(task_id='dummy_task', retries=3, dag=dag)

python_task = PythonOperator(task_id='python_task', python_callable=my_func, dag=dag)

Note that you can use the DAG object as a context manager, as mentioned in https://airflow.apache.org/docs/stable/concepts.html#context-manager, to avoid repeating dag=dag for every task:

Example:

with DAG(
    'basic_dag_2',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
) as dag:

    bashtask = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    dummy_task = DummyOperator(task_id='dummy_task', retries=3)

    python_task = PythonOperator(task_id='python_task', python_callable=my_func)

    dummy_task.set_downstream(bashtask)
    python_task.set_downstream(bashtask)
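
Alternatively (a sketch, assuming Airflow 1.10.x), start_date can be passed directly to the DAG constructor instead of through default_args; tasks attached to the DAG inherit it either way:

dag = DAG(
    'basic_dag_2',
    start_date=days_ago(2),              # set on the DAG itself
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)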
answered Oct 14 '22 by kaxil


I had the same issue. You simply need to pass dag=dag to each operator that you use, because the operator still needs a few more parameters to run as a task, and those parameters (such as start_date) are defined on the DAG.

An example. This is wrong:

postgres_task_1 = PostgresOperator(
    task_id="get_param_2",
    postgres_conn_id="aramis_postgres_connection",
    sql="""
        SELECT param_num_2 FROM public.aramis_meta_task
    """,
)

And this is right:

postgres_task_1 = PostgresOperator(
    dag=dag,
    task_id="get_param_2",
    postgres_conn_id="aramis_postgres_connection",
    sql="""
        SELECT param_num_2 FROM public.aramis_meta_task
    """,
)
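
For this fragment to be self-contained you also need the operator import; in Airflow 1.10.x it is:

from airflow.operators.postgres_operator import PostgresOperator

(In Airflow 2.x the same operator lives in the postgres provider package, airflow.providers.postgres.operators.postgres.)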
answered Oct 14 '22 by Aramis NSR