How to provide an async function in PythonOperator's python_callable in Airflow?

I have tasks in a pipeline to execute, but those tasks are often async. When I try to run the pipeline with Airflow, it fails with the error "TypeError: can't pickle coroutine objects".

Since the functions are async, I tried wrapping them in asyncio.run, but it still didn't work.

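import asyncio
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

class Top: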
    async def process(self, input_data):
        return [rawstr for rawstr in input_data]

class Bottom:
    async def process(self, input_data):
        return [len(x) for x in input_data]

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 7, 25),
    'retries': 1,
    'provide_context': True,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('sof_dag', default_args=default_args, schedule_interval=timedelta(days=1))

async def top_1(x, **kwargs):
    return asyncio.run(Top().process(x))

async def bottom_1(**kwargs):
    ti = kwargs['ti']
    y = ti.xcom_pull(key=None, task_ids='task_top_1')
    return asyncio.run((Bottom().process(y)))

t1 = PythonOperator(
    task_id='task_top_1',
    python_callable=top_1,
    op_args=[["wow! this is great", "this is not how I thought"]],
    dag=dag)

t2 = PythonOperator(
    task_id='task_bottom_1',
    python_callable=bottom_1,
    dag=dag)

t1 >> t2

This is just a dummy scenario to show how async is used in almost every one of my tasks. Here is the error trace:

Traceback (most recent call last):
  File "/Users/divyanshushekhar/repos/repo_name/venv/lib/python3.7/site-packages/airflow/models/__init__.py", line 1445, in _run_raw_task
    self.xcom_push(key=XCOM_RETURN_KEY, value=result)
  File "/Users/divyanshushekhar/repos/repo_name/venv/lib/python3.7/site-packages/airflow/models/__init__.py", line 1867, in xcom_push
    execution_date=execution_date or self.execution_date)
  File "/Users/divyanshushekhar/repos/repo_name/venv/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/Users/divyanshushekhar/repos/repo_name/venv/lib/python3.7/site-packages/airflow/models/__init__.py", line 4460, in set
    value = pickle.dumps(value)
TypeError: can't pickle coroutine objects
[2019-08-08 18:37:09,630] {__init__.py:1603} INFO - Marking task as UP_FOR_RETRY

asked Aug 09 '19 03:08 by Divyanshu Shekhar


1 Answer

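You can get around it by making the python_callable a regular (non-async) function that calls asyncio.run() itself, so it never returns a coroutine object. Airflow pushes the callable's return value to XCom, which pickles it (the pickle.dumps(value) frame in the traceback), and coroutine objects can't be pickled. Declaring the callable with async def doesn't help: calling it only creates a coroutine, so the asyncio.run() inside it never executes.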
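A minimal, Airflow-free sketch of that difference, reusing the question's Top class: the async def version only produces a coroutine, which pickle rejects just as XCom did, while the plain wrapper returns an ordinary list that pickles fine. top_1_async and top_1_sync are illustrative names, and the exact TypeError message varies between Python versions.

```python
import asyncio
import inspect
import pickle


class Top:
    async def process(self, input_data):
        return [rawstr for rawstr in input_data]


async def top_1_async(x, **kwargs):
    # Mirrors the question: asyncio.run() inside an async def only runs
    # if something awaits top_1_async(), and Airflow never does
    return asyncio.run(Top().process(x))


def top_1_sync(x, **kwargs):
    # Plain function: asyncio.run() executes the coroutine and returns its result
    return asyncio.run(Top().process(x))


data = ["wow! this is great", "this is not how I thought"]

# Calling an async def does not execute its body; it just creates a coroutine object
coro = top_1_async(data)
is_coro = inspect.iscoroutine(coro)

# Airflow 1.10's XCom calls pickle.dumps(value) on the return value (see the traceback)
try:
    pickle.dumps(coro)
    pickled_ok = True
except TypeError as exc:
    pickled_ok = False
    print("pickle failed:", exc)
finally:
    coro.close()  # discard the never-started coroutine without a "never awaited" warning

result = top_1_sync(data)
print(is_coro, pickled_ok, result == pickle.loads(pickle.dumps(result)))
```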

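import asyncio
from datetime import datetime

import aiohttp  # assumed HTTP client; the original's session.get()/resp.text usage matches aiohttp
from airflow import DAG
from airflow.operators.python import PythonOperator

URL = "https://example.com/data"  # hypothetical endpoint; the original leaves the request target out


async def request_data(session_cred=None, **kwargs):

    # maybe loop & create tasks, then offload them to other async functions

    # how session_cred is applied (headers, auth, ...) depends on your API and is not shown
    async with aiohttp.ClientSession() as session:
        async with session.get(URL) as resp:
            data = await resp.text()  # text() is a coroutine method: call it, then await

    return data


def task_callable(**kwargs):
    # A plain (sync) function: asyncio.run() drives the coroutine to completion,
    # so Airflow receives the finished result rather than a coroutine object
    cred = kwargs['params']['credentials']
    return asyncio.run(request_data(session_cred=cred))  # a str, which XCom can pickle


default_args = {
    'owner': 'yourname',
    'start_date': datetime(2022, 1, 1),  # hypothetical date; Airflow needs a start_date
}

with DAG('my_dag', default_args=default_args, schedule='@daily') as dag:

    # provide_context was removed in Airflow 2; the context is passed automatically
    task = PythonOperator(
        task_id='async_task',
        python_callable=task_callable,
        params={
            # get_cred and key_file come from the original answer and are defined elsewhere
            'credentials': get_cred(key_file),
        },
    )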
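Since the question says almost every task is async, the same fix can be packaged once as a decorator and applied to each callable. The sketch below does that for the question's Top/Bottom pipeline. run_async and FakeTaskInstance are hypothetical names, not Airflow APIs; FakeTaskInstance only imitates the pickling step seen in the traceback so the two tasks can be exercised without an Airflow install.

```python
import asyncio
import functools
import pickle


class Top:
    async def process(self, input_data):
        return [rawstr for rawstr in input_data]


class Bottom:
    async def process(self, input_data):
        return [len(x) for x in input_data]


def run_async(coro_fn):
    """Hypothetical helper: expose an async def as a plain callable for PythonOperator."""
    @functools.wraps(coro_fn)
    def wrapper(*args, **kwargs):
        # asyncio.run() creates a fresh event loop, runs the coroutine, and closes the loop
        return asyncio.run(coro_fn(*args, **kwargs))
    return wrapper


@run_async
async def top_1(x, **kwargs):
    return await Top().process(x)


@run_async
async def bottom_1(**kwargs):
    ti = kwargs['ti']
    y = ti.xcom_pull(key=None, task_ids='task_top_1')  # t1's task_id in the question
    return await Bottom().process(y)


class FakeTaskInstance:
    """Hypothetical stand-in for a TaskInstance; pickles values like Airflow 1.10's XCom."""

    def __init__(self):
        self._xcoms = {}

    def store_return_value(self, task_id, value):
        self._xcoms[task_id] = pickle.dumps(value)  # would raise TypeError for a coroutine

    def xcom_pull(self, key=None, task_ids=None):
        return pickle.loads(self._xcoms[task_ids])


ti = FakeTaskInstance()
ti.store_return_value('task_top_1', top_1(["wow! this is great", "this is not how I thought"]))
lengths = bottom_1(ti=ti)
print(lengths)  # [18, 25]
```

In a real DAG, top_1 and bottom_1 can then be passed as python_callable as before. Each call runs its own loop via asyncio.run(), which assumes no event loop is already running in the worker process; asyncio.run() raises RuntimeError if one is.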
answered Oct 26 '22 22:10 by CrookedCloud