In my DAG I have a TimeDeltaSensor created using:
from datetime import datetime, timedelta
from airflow.operators.sensors import TimeDeltaSensor

wait = TimeDeltaSensor(
    task_id='wait',
    delta=timedelta(seconds=300),
    dag=dag
)
However, when it runs I get this error:
- Subtask: [2018-07-13 09:00:39,663] {models.py:1427} ERROR - unsupported operand type(s) for +=: 'NoneType' and 'datetime.timedelta'
Airflow version is 1.8.1.
The code is basically lifted from Example Pipeline definition so I'm nonplussed as to what the problem could be. Any ideas?
Looking at the source code you linked, there is one line that strikes me as interesting in this case:
target_dttm = dag.following_schedule(context['execution_date'])
This means that if you haven't set up a proper DAG schedule, dag.following_schedule() returns None, and the sensor then tries to add its time delta to None. That matches the error in your log.
I am not sure whether the code in the question is just an excerpt or the whole thing. My suggestion is to give the DAG a schedule that is not None, for example by setting schedule_interval on the DAG.
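To illustrate the failure mode without needing an Airflow install, here is a minimal plain-Python sketch. The helpers following_schedule and target_time are hypothetical stand-ins written for this answer, not Airflow's actual code; they only assume that the schedule lookup yields None when the DAG has no schedule, and that the sensor then does an in-place += with its delta.

```python
from datetime import datetime, timedelta


def following_schedule(schedule_interval, execution_date):
    """Hypothetical stand-in for the DAG's schedule lookup (timedelta case only)."""
    if isinstance(schedule_interval, timedelta):
        return execution_date + schedule_interval
    # No schedule configured (schedule_interval is None): nothing to return.
    return None


def target_time(schedule_interval, execution_date, delta):
    """Hypothetical stand-in for how the sensor computes the time to wait for."""
    target_dttm = following_schedule(schedule_interval, execution_date)
    target_dttm += delta  # raises TypeError when target_dttm is None
    return target_dttm


execution_date = datetime(2018, 7, 13, 9, 0)
delta = timedelta(seconds=300)

# With a schedule, the target is the next scheduled run plus the delta.
ok = target_time(timedelta(days=1), execution_date, delta)

# Without a schedule, the same arithmetic fails on None.
try:
    target_time(None, execution_date, delta)
    error = None
except TypeError as exc:
    error = str(exc)

print(ok)
print(error)
```

With a daily schedule the call succeeds; with None the += fails with a TypeError about 'NoneType' and 'datetime.timedelta', which is the same kind of message as in the question.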