
Pass a list of strings as parameter of a dependant task in Airflow

Tags:

python

airflow

I am trying to pass a list of strings from one task to another one via XCom but I do not seem to manage to get the pushed list interpreted back as a list.

For example, when I do this in some function blah that is run in a ShortCircuitOperator:

paths = ['gs://{}/{}'.format(bucket, obj) for obj in my_list]
kwargs['ti'].xcom_push(key='return_value', value=paths)

and then I want to use such list as a parameter of an operator. For example,

run_task_after_blah = AfterBlahOperator(
    task_id='run-task-after-blah',
    ...,
    input_paths="{{ ti.xcom_pull(task_ids='find-paths') }}",
    ...,
)

I expect input_paths to be equal to paths, but it is not: rendering happens first and assignment second, and somehow the template rendering converts the xcom_pull return value to a stringified list (which my AfterBlahOperator then assigns as the value of an element in a JSON document).

I tried concatenating the paths into one string joined by a separator, pushing that to XCom, and splitting it back when pulling from XCom. But because the XCom gets rendered first, I get either the stringified list (when the split function is called inside the template) or the original concatenated string of paths (when the split function is applied to the parameter, as in "{{ ti.xcom_pull(task_ids='find-paths') }}".split(';')).

XCom seems to work well for single values as task parameters, or for multiple values when the extracted values can be processed further, but not for passing multiple values as one list-valued parameter of a task.

Is there a way to do this without having to write an extra function that precisely returns such list of strings? Or maybe I am abusing XCom too much, but there are many operators in Airflow that take a list of elements as parameter (e.g., usually the full path to multiple files that are the result of some previous task, hence not known beforehand).

Guille asked Nov 01 '17 10:11



2 Answers

Jinja renders strings, so if you fetch an XCom via templates, it's always going to be a string. Instead, you will need to fetch the XCom where you have access to the TaskInstance object. Something like this:

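from airflow.models import BaseOperator


class AfterBlahOperator(BaseOperator):

    def __init__(self, ..., input_task_id, *args, **kwargs):
        ...
        # Store the ID of the upstream task whose XCom holds the list.
        self.input_task_id = input_task_id
        super(AfterBlahOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        # Pulling here, outside Jinja, returns the original Python list.
        input_paths = context['ti'].xcom_pull(task_ids=self.input_task_id)
        for path in input_paths:
            ...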

This is similar to how you would fetch it within a PythonOperator, which the XCom docs provide an example of.

Note that you can still support a separate input_paths parameter for when it can be hardcoded in a DAG; you'll just need an extra check to see which parameter to read the value from.
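
The extra check could look roughly like the sketch below. It is plain Python so that it runs without Airflow installed: FakeTaskInstance and resolve_input_paths are hypothetical names made up for illustration, and the stub only imitates the xcom_pull(task_ids=...) call used above. In a real operator the same logic would sit inside execute.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance (illustration only)."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # maps task_id -> value pushed by that task

    def xcom_pull(self, task_ids):
        return self._xcoms.get(task_ids)


def resolve_input_paths(context, input_paths=None, input_task_id=None):
    """Return the hardcoded paths if given, otherwise pull the list from XCom."""
    if input_paths is not None:
        # Hardcoded in the DAG: use it directly.
        return list(input_paths)
    if input_task_id is None:
        raise ValueError("Provide either input_paths or input_task_id")
    # Pulled outside of Jinja, so the value keeps its original type.
    pulled = context['ti'].xcom_pull(task_ids=input_task_id)
    if pulled is None:
        raise ValueError("No XCom found for task %r" % input_task_id)
    return pulled


# Assumed sample data: the upstream task 'find-paths' pushed two paths.
context = {'ti': FakeTaskInstance({'find-paths': ['gs://bucket/a', 'gs://bucket/b']})}

from_xcom = resolve_input_paths(context, input_task_id='find-paths')
hardcoded = resolve_input_paths(context, input_paths=['gs://bucket/c'])
```

Here from_xcom is a real list rather than a string, because no template rendering is involved.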

Daniel Huang answered Oct 17 '22 05:10


Call eval on input_paths inside AfterBlahOperator's execute method. This way, the stringified list is converted back to a list.

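class AfterBlahOperator(BaseOperator):
    # Mark input_paths as templated so Jinja renders it before execute() runs.
    # The trailing comma matters: ('input_paths') would be a plain string.
    template_fields = ('input_paths',)

    def __init__(self, ..., *args, **kwargs):
        ...

    def execute(self, context):
        # Assumes __init__ stored the argument as self.input_paths.
        # After rendering it is a string such as "['gs://...', 'gs://...']".
        paths = eval(self.input_paths)
        for path in paths:
            ...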
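
A possibly safer variant of the same idea is to parse the rendered string with ast.literal_eval from the standard library, which only accepts Python literals and refuses arbitrary expressions. The sketch below is an illustration, not part of the answer above: parse_rendered_list is a hypothetical helper name, the paths are made-up sample values, and str(paths) is used to approximate what the templated field holds after rendering.

```python
import ast


def parse_rendered_list(rendered):
    """Parse a rendered value such as "['a', 'b']" back into a list of strings."""
    value = ast.literal_eval(rendered)  # raises on anything that is not a literal
    if not isinstance(value, (list, tuple)):
        raise ValueError("Expected a list literal, got %r" % (value,))
    return [str(item) for item in value]


# Assumed sample data standing in for the list pushed to XCom.
paths = ['gs://my-bucket/a.csv', 'gs://my-bucket/b.csv']

# Approximates the string the template field contains after rendering.
rendered = str(paths)

restored = parse_rendered_list(rendered)
```

Inside the operator, the call would replace eval in execute, so a malformed or malicious XCom value raises an error instead of being executed.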

Vykunta answered Oct 17 '22 04:10