 

Use XCom to exchange data between classes?

I have the following DAG, which executes the different methods of a class dedicated to a data preprocessing routine:

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('MARKETING_PREPROC_PATH')

if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    from table_builder import OnlineOfflinePreprocess
else:
    print('Define MARKETING_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
  'start_date': datetime.now(),
  'max_active_runs': 1,
  'concurrency': 4
}

worker = OnlineOfflinePreprocess()

DAG = DAG(
  dag_id='marketing_data_preproc',
  default_args=default_args,
  start_date=datetime.today()
)

import_online_data = PythonOperator(
  task_id='import_online_data',
  python_callable=worker.import_online_data,
  dag=DAG)

import_offline_data = PythonOperator(
  task_id='import_offline_data',
  python_callable=worker.import_offline_data,
  dag=DAG)

merge_aurum_to_sherlock = PythonOperator(
  task_id='merge_aurum_to_sherlock',
  python_callable=worker.merge_aurum_to_sherlock,
  dag=DAG)

merge_sherlock_to_aurum = PythonOperator(
  task_id='merge_sherlock_to_aurum',
  python_callable=worker.merge_sherlock_to_aurum,
  dag=DAG)

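upload_au_to_sh = PythonOperator(
  task_id='upload_au_to_sh',
  python_callable=worker.upload_table,
  # op_args is unpacked as positional arguments, so it must be a list;
  # a bare string would be split into one argument per character.
  op_args=['aurum_to_sherlock'],
  dag=DAG)

upload_sh_to_au = PythonOperator(
  task_id='upload_sh_to_au',
  python_callable=worker.upload_table,
  op_args=['sherlock_to_aurum'],
  dag=DAG)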

import_online_data >> merge_aurum_to_sherlock
import_offline_data >> merge_aurum_to_sherlock

merge_aurum_to_sherlock >> merge_sherlock_to_aurum
merge_aurum_to_sherlock >> upload_au_to_sh
merge_sherlock_to_aurum >> upload_sh_to_au

This produces the following error:

[2017-09-07 19:32:09,587] {base_task_runner.py:97} INFO - Subtask: AttributeError: 'OnlineOfflinePreprocess' object has no attribute 'online_info'

This is actually pretty obvious given how Airflow works: the outputs of the class methods aren't stored on the global object instantiated at the top of the DAG file, so a method called by a later task can't see attributes set by an earlier one.
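To make the failure mode concrete, here is a small pure-Python model that runs without Airflow. It assumes, roughly as in Airflow, that each task is started as a separate run that re-parses the DAG file, so the module-level `worker` object is rebuilt for every task. `OnlineOfflinePreprocessSketch`, `parse_dag_file`, and `run_task` are hypothetical names for illustration only.

```python
class OnlineOfflinePreprocessSketch:
    """Hypothetical stand-in for the asker's OnlineOfflinePreprocess class."""

    def import_online_data(self):
        # Stores the result on *this* instance only; nothing is persisted.
        self.online_info = {"rows": 42}  # placeholder data

    def merge_aurum_to_sherlock(self):
        # Relies on an attribute that only exists if import_online_data
        # ran on the very same instance.
        return self.online_info["rows"]


def parse_dag_file():
    """Model of the module-level line `worker = OnlineOfflinePreprocess()`.

    Each task run re-executes the DAG file, so every task gets a new object.
    """
    return OnlineOfflinePreprocessSketch()


def run_task(method_name):
    """Roughly what one task does: load the DAG file, then call one method."""
    worker = parse_dag_file()
    return getattr(worker, method_name)()


def simulate():
    """Run the two tasks in dependency order and record each outcome."""
    outcomes = []
    for name in ("import_online_data", "merge_aurum_to_sherlock"):
        try:
            outcomes.append((name, "ok", run_task(name)))
        except AttributeError as exc:
            outcomes.append((name, "failed", str(exc)))
    return outcomes


for outcome in simulate():
    print(outcome)
```

The second task fails with the same kind of `AttributeError` as in the log above, while calling both methods on a single instance works. That is why the class looks correct when exercised as a plain script.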

Can I solve this with XCom? More generally, how should one reconcile the coherence of OOP with the way Airflow works?

aaron asked Jan 30 '23 15:01


1 Answer

This is less an issue of OOP with Airflow and more an issue of state with Airflow.

Any state that needs to be passed between tasks must be stored persistently. This is because each Airflow task runs as an independent process (which could even be running on a different machine!), so in-memory communication between tasks is not possible.
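A minimal sketch of "store it persistently": the class keeps nothing on `self` between tasks and instead writes intermediate results to files that a later task reads back. `PersistentPreprocessSketch`, `STATE_DIR`, and the JSON format are assumptions for illustration. A local temp directory only works when all tasks run on the same machine; with distributed workers you would point this at shared storage.

```python
import json
import os
import tempfile

# Hypothetical default location; must be reachable by every worker in a real deployment.
STATE_DIR = os.path.join(tempfile.gettempdir(), "marketing_preproc_state")


class PersistentPreprocessSketch:
    """Hypothetical variant of the asker's class that keeps no state on self."""

    def __init__(self, state_dir=STATE_DIR):
        self.state_dir = state_dir
        os.makedirs(self.state_dir, exist_ok=True)

    def _path(self, name):
        return os.path.join(self.state_dir, name + ".json")

    def import_online_data(self):
        online_info = {"rows": 42}  # placeholder for the real import
        # Persist the result so a later, separate task run can load it.
        with open(self._path("online_info"), "w") as f:
            json.dump(online_info, f)

    def merge_aurum_to_sherlock(self):
        # Read the state back from disk instead of from self.online_info.
        with open(self._path("online_info")) as f:
            online_info = json.load(f)
        return online_info["rows"]


def run_task(method_name, state_dir=STATE_DIR):
    """Each task builds a brand-new object, as a separate task run would."""
    worker = PersistentPreprocessSketch(state_dir)
    return getattr(worker, method_name)()
```

Because the state lives outside the object, the fresh instance in the second task can still see what the first task produced. In a real DAG you would probably also include a run identifier in the file name so that concurrent DAG runs don't overwrite each other's files.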

You are correct that you can use XCom to pass this state, as long as it is small, since XComs are stored in the Airflow metadata database. If it is large, you probably want to store it somewhere else, such as a filesystem, S3, HDFS, or a specialized database.
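For small state, the methods can push and pull values through the task instance (`ti`) that Airflow puts in the task context, using `TaskInstance.xcom_push` and `TaskInstance.xcom_pull`. The sketch below assumes the callables receive the context as keyword arguments. On Airflow 1.x that, as far as I know, requires `provide_context=True` on each `PythonOperator`, whereas Airflow 2 passes it automatically to callables that accept `**kwargs`. `XComPreprocessSketch` is hypothetical, and `FakeTaskInstance` is a stand-in (not Airflow's class) so that the snippet runs without an Airflow installation. Its shared dict plays the role of the XCom table.

```python
class XComPreprocessSketch:
    """Hypothetical rewrite of two of the asker's methods to exchange state via XCom."""

    def import_online_data(self, **context):
        online_info = {"rows": 42}  # placeholder; XCom values should be small
        context["ti"].xcom_push(key="online_info", value=online_info)

    def merge_aurum_to_sherlock(self, **context):
        # Pull what the upstream task pushed, instead of reading self.online_info.
        online_info = context["ti"].xcom_pull(
            task_ids="import_online_data", key="online_info"
        )
        return online_info["rows"]


class FakeTaskInstance:
    """Minimal stand-in for Airflow's TaskInstance, for local experimentation only."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self.store = store  # stands in for the XCom table in the metadata DB

    def xcom_push(self, key, value):
        self.store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        return self.store.get((task_ids, key))


def run_task(task_id, store):
    """Fresh object per task; here the task_id doubles as the method name."""
    worker = XComPreprocessSketch()
    ti = FakeTaskInstance(task_id, store)
    return getattr(worker, task_id)(ti=ti)
```

Note that a `PythonOperator` also pushes its callable's return value to XCom under the key `return_value`, so returning a small value and pulling it downstream with `xcom_pull(task_ids=...)` is an alternative to explicit `xcom_push` calls.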

gnicholas answered Feb 02 '23 10:02