I have already seen this and this question on SO and made the changes accordingly. However, my dependent DAG still gets stuck in the poking state. Below is my master DAG:
from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime
from airflow.operators.bash_operator import BashOperator

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'start_date': datetime(today.year, today.month, today.day),
    'schedule_interval': '@once'
}

dag = DAG('call-procedure-and-bash', default_args=default_args)

call_procedure = JdbcOperator(
    task_id='call_procedure',
    jdbc_conn_id='airflow_db2',
    sql='CALL AIRFLOW.TEST_INSERT (20)',
    dag=dag
)

call_procedure
Below is my dependent DAG:
from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'start_date': datetime(today.year, today.month, today.day),
    'schedule_interval': '@once'
}

dag = DAG('external-dag-upstream', default_args=default_args)

task_sensor = ExternalTaskSensor(
    task_id='link_upstream',
    external_dag_id='call-procedure-and-bash',
    external_task_id='call_procedure',
    execution_delta=timedelta(minutes=-2),
    dag=dag
)

count_rows = JdbcOperator(
    task_id='count_rows',
    jdbc_conn_id='airflow_db2',
    sql='SELECT COUNT(*) FROM AIRFLOW.TEST',
    dag=dag
)

count_rows.set_upstream(task_sensor)
Below are the logs of the dependent DAG once the master DAG has executed:
[2019-01-10 11:43:52,951] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:44:52,955] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:45:52,961] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:46:52,949] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:47:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:48:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:49:52,905] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
Below are the logs of the master DAG's execution:
[2019-01-10 11:45:20,215] {{jdbc_operator.py:56}} INFO - Executing: CALL AIRFLOW.TEST_INSERT (20)
[2019-01-10 11:45:21,477] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:21,476] {{dbapi_hook.py:166}} INFO - CALL AIRFLOW.TEST_INSERT (20)
[2019-01-10 11:45:24,139] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:24,137] {{jobs.py:2627}} INFO - Task exited with return code 0
My assumption is that Airflow should trigger the dependent DAG once the master runs fine? I have tried playing around with execution_delta, but that doesn't seem to work. Also, schedule_interval and start_date are the same for both DAGs, so I don't think that should cause any trouble.

Am I missing anything?
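This is where execution_delta bites: ExternalTaskSensor pokes for the external task at the dependent run's own execution date minus execution_delta, so timedelta(minutes=-2) points the sensor two minutes into the future. A quick check of that arithmetic against the poking logs above (plain Python; the 11:43:47 execution date is inferred from the first poke line):

from datetime import datetime, timedelta

# The sensor computes the run it waits for as:
#   target = own execution_date - execution_delta
own_execution_date = datetime(2019, 1, 10, 11, 43, 47)  # inferred from the first poke line
execution_delta = timedelta(minutes=-2)                  # as in the dependent DAG

target = own_execution_date - execution_delta
print(target)  # 2019-01-10 11:45:47 -- two minutes in the future

That matches the logs: poking starts at 11:43:52 for a run at 11:45:47 that, under '@once', will never exist.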
ExternalTaskSensor waits for a different DAG, or a task in a different DAG, to complete for a specific logical date.
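If a fixed offset is awkward, the sensor also accepts execution_date_fn in place of execution_delta (the two are mutually exclusive). A minimal sketch, assuming the same 1.10-era import path as the question; the two-minute offset is only illustrative:

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import datetime, timedelta

dag = DAG('external-dag-upstream',
          start_date=datetime(2019, 1, 10),
          schedule_interval='@once')

# execution_date_fn receives this DAG run's execution date and returns the
# execution date of the external run to wait for.
task_sensor = ExternalTaskSensor(
    task_id='link_upstream',
    external_dag_id='call-procedure-and-bash',
    external_task_id='call_procedure',
    execution_date_fn=lambda dt: dt - timedelta(minutes=2),  # look 2 minutes back, not forward
    dag=dag
)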
poke_interval: when using poke mode, this is the time in seconds that the sensor waits before checking the condition again. The default is 60 seconds, which matches the one-minute spacing of the poking lines above.
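Rather than letting the sensor poke forever, poke_interval can be paired with timeout so a misconfigured sensor fails instead of hanging. A sketch using the question's IDs; the concrete values are illustrative:

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import datetime

dag = DAG('external-dag-upstream',
          start_date=datetime(2019, 1, 10),
          schedule_interval='@once')

task_sensor = ExternalTaskSensor(
    task_id='link_upstream',
    external_dag_id='call-procedure-and-bash',
    external_task_id='call_procedure',
    poke_interval=30,   # check every 30 seconds instead of the 60-second default
    timeout=10 * 60,    # fail after 10 minutes of unsuccessful poking
    dag=dag
)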
Airflow loads DAGs from Python source files, which it looks for inside its configured DAG_FOLDER. It will take each file, execute it, and then load any DAG objects from that file. This means you can define multiple DAGs per Python file, or even spread one very complex DAG across multiple Python files using imports.
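One practical consequence: both DAGs from the question could live in a single file, which makes it easy to share one start_date and schedule. A minimal sketch:

from airflow import DAG
from datetime import datetime

# Two module-level DAG objects in one file: the scheduler discovers both,
# and sharing the same literals keeps their schedules aligned.
START_DATE = datetime(2019, 1, 10)

master_dag = DAG('call-procedure-and-bash', start_date=START_DATE, schedule_interval='@once')
dependent_dag = DAG('external-dag-upstream', start_date=START_DATE, schedule_interval='@once')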
Make sure both DAGs start at the same time and that you don't trigger either DAG manually. A manually triggered run gets the execution date of the moment you trigger it, so the sensor's computed target date will never match it.
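Putting that together, a likely fix, sketched under the assumption that the two DAGs should share one logical date: give both a fixed start_date, move schedule_interval onto the DAG itself (entries in default_args are handed to the tasks, not the DAG), and drop the negative execution_delta.

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import datetime

default_args = {
    'depends_on_past': False,
    'retries': 0,
}

# Same fixed start_date and schedule as the master DAG; schedule_interval
# belongs on the DAG, not in default_args.
dag = DAG('external-dag-upstream',
          default_args=default_args,
          start_date=datetime(2019, 1, 10),
          schedule_interval='@once')

task_sensor = ExternalTaskSensor(
    task_id='link_upstream',
    external_dag_id='call-procedure-and-bash',
    external_task_id='call_procedure',
    # no execution_delta: both runs then share the same execution date
    dag=dag
)

With identical schedules and no delta, the sensor's target date equals the master run's execution date, so poking succeeds as soon as call_procedure finishes.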