Persisting state on airflow sensor rescheduled runs

Tags:

airflow

I am working with BaseSensorOperator in Airflow. I'd like BaseSensorOperator.poke(context) to pass some information on to the next call of poke. I have tried using XCom as follows (a mock case with meaningless values):

    import logging
    from datetime import datetime

    def poke(self, context):
        task_instance = context['task_instance']
        # Read the value left behind by the previous call of poke, if any
        old_value = task_instance.xcom_pull(key='passing_this_value')
        if old_value:
            logging.info(f'retrieved from XCom {old_value}')
        else:
            logging.info('no value was retrieved')
        new_value = datetime.now()
        logging.info(f'sending this value to XCom {new_value}')
        task_instance.xcom_push(key='passing_this_value', value=new_value)

        # Succeed only when the current minute is a multiple of 10
        return new_value.minute % 10 == 0

This works when the sensor task is created with mode='poke', but fails with mode='reschedule', because on each reschedule the task's XComs for that run are cleared.
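The toy simulation below sketches why the two modes behave differently. It is plain Python with no Airflow involved: `FakeXComStore` and `run_sensor` are hypothetical names, and the rule "every rescheduled attempt first deletes its own task's XComs" is an assumption modelling the behaviour described above. In `poke` mode one long-running loop calls poke repeatedly, so the previous value is visible; in `reschedule` mode each attempt starts fresh.

```python
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical in-memory stand-in for the XCom table, keyed by (task_id, key).
XComKey = Tuple[str, str]


class FakeXComStore:
    """Toy XCom store for illustration only; this is NOT Airflow's API."""

    def __init__(self) -> None:
        self.rows: Dict[XComKey, Any] = {}

    def push(self, task_id: str, key: str, value: Any) -> None:
        self.rows[(task_id, key)] = value

    def pull(self, task_id: str, key: str) -> Optional[Any]:
        return self.rows.get((task_id, key))

    def clear_task(self, task_id: str) -> None:
        # Assumption: a rescheduled attempt deletes every row for its own task_id.
        for row in [k for k in self.rows if k[0] == task_id]:
            del self.rows[row]


def run_sensor(mode: str, attempts: int) -> List[Optional[int]]:
    """Simulate `attempts` poke calls; return the value each call pulled."""
    store = FakeXComStore()
    task_id = "my_sensor"
    seen: List[Optional[int]] = []
    for attempt in range(attempts):
        if mode == "reschedule":
            # Each rescheduled attempt is a fresh run of the task instance.
            store.clear_task(task_id)
        seen.append(store.pull(task_id, "passing_this_value"))
        store.push(task_id, "passing_this_value", attempt)
    return seen


print(run_sensor("poke", 3))        # [None, 0, 1]
print(run_sensor("reschedule", 3))  # [None, None, None]
```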

Is there a way around this? I could use a Variable, but that would clutter the Variable namespace. Any other suggestions?

asked Nov 07 '22 14:11 by Giovanni


1 Answer

I ran into the same problem. In "reschedule" mode, Airflow cleans up the task instance before each new attempt, and that cleanup includes its XComs.

Every XCom entry is identified by its DAG_ID, TASK_ID, EXECUTION_DATE and KEY.

To keep the reschedule mechanism from cleaning up the XCom for the task instance, I used the XCom interface directly instead of the one on the TaskInstance (TI). However, the entry has to be stored under a different TASK_ID so that it escapes the cleanup, which only removes entries belonging to the sensor's own task_id.
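A minimal sketch of why the modified TASK_ID survives, assuming the cleanup deletes only the rows whose (dag_id, task_id, execution_date) match the rescheduled task instance. Everything here is a hypothetical in-memory model: `xcom_table`, `xcom_set`, `xcom_get` and `clear_task_instance` are illustrative names, not Airflow functions, and the "_original" suffix mirrors the answer's convention.

```python
from datetime import datetime
from typing import Any, Dict, Optional, Tuple

# Toy model of the XCom table: each row is identified by
# (dag_id, task_id, execution_date, key).
Row = Tuple[str, str, datetime, str]
xcom_table: Dict[Row, Any] = {}


def xcom_set(key: str, value: Any, task_id: str, dag_id: str, execution_date: datetime) -> None:
    xcom_table[(dag_id, task_id, execution_date, key)] = value


def xcom_get(key: str, task_id: str, dag_id: str, execution_date: datetime) -> Optional[Any]:
    return xcom_table.get((dag_id, task_id, execution_date, key))


def clear_task_instance(dag_id: str, task_id: str, execution_date: datetime) -> None:
    # Assumed reschedule cleanup: drop every key for exactly this task instance.
    doomed = [row for row in xcom_table if row[:3] == (dag_id, task_id, execution_date)]
    for row in doomed:
        del xcom_table[row]


DAG_ID, TASK_ID = "my_dag", "lambda_sensor"
RUN_DATE = datetime(2022, 11, 7)

# Same value stored under the sensor's own task_id and under a "shadow" task_id.
xcom_set("requestId", "req-123", TASK_ID, DAG_ID, RUN_DATE)
xcom_set("requestId", "req-123", TASK_ID + "_original", DAG_ID, RUN_DATE)

clear_task_instance(DAG_ID, TASK_ID, RUN_DATE)  # simplified reschedule cleanup

print(xcom_get("requestId", TASK_ID, DAG_ID, RUN_DATE))                # None
print(xcom_get("requestId", TASK_ID + "_original", DAG_ID, RUN_DATE))  # req-123
```

Under this simplified model, nothing ever deletes the shadow row, so it outlives the sensor run; if the same run is cleared and re-run later, the sensor would pick up the old requestId. It is worth checking whether that also holds in your Airflow version before relying on the workaround.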

Here is my code:

    from airflow.models import XCom

    def poke(self, context):
        ti = context['ti']
        # Look for a requestId saved by an earlier attempt under the "shadow" task_id
        request_id = ti.xcom_pull(key="requestId", task_ids=self.task_id + "_original", dag_id=self.dag_id)
        if request_id is None:
            self.log.info("requestId not found, so we launch the Lambda function")
            request_id = self.launch_lambda()  # the sensor's own helper

            # To stop the reschedule cleanup from deleting the XCom entry, push the requestId
            # through the XCom API directly, under a modified task_id
            XCom.set(
                key="requestId",
                value=request_id,
                task_id=self.task_id + "_original",
                dag_id=self.dag_id,
                execution_date=ti.execution_date)
            self.log.info("Pushed requestId %s to xcom", request_id)

        # (the rest of poke, which checks request_id and returns True/False, is not shown)
answered Nov 16 '22 15:11 by Xun Ren