
How to dynamically iterate over the output of an upstream task to create parallel tasks in Airflow?

Consider the following example of a DAG where the first task, get_id_creds, extracts a list of credentials from a database. This operation tells me which users in my database I can run further data preprocessing on, and it writes those IDs to the file /tmp/ids.txt. I then scan those IDs into my DAG and use them to generate a list of upload_transactions tasks that can run in parallel.

My question is: Is there a more idiomatically correct, dynamic way to do this with Airflow? What I have here feels clumsy and brittle. How can I directly pass a list of valid IDs from one process to another that defines the subsequent downstream processes?
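For context, here is a minimal sketch of what the get_id_creds callable could look like under this approach. The dash_workers module is not shown in the question, so the helper below (fetch_eligible_user_ids) and the IDs are made up for illustration; only the write to /tmp/ids.txt reflects the description above.

def fetch_eligible_user_ids():
    # Placeholder for the real database query described above.
    return ['user_1', 'user_2', 'user_3']

def get_id_creds(**kwargs):
    # Write one ID per line so the DAG file can read them back
    # at parse time with splitlines().
    ids = fetch_eligible_user_ids()
    with open('/tmp/ids.txt', 'w') as outfile:
        outfile.write('\n'.join(ids))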

from datetime import datetime, timedelta
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environment variables')
    sys.exit(1)

default_args = {
  'start_date': datetime.now()
}

# schedule_interval belongs on the DAG object, not in default_args
DAG = DAG(
  dag_id='dash_preproc',
  schedule_interval=None,
  default_args=default_args
)

get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=dash_workers.get_id_creds, 
    provide_context=True,
    dag=DAG)

# Read the IDs written by get_id_creds at DAG-parse time
with open('/tmp/ids.txt', 'r') as infile:
    ids = infile.read().splitlines()

for uid in ids:
    upload_transactions = PythonOperator(
        task_id=uid,
        python_callable=dash_workers.upload_transactions,
        op_args=[uid],
        dag=DAG)
    upload_transactions.set_upstream(get_id_creds)
asked Jul 25 '17 by aaron

2 Answers

Per @Juan Riza's suggestion I checked out this link: Proper way to create dynamic workflows in Airflow. This was pretty much the answer, although I was able to simplify the solution enough that I thought I would offer my own modified version of the implementation here:

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environment variables')
    sys.exit(1)

ENV = os.environ

default_args = {
  # 'start_date': datetime.now(),
  'start_date': datetime(2017, 7, 18)
}

DAG = DAG(
  dag_id='dash_preproc',
  default_args=default_args
)

clear_tables = PythonOperator(
  task_id='clear_tables',
  python_callable=dash_workers.clear_db,
  dag=DAG)

def id_worker(uid):
    return PythonOperator(
        task_id=uid,
        python_callable=dash_workers.main_preprocess,
        op_args=[uid],
        dag=DAG)

for uid in dash_workers.get_id_creds():
    clear_tables >> id_worker(uid)

clear_tables cleans the database that will be re-built as a result of the process. id_worker is a function that dynamically generates new preprocessing tasks, based on the array of ID values returned from get_id_creds. The task ID is just the corresponding user ID, though it could easily have been an index, i, as in the example linked above.

NOTE: The bitshift operator may look backwards at first, but clear_tables >> id_worker(uid) makes clear_tables the upstream task, so it runs first, which is what we want here.
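For reference, the bitshift operators on Airflow tasks are shorthand for set_upstream/set_downstream. Using the names from the snippet above (this fragment assumes the clear_tables task and id_worker function defined there):

# Equivalent ways of saying "clear_tables runs before the per-ID task";
# in practice you pick one form per dependency.
task = id_worker(uid)

clear_tables >> task                 # left-hand side is the upstream task
# task << clear_tables               # same edge, written right to left
# clear_tables.set_downstream(task)  # explicit method form
# task.set_upstream(clear_tables)    # also equivalent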

answered Sep 30 '22 by aaron

Keep in mind that Apache Airflow is a workflow management tool: it manages the dependencies between tasks that the user defines. Compare this with, for example, Apache NiFi, which is a dataflow management tool: there, the dependencies are the data transferred through the tasks.

That said, I think your approach is quite right (my comment is based on the code posted), but Airflow also offers a concept called XCom. It lets tasks "cross-communicate" by passing small pieces of data between them. How big can the passed data be? You will have to test that yourself, but generally it should stay small: XComs are key/value pairs stored in the Airflow metadata database, so you can't pass files, for example, but a list of IDs should work.

As I said, you should test it yourself; I would be very happy to hear about your experience. Here is an example DAG which demonstrates the use of XCom, and here is the relevant documentation. Cheers!
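To make that concrete, here is a minimal sketch (not from the question's codebase; the DAG id, task names, and IDs are made up) of pushing a list of IDs from one PythonOperator and pulling it in a downstream one:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(dag_id='xcom_ids_example',
          start_date=datetime(2017, 7, 18),
          schedule_interval=None)

def push_ids():
    # The return value of a PythonOperator callable is pushed to XCom
    # under the key 'return_value'. Keep it small: XComs live in the
    # Airflow metadata database.
    return ['user_1', 'user_2', 'user_3']  # made-up IDs

def pull_ids(**context):
    ids = context['ti'].xcom_pull(task_ids='push_ids')
    for uid in ids:
        print('would preprocess', uid)

push = PythonOperator(
    task_id='push_ids',
    python_callable=push_ids,
    dag=dag)

pull = PythonOperator(
    task_id='pull_ids',
    python_callable=pull_ids,
    provide_context=True,
    dag=dag)

push >> pull

Note that xcom_pull happens when the downstream task runs, so this passes data between two fixed tasks; it does not by itself generate a variable number of parallel tasks at DAG-parse time.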

answered Sep 30 '22 by sdikby