
Confused about Airflow's BaseSensorOperator parameters : timeout, poke_interval and mode

I'm a bit confused about how two of BaseSensorOperator's parameters work: timeout and poke_interval. Consider this usage of the sensor:

BaseSensorOperator(
  soft_fail=True,
  poke_interval = 4*60*60,  # Poke every 4 hours
  timeout = 12*60*60,  # Timeout after 12 hours
)

The documentation says that timeout sets the task to 'fail' once it runs out. But I'm using soft_fail=True, and the behavior doesn't seem to match what I expected: with both soft_fail and timeout set, I found the task failed instead of being skipped.

So what does happen here?

  1. The sensor pokes every 4 hours, and at every poke, will wait for the duration of the timeout (12 hours)?
  2. Or does it poke every 4 hours, for a total of 3 pokes, then times out?
  3. Also, what happens with these parameters if I use the mode="reschedule"?

Here's the documentation of the BaseSensorOperator

class BaseSensorOperator(BaseOperator, SkipMixin):
    """
    Sensor operators are derived from this class and inherit these attributes.
    Sensor operators keep executing at a time interval and succeed when
    a criteria is met and fail if and when they time out.
    :param soft_fail: Set to true to mark the task as SKIPPED on failure
    :type soft_fail: bool
    :param poke_interval: Time in seconds that the job should wait in
        between each tries
    :type poke_interval: int
    :param timeout: Time, in seconds before the task times out and fails.
    :type timeout: int
    :param mode: How the sensor operates.
        Options are: ``{ poke | reschedule }``, default is ``poke``.
        When set to ``poke`` the sensor is taking up a worker slot for its
        whole execution time and sleeps between pokes. Use this mode if the
        expected runtime of the sensor is short or if a short poke interval
        is required.
        When set to ``reschedule`` the sensor task frees the worker slot when
        the criteria is not yet met and it's rescheduled at a later time. Use
        this mode if the time before the criteria is met is expected to be
        quite long. The poke interval should be more than one minute to
        prevent too much load on the scheduler.
    :type mode: str
    """
Imad Avatar asked Sep 07 '20 09:09




2 Answers

Defining the terms

  1. poke_interval: the duration between successive 'pokes' (a poke is one evaluation of the condition that is being 'sensed')

  2. timeout: Poking indefinitely is not acceptable (for example, if buggy code pokes until the day becomes 29 while the month is 2, it can keep poking for up to 4 years). So we define a maximum period after which the sensor stops poking and terminates (it is marked either FAILED or SKIPPED)

  3. soft_fail: Normally (when soft_fail=False), the sensor is marked as FAILED after timeout. When soft_fail=True, the sensor is instead marked as SKIPPED after timeout

  4. mode: This one is slightly more complex

    • Any task (including a sensor) eats up a slot in some pool while it runs (either the default pool or an explicitly specified pool); in other words, it ties up some resources.
    • For sensors, this is
      • wasteful: a slot is consumed even when we are just waiting (doing no actual work)
      • dangerous: if your workflow has too many sensors that start sensing around the same time, they can freeze a lot of resources for quite a while. In fact, having too many ExternalTaskSensors is notorious for putting entire workflows (DAGs) into deadlock
    • To overcome this problem, Airflow v1.10.2 introduced modes in sensors
      • mode='poke' (default) means the existing behaviour that we discussed above
      • mode='reschedule' means that after an unsuccessful poke attempt, rather than going to sleep, the sensor raises AirflowRescheduleException and its status changes from RUNNING to UP_FOR_RESCHEDULE. That way, it releases its slot, allowing other tasks to progress while it waits for the next poke attempt
    • Citing the relevant snippet from code here
    if self.reschedule:
        reschedule_date = timezone.utcnow() + timedelta(
            seconds=self._get_next_poke_interval(started_at, try_number))
        raise AirflowRescheduleException(reschedule_date)
    else:
        sleep(self._get_next_poke_interval(started_at, try_number))
        try_number += 1
    
    • For more info, read the Sensors Params section
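To make the difference between the two modes concrete, here is a rough back-of-the-envelope sketch in plain Python. It does not use Airflow at all; the function name slot_seconds and its parameters (poke_duration, pokes_until_success) are made up for illustration, and it assumes each poke takes a fixed amount of time.

```python
def slot_seconds(mode, poke_interval, poke_duration, pokes_until_success):
    """Toy model: seconds a worker slot is held before the sensor succeeds.

    All parameters are hypothetical inputs for illustration, not Airflow settings
    (apart from the meaning of `mode` and `poke_interval`).
    """
    if pokes_until_success < 1:
        raise ValueError("the sensor must poke at least once")

    busy = poke_duration * pokes_until_success          # time spent actually poking
    waiting = poke_interval * (pokes_until_success - 1)  # gaps between pokes

    if mode == "poke":
        # The sensor sleeps between pokes while still holding its slot.
        return busy + waiting
    if mode == "reschedule":
        # The slot is released between pokes, so only the pokes themselves count.
        return busy
    raise ValueError("mode must be 'poke' or 'reschedule'")


four_hours = 4 * 60 * 60
print(slot_seconds("poke", four_hours, 5, 3))        # slot held through both waits
print(slot_seconds("reschedule", four_hours, 5, 3))  # slot held only while poking
```

With a 4 hour poke_interval, almost all of the slot time in poke mode is spent sleeping, which is why reschedule is the usual suggestion for long intervals.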

And now answering your questions directly

Q1

  1. The sensor pokes every 4 hours, and at every poke, will wait for the duration of the timeout (12 hours)?
  2. Or does it poke every 4 hours, for a total of 3 pokes, then times out?

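Point 2 is correct. The timeout is measured from when the sensor starts and covers all of its pokes together; it is not a separate wait at each poke.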
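Here is a small simulation of that reading, as a sketch only. It is a simplified model of a poke loop and not Airflow's actual code: the function simulate_sensor and the poke_duration parameter are assumptions for illustration, and the condition being sensed is never met.

```python
def simulate_sensor(poke_interval, timeout, poke_duration=1.0):
    """Toy model of a sensor whose condition is never met.

    Returns the times (seconds since the sensor started) at which pokes
    happen before the sensor gives up. `poke_duration` is an assumed cost
    of one poke, not an Airflow parameter.
    """
    now = 0.0
    poke_times = []
    while True:
        poke_times.append(now)   # evaluate the condition (always False here)
        now += poke_duration     # time spent on the poke itself
        if now > timeout:        # timeout compares against TOTAL elapsed time
            return poke_times
        now += poke_interval     # sleep (or wait to be rescheduled)


pokes = simulate_sensor(poke_interval=4 * 60 * 60, timeout=12 * 60 * 60)
print([round(t / 3600, 2) for t in pokes])  # poke times in hours
```

In this model the pokes land at roughly 0, 4, 8 and 12 hours, and the sensor gives up right after the last one. Whether you count that as 3 or 4 pokes depends on exact timing, but either way the 12 hours is a single overall budget.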

Q2

Also, what happens with these parameters if I use the mode="reschedule"?

As explained earlier, each of those params is independent, and setting mode='reschedule' doesn't alter the behaviour of the others in any way

y2k-shubham Avatar answered Oct 21 '22 15:10



BaseSensorOperator(
  soft_fail=True,
  poke_interval = 4*60*60,  # Poke every 4 hours
  timeout = 12*60*60,  # Timeout of 12 hours
  mode = "reschedule"
)

Let's say the criteria is not met at the first poke. The sensor will then poke again after the 4 hour interval, but the worker slot is freed during the wait because we're using mode="reschedule".

That is what I understood.

Sanajaoba Thongram Avatar answered Oct 21 '22 15:10
