Airflow Python operator passing parameters

Tags: python, airflow

I'm trying to write a PythonOperator task in an Airflow DAG and pass certain parameters to the Python callable.

My code looks like this:

def my_sleeping_function(threshold):
   print(threshold)

fmfdependency = PythonOperator(
   task_id='poke_check',
   python_callable=my_sleeping_function,
   provide_context=True,
   op_kwargs={'threshold': 100},
   dag=dag)

end = BatchEndOperator(
   queue=QUEUE,
   dag=dag)

start.set_downstream(fmfdependency)
fmfdependency.set_downstream(end)

But I keep getting the following error:

TypeError: my_sleeping_function() got an unexpected keyword argument 'dag_run'

I can't figure out why.

Riyan Mohammed asked Feb 15 '19 21:02


People also ask

What is Op_kwargs in Airflow?

op_kwargs (dict) – A dict of keyword arguments to pass to python_callable. provide_context (bool) – if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates.

What does Python operator mean in Airflow DAG?

The Airflow PythonOperator does exactly what you are looking for. It is a very simple but powerful operator, allowing you to execute a Python callable function from your DAG.

How do you pass context in Airflow?

Airflow provides access to the task context. When you set provide_context=True on an operator, Airflow passes its context variables to the callable as keyword arguments.


2 Answers

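Add **kwargs to your function's parameter list, after the threshold parameter. With provide_context=True, Airflow calls the function with the task context (dag_run and others) as extra keyword arguments, so a signature that accepts only threshold raises the TypeError.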
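As a rough sketch of why this works, the snippet below imitates the call in plain Python, without importing Airflow. The fake_context dict is a hypothetical stand-in for the real task context, which has many more keys; the merge only approximates what the operator does when provide_context=True.

```python
def my_sleeping_function(threshold, **kwargs):
    # threshold is filled from op_kwargs; every other keyword the operator
    # passes in (dag_run, ds, ...) is collected in kwargs instead of
    # raising "unexpected keyword argument".
    print(threshold)
    return threshold, sorted(kwargs)


# Hypothetical stand-in for the Airflow task context (assumption, not the real object).
fake_context = {"dag_run": None, "ds": "2019-02-15"}
op_kwargs = {"threshold": 100}

# Approximation of the operator's call: context and op_kwargs merged into one
# set of keyword arguments.
result = my_sleeping_function(**{**fake_context, **op_kwargs})
```

If the function does not need the context at all, dropping provide_context=True from the operator should also avoid the error, since only op_kwargs would then be passed.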

trejas answered Sep 19 '22 17:09


This is how you can pass positional arguments to the callable of a PythonOperator in Airflow, using op_args.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def my_func(*op_args):
    # op_args arrives as a tuple of the positional values given to the operator.
    print(op_args)
    return op_args[0]


with DAG(
    'python_dag',
    description='Python DAG',
    schedule_interval='*/5 * * * *',
    start_date=datetime(2018, 11, 1),
    catchup=False,
) as dag:
    dummy_task = DummyOperator(task_id='dummy_task', retries=3)
    python_task = PythonOperator(
        task_id='python_task',
        python_callable=my_func,
        op_args=['one', 'two', 'three'],
    )

    # Run dummy_task first, then python_task.
    dummy_task >> python_task
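
To see what my_func receives, the plain-Python sketch below mimics how the list given as op_args is unpacked into positional arguments. It does not import Airflow; the direct call is only an illustration of the unpacking, not the operator's actual code path.

```python
def my_func(*op_args):
    # The three list items arrive as one tuple: ('one', 'two', 'three').
    print(op_args)
    return op_args[0]


op_args = ['one', 'two', 'three']

# Equivalent to my_func('one', 'two', 'three').
returned = my_func(*op_args)
```

Keyword arguments work the same way through op_kwargs, which is unpacked with ** instead of *.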

Shayan answered Sep 22 '22 17:09