
Best way to loop through parameters in Airflow?

I'm trying to get familiar with Airflow and loving it so far.

One thing I'm a little unclear on, however, is how to properly parameterise my DAG when I want to run the same DAG, but in parallel, for multiple lines of business (lob). So basically I want to run the DAG below for multiple lobs in each run, and have each lob run in parallel.

So let's say I define a variable that is an array of lobs, like "lob1", "lob2", etc. I'd like to replace 'mylob' in the BigQuery SQL statement below with 'lob1', then 'lob2', and so on.

I'm thinking maybe I can store the lobs as a Variable from the UI and then loop through that in the DAG, but I'm not sure if that would end up being sequential, waiting for each task to finish in each loop iteration.
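Something like this is what I have in mind (just a rough sketch; it assumes I'd save a Variable named "lobs" in the UI as a JSON list):

from airflow.models import Variable

# assumes a Variable named "lobs" saved via the UI as '["lob1", "lob2", "lob3"]'
lobs = Variable.get("lobs", deserialize_json=True)

for lob in lobs:
    # would build one task per lob here
    ...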

Another approach I think might be to use this parameterised DAG as a SubDAG inside a sort of larger driver DAG. But again, I'm not sure if that is a best-practice approach here.

Any help or pointers much appreciated. I feel like I'm missing something obvious here, but I'm not quite finding an example like this anywhere.

"""
### My first dag to play around with bigquery and gcp stuff.
"""

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 3, 10),    
    'email': ['[email protected]'],
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

with DAG('my_bq_dag_2', schedule_interval='30 */1 * * *',
         default_args=default_args) as dag:
      
    bq_msg_1 = BigQueryOperator(
        task_id='my_bq_task_1',
        bql='select "mylob" as lob, "Hello World!" as msg',
        destination_dataset_table='airflow.test1',
        write_disposition='WRITE_TRUNCATE',
        bigquery_conn_id='gcp_smoke'
    )
    
    bq_msg_1.doc_md = """\
    #### Task Documentation
    Write a "Hello World!" message string to the table airflow.test1 (truncating any existing rows)
    """
    
    bq_msg_2 = BigQueryOperator(
        task_id='my_bq_task_2',
        bql='select "mylob" as lob, "Goodbye World!" as msg',
        destination_dataset_table='airflow.test1',
        write_disposition='WRITE_APPEND',
        bigquery_conn_id='gcp_smoke'
    )
    
    bq_msg_2.doc_md = """\
    #### Task Documentation
    Append a "Goodbye World!" message string to the table airflow.test1
    """
    
    # set dependencies
    bq_msg_2.set_upstream(bq_msg_1)

Update: I'm trying to get this working, but it never seems to make it to lob2:


"""
### My first dag to play around with bigquery and gcp stuff.
"""

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 3, 10),    
    'email': ['[email protected]'],
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('my_bq_dag_2', schedule_interval='@once', default_args=default_args)

lobs = ["lob1","lob2","lob3"]

for lob in lobs:

    templated_command = """
        select '{{ params.lob }}' as lob, concat(string(current_timestamp()),' - Hello - {{ ds }}') as msg
    """    
         
    bq_msg_1 = BigQueryOperator(
        dag = dag,
        task_id='my_bq_task_1',
        bql=templated_command,
        params={'lob': lob},
        destination_dataset_table='airflow.test1',
        write_disposition='WRITE_APPEND',
        bigquery_conn_id='gcp_smoke'
    )
1 Answer

I think I have found an answer/approach that seems to work for me (my problem above was not having unique task IDs).

I did a little blog post on the example in case it's of any use to others.

http://engineering.pmc.com/2017/03/playing-around-with-apache-airflow-bigquery-62/
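For reference, here's a minimal sketch of the pattern that worked (names are illustrative; the key change is giving each task in the loop a unique task_id so Airflow creates a separate task per lob):

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 3, 10),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('my_bq_dag_3', schedule_interval='@once', default_args=default_args)

lobs = ["lob1", "lob2", "lob3"]

templated_command = """
    select '{{ params.lob }}' as lob, concat(string(current_timestamp()),' - Hello - {{ ds }}') as msg
"""

for lob in lobs:
    BigQueryOperator(
        dag=dag,
        # unique task_id per lob, e.g. my_bq_task_1_lob1, my_bq_task_1_lob2, ...
        task_id='my_bq_task_1_' + lob,
        bql=templated_command,
        params={'lob': lob},
        destination_dataset_table='airflow.test1',
        write_disposition='WRITE_APPEND',
        bigquery_conn_id='gcp_smoke'
    )

With unique task IDs, the three tasks show up as independent siblings in the DAG and can run in parallel, subject to your executor and parallelism settings.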
