I have the following DAG, which executes different methods of a class dedicated to a data preprocessing routine:
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('MARKETING_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    from table_builder import OnlineOfflinePreprocess
else:
    print('Define MARKETING_PREPROC_PATH value in environment variables')
    sys.exit(1)
default_args = {
    'start_date': datetime.now(),
    'max_active_runs': 1,
    'concurrency': 4
}
worker = OnlineOfflinePreprocess()
dag = DAG(  # lowercase name avoids shadowing the DAG class itself
    dag_id='marketing_data_preproc',
    default_args=default_args,
    start_date=datetime.today()
)
import_online_data = PythonOperator(
    task_id='import_online_data',
    python_callable=worker.import_online_data,
    dag=dag)
import_offline_data = PythonOperator(
    task_id='import_offline_data',
    python_callable=worker.import_offline_data,
    dag=dag)
merge_aurum_to_sherlock = PythonOperator(
    task_id='merge_aurum_to_sherlock',
    python_callable=worker.merge_aurum_to_sherlock,
    dag=dag)
merge_sherlock_to_aurum = PythonOperator(
    task_id='merge_sherlock_to_aurum',
    python_callable=worker.merge_sherlock_to_aurum,
    dag=dag)
upload_au_to_sh = PythonOperator(
    task_id='upload_au_to_sh',
    python_callable=worker.upload_table,
    op_args=['aurum_to_sherlock'],  # op_args must be a list, not a bare string
    dag=dag)
upload_sh_to_au = PythonOperator(
    task_id='upload_sh_to_au',
    python_callable=worker.upload_table,
    op_args=['sherlock_to_aurum'],  # op_args must be a list, not a bare string
    dag=dag)
import_online_data >> merge_aurum_to_sherlock
import_offline_data >> merge_aurum_to_sherlock
merge_aurum_to_sherlock >> merge_sherlock_to_aurum
merge_aurum_to_sherlock >> upload_au_to_sh
merge_sherlock_to_aurum >> upload_sh_to_au
This produces the following error:
[2017-09-07 19:32:09,587] {base_task_runner.py:97} INFO - Subtask: AttributeError: 'OnlineOfflinePreprocess' object has no attribute 'online_info'
This is actually pretty obvious given how Airflow works: the outputs of the class methods being called aren't stored back onto the shared class instance initialized at the top of the graph.
Can I solve this with XCom? Overall, what is the thinking about how to blend the coherence of OOP with Airflow?
It's less of an issue about OOP with Airflow and more about state with Airflow.
Any state that needs to be passed between tasks needs to be stored persistently. This is because each Airflow task is an independent process (which could even be running on a different machine!), so in-memory communication is not possible.
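For example, each task can write its output to durable storage and the downstream task can read it back. Here is a rough sketch using a local directory and pandas purely for illustration; the path, file name, and DataFrame contents are placeholders, and in a multi-machine deployment you would point this at S3, HDFS, or a shared volume instead:

import os
import pandas as pd

# Placeholder location; in production this would be S3, HDFS, or a shared volume
OUTPUT_DIR = '/tmp/marketing_preproc'

def import_online_data():
    # Stand-in for the real import logic
    df = pd.DataFrame({'user_id': [1, 2], 'clicks': [10, 20]})
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    # Persist the result so a later task (possibly on another machine) can load it
    df.to_csv(os.path.join(OUTPUT_DIR, 'online.csv'), index=False)

def merge_aurum_to_sherlock():
    # Reload state from storage instead of reading self.online_info
    online = pd.read_csv(os.path.join(OUTPUT_DIR, 'online.csv'))
    # ... merge logic would go here ...
    return len(online)

The key point is that the merge step reconstructs its input from storage rather than expecting an attribute set by an earlier task to still be in memory.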
You are correct that you can use XCom to pass this state, as long as it's small, since XComs get stored in the Airflow metadata database. If it's large, you probably want to store it somewhere else: a filesystem, S3, HDFS, or a specialized database.
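For instance, the DAG above could be reworked so that each callable returns its (small) result and downstream tasks pull it via XCom. This is a minimal sketch in the Airflow 1.x style used in the question, assuming the preprocessing steps can be refactored into plain functions that return pickleable values; the returned data and dag_id are illustrative:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

def import_online_data(**kwargs):
    # Stand-in for the real import; the returned value is pushed to XCom
    # automatically under the key 'return_value'
    return {'source': 'online', 'row_count': 123}

def merge_aurum_to_sherlock(**kwargs):
    # Pull the upstream task's return value from XCom instead of reading
    # it off a shared class instance
    online_info = kwargs['ti'].xcom_pull(task_ids='import_online_data')
    return {'merged_rows': online_info['row_count']}

dag = DAG(
    dag_id='marketing_data_preproc_xcom',
    start_date=datetime(2017, 9, 1),
    schedule_interval=None)

t1 = PythonOperator(
    task_id='import_online_data',
    python_callable=import_online_data,
    provide_context=True,  # makes kwargs['ti'] available in the callable
    dag=dag)

t2 = PythonOperator(
    task_id='merge_aurum_to_sherlock',
    python_callable=merge_aurum_to_sherlock,
    provide_context=True,
    dag=dag)

t1 >> t2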