I am trying to pass a list of strings from one task to another one via XCom but I do not seem to manage to get the pushed list interpreted back as a list.
For example, I do this in some function `blah` that is run in a `ShortCircuitOperator`:
```python
# Build the full GCS paths and push them under the default XCom key
paths = ['gs://{}/{}'.format(bucket, obj) for obj in my_list]
kwargs['ti'].xcom_push(key='return_value', value=paths)
```
Then I want to use that list as a parameter of another operator, for example:
```python
run_task_after_blah = AfterBlahOperator(
    task_id='run-task-after-blah',
    ...,
    input_paths="{{ ti.xcom_pull(task_ids='find-paths') }}",
    ...,
)
```
I expect `input_paths` to be equal to `paths`, but it is not: the template is rendered first and the result is assigned afterwards, and the rendering somehow turns the `xcom_pull` return value into a stringified list. `AfterBlahOperator` then assigns that string as the value of an element in a JSON payload.
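A minimal stdlib-only sketch of that symptom, assuming the operator builds its payload with `json.dumps` (a hypothetical detail; the question does not show how `AfterBlahOperator` builds its JSON). A regular Jinja `{{ ... }}` expression emits the `str()` of its value, so the field receives the list's text form, and JSON-encoding it produces a quoted string instead of an array. The bucket and object names are made up.

```python
import json

# The list the upstream task pushed to XCom (example values)
paths = ["gs://my-bucket/a.csv", "gs://my-bucket/b.csv"]

# A regular Jinja {{ ... }} expression emits str() of its value,
# so the templated field ends up holding this text, not a list
rendered = str(paths)

# Hypothetical payload built by the downstream operator
payload = json.dumps({"input_paths": rendered})
print(payload)
# {"input_paths": "['gs://my-bucket/a.csv', 'gs://my-bucket/b.csv']"}

# What was actually wanted: a JSON array of strings
print(json.dumps({"input_paths": paths}))
# {"input_paths": ["gs://my-bucket/a.csv", "gs://my-bucket/b.csv"]}
```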
I tried joining `paths` into one string with a separator, pushing that to XCom, and splitting it again when pulling. But because the XCom is rendered first, I get either the stringified list, when `split` is called inside the template, or the original joined string of paths, when `split` is applied to the parameter (as in `"{{ ti.xcom_pull(task_ids='find-paths') }}".split(';')`).
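A quick plain-Python check of the second attempt: `.split(';')` is applied to the literal template text when the DAG file is parsed, before any rendering happens. The last step only simulates rendering with `str.replace` (a stand-in for Jinja, not the real engine), relying on Airflow rendering each string inside a list-valued templated field.

```python
template = "{{ ti.xcom_pull(task_ids='find-paths') }}"

# .split(';') runs on the raw template text at DAG-parse time;
# the template itself contains no ';', so nothing is split
parts = template.split(";")
print(parts)  # ["{{ ti.xcom_pull(task_ids='find-paths') }}"]

# At run time each string in the list gets rendered, so the field becomes
# a one-element list holding the whole joined string.
# Rendering is simulated here with str.replace (not real Jinja):
pushed = ";".join(["gs://my-bucket/a.csv", "gs://my-bucket/b.csv"])
rendered_parts = [p.replace(template, pushed) for p in parts]
print(rendered_parts)  # ['gs://my-bucket/a.csv;gs://my-bucket/b.csv']
```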
XCom seems to work well for single values as task parameters, or for multiple values that are processed further after being pulled, but not for multiple values that have to be passed as a single list parameter of a task.
Is there a way to do this without having to write an extra function that precisely returns such list of strings? Or maybe I am abusing XCom too much, but there are many operators in Airflow that take a list of elements as parameter (e.g., usually the full path to multiple files that are the result of some previous task, hence not known beforehand).
Jinja renders strings, so if you fetch an XCom via templates, it's always going to be a string (unless, on Airflow 2.1 or later, the DAG sets `render_template_as_native_obj=True`). Instead, you will need to fetch the XCom where you have access to the `TaskInstance` object. Something like this:
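```python
from airflow.models import BaseOperator


class AfterBlahOperator(BaseOperator):
    def __init__(self, input_task_id, *args, **kwargs):  # other parameters elided
        ...  # other initialisation elided
        # task_id of the upstream task that pushed the list
        self.input_task_id = input_task_id
        super(AfterBlahOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        # Pulling through the TaskInstance returns the native Python list
        input_paths = context['ti'].xcom_pull(task_ids=self.input_task_id)
        for path in input_paths:
            ...
```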
This is similar to how you would fetch it within a `PythonOperator`, which the XCom docs provide an example of.
Note that you can still support a separate `input_paths` parameter for when the paths can be hardcoded in a DAG; you'll just need an extra check to see which parameter to read the value from.
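A minimal sketch of that extra check, written as a plain function so it runs without Airflow. `resolve_input_paths` and `StubTaskInstance` are hypothetical names for illustration; the stub only mimics the `xcom_pull(task_ids=...)` call. Inside the real operator, `execute` would call the same logic with `context['ti']`.

```python
class StubTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for illustration only."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # maps task_id -> pushed return_value

    def xcom_pull(self, task_ids):
        return self._xcoms.get(task_ids)


def resolve_input_paths(ti, input_paths=None, input_task_id=None):
    """Prefer hardcoded paths; otherwise pull the list pushed by input_task_id."""
    if input_paths is not None:
        return list(input_paths)
    if input_task_id is not None:
        return ti.xcom_pull(task_ids=input_task_id)
    raise ValueError("Pass either input_paths or input_task_id")


ti = StubTaskInstance({"find-paths": ["gs://my-bucket/a.csv", "gs://my-bucket/b.csv"]})
print(resolve_input_paths(ti, input_task_id="find-paths"))
print(resolve_input_paths(ti, input_paths=["gs://my-bucket/fixed.csv"]))
```

Checking the hardcoded value first keeps DAGs that pass literal paths working unchanged; the XCom lookup is only used when no paths were given.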
Alternatively, call `eval()` on the rendered `input_paths` inside `AfterBlahOperator`'s `execute` method. This way, the stringified list is converted back to a list:
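```python
from airflow.models import BaseOperator


class AfterBlahOperator(BaseOperator):
    # template_fields holds attribute *names* as strings (note the trailing comma)
    template_fields = ('input_paths',)

    def __init__(self, input_paths, *args, **kwargs):  # other parameters elided
        super(AfterBlahOperator, self).__init__(*args, **kwargs)
        self.input_paths = input_paths

    def execute(self, context):
        # After rendering, input_paths holds the str() of the list
        paths = eval(self.input_paths)
        for path in paths:
            ...
```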
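`eval` executes whatever Python expression ends up in the rendered string. A safer swap (not what this answer uses) is the standard library's `ast.literal_eval`, which only accepts Python literals such as a list of strings. A minimal sketch, with made-up paths:

```python
import ast

# What the templated field holds after rendering: the str() of the list
rendered = "['gs://my-bucket/a.csv', 'gs://my-bucket/b.csv']"

# literal_eval parses literals only (lists, strings, numbers, ...),
# so it rebuilds the list without executing arbitrary code
paths = ast.literal_eval(rendered)
print(paths)        # ['gs://my-bucket/a.csv', 'gs://my-bucket/b.csv']
print(type(paths))  # <class 'list'>

# Anything that is not a plain literal is rejected instead of executed
try:
    ast.literal_eval("__import__('os').getcwd()")
except ValueError as exc:
    print("rejected:", exc)
```

In the operator, this would mean `paths = ast.literal_eval(self.input_paths)` in place of the `eval` call.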