How to log output from Airflow DAG for debugging?

I am writing an Airflow DAG and am having problems with one of its functions. I am trying to debug by printing data to stdout and using the logging library.

My example DAG is:

    from datetime import timedelta
    
    import airflow
    import logging
    
    from airflow.models import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.contrib.hooks.datadog_hook import DatadogHook
    
    def datadog_event(title, text, dag_id, task_id):
        hook = DatadogHook()
        tags = [
            f'dag:{dag_id}',
            f'task:{task_id}',
        ]
    
        hook.post_event(title, text, tags)
    
    def datadog_event_success(context):
        dag_id = context['task_instance'].dag_id
        task_id = context['task_instance'].task_id
        text = f'Airflow DAG success for {dag_id}\n\nDAG: {dag_id}\nTasks: {task_id}'
        title = f'Airflow DAG success for {dag_id}'
    
        logging.info(title)
        logging.info(text)
        logging.info(dag_id)
        logging.info(task_id)
    
        datadog_event(title, text, dag_id, task_id)
    
    args = {
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(2),
    }
    
    dag = DAG(
        dag_id='example_callback',
        default_args=args,
        schedule_interval='*/5 * * * *',
        dagrun_timeout=timedelta(minutes=60),
        on_success_callback=datadog_event_success,
    )
    
    my_task = DummyOperator(
        task_id='run_this_last',
        dag=dag,
    )

During a run I get an error:

airflow[9490]: Process DagFileProcessor4195-Process:
airflow[9490]: Traceback (most recent call last):
airflow[9490]:   File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
airflow[9490]:     self.run()
airflow[9490]:   File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
airflow[9490]:     self._target(*self._args, **self._kwargs)
airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 148, in _run_file_processor
airflow[9490]:     result = scheduler_job.process_file(file_path, pickle_dags)
airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
airflow[9490]:     return func(*args, **kwargs)
airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1542, in process_file
airflow[9490]:     self._process_dags(dagbag, dags, ti_keys_to_schedule)
airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1239, in _process_dags
airflow[9490]:     self._process_task_instances(dag, tis_out)
airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
airflow[9490]:     return func(*args, **kwargs)
airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 732, in _process_task_instances
airflow[9490]:     run.update_state(session=session)
airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
airflow[9490]:     return func(*args, **kwargs)
airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/models/dagrun.py", line 318, in update_state
airflow[9490]:     dag.handle_callback(self, success=True, reason='success', session=session)
airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
airflow[9490]:     return func(*args, **kwargs)
airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/models/dag.py", line 620, in handle_callback
airflow[9490]:     callback(context)
airflow[9490]:   File "/home/airflow/analytics/etl_v2/airflow_data/dags/example_bash_operator_andy.py", line 68, in datadog_event_success
airflow[9490]:     datadog_event(title, text, dag_id, task_id)
airflow[9490]:   File "/home/airflow/analytics/etl_v2/airflow_data/dags/example_bash_operator_andy.py", line 45, in datadog_event
airflow[9490]:     hook.post_event(title, text, tags)
airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/contrib/hooks/datadog_hook.py", line 157, in post_event
airflow[9490]:     self.validate_response(response)
airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/contrib/hooks/datadog_hook.py", line 58, in validate_response
airflow[9490]:     if response['status'] != 'ok':
airflow[9490]: KeyError: 'status'

But none of my logged info appears before or after the error in the scheduler, webserver, worker, or task logs.

I have tested the datadog_event call on my Airflow worker by manually importing the code and it logs properly when I run it that way:

airflow@airflow-worker-0:~/analytics$ /home/airflow/virtualenv/bin/python -i /home/airflow/analytics/etl_v2/airflow_data/dags/example_bash_operator_andy.py
[2019-08-07 20:48:01,890] {settings.py:213} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=29941
[2019-08-07 20:48:02,227] {__init__.py:51} INFO - Using executor DaskExecutor

>>> datadog_event('My title', 'My task', 'example_bash_operator_andy', 'run_this_last')
[2019-08-07 20:51:17,542] {datadog_hook.py:54} INFO - Setting up api keys for Datadog
[2019-08-07 20:51:17,544] {example_bash_operator_andy.py:38} INFO - My title
[2019-08-07 20:51:17,544] {example_bash_operator_andy.py:39} INFO - My task
[2019-08-07 20:51:17,544] {example_bash_operator_andy.py:40} INFO - example_bash_operator_andy
[2019-08-07 20:51:17,545] {example_bash_operator_andy.py:41} INFO - run_this_last
[2019-08-07 20:51:17,658] {api_client.py:139} INFO - 202 POST https://api.datadoghq.com/api/v1/events (113.2174ms)

My airflow.cfg is posted at https://gist.github.com/andyshinn/d743ddc61956ed7440c500fca962ce92 and I am using Airflow 1.10.4.

How can I output logging or messages from the DAG itself to better debug what might be happening?


1 Answer

DAG-level callbacks (on_success_callback, on_failure_callback) are executed in the main scheduler loop; there is an open issue to move execution of these functions out of the scheduler thread. Exceptions raised in your callback function will appear in the scheduler logs. However, annoyingly, print and logging output does NOT appear to make it into the scheduler logs.

For debugging purposes, I typically just raise the info I'm trying to log as an exception, so that it appears in the scheduler logs.
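For example, a minimal sketch of that trick applied to your callback (reusing the context fields your DAG already pulls out; the exact message is up to you):

    def datadog_event_success(context):
        dag_id = context['task_instance'].dag_id
        task_id = context['task_instance'].task_id

        # Debugging hack only: raise the values you want to see so they end up
        # in the scheduler log, since print/logging output is swallowed there.
        raise RuntimeError(f'DEBUG on_success_callback: dag_id={dag_id}, task_id={task_id}')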

Alternatively, you could move the callback to the TASK level. This means moving it into your default_args, like so:

    args = {
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(2),
        'on_success_callback': datadog_event_success,
    }

    dag = DAG(
        dag_id='example_callback',
        default_args=args,
        schedule_interval='*/5 * * * *',
        dagrun_timeout=timedelta(minutes=60),
    )

Your callback's logging will now appear in the task logs (not the scheduler logs). However, this means the callback will be called for every qualifying task, not just once for the DAG.
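If the per-task duplication is a problem, one workaround (just a sketch, reusing the task_id from your example DAG) is to guard the callback so it only posts once per run, e.g. for the final task:

    def datadog_event_success(context):
        dag_id = context['task_instance'].dag_id
        task_id = context['task_instance'].task_id

        # Only post the Datadog event for the last task, so a multi-task DAG
        # doesn't emit one event per task instance.
        if task_id != 'run_this_last':
            return

        title = f'Airflow DAG success for {dag_id}'
        text = f'{title}\n\nDAG: {dag_id}\nTask: {task_id}'

        logging.info(title)
        logging.info(text)
        datadog_event(title, text, dag_id, task_id)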
