I'm trying to write a Python operator in an airflow DAG and pass certain parameters to the Python callable.
My code looks like below.
def my_sleeping_function(threshold):
print(threshold)
fmfdependency = PythonOperator(
task_id='poke_check',
python_callable=my_sleeping_function,
provide_context=True,
op_kwargs={'threshold': 100},
dag=dag)
end = BatchEndOperator(
queue=QUEUE,
dag=dag)
start.set_downstream(fmfdependency)
fmfdependency.set_downstream(end)
But I keep getting the below error.
TypeError: my_sleeping_function() got an unexpected keyword argument 'dag_run'
Not able to figure out why.
op_kwargs (dict) – A dict of keyword arguments to pass to python_callable. provide_context (bool) – if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates.
The Airflow PythonOperator does exactly what you are looking for. It is a very simple but powerful operator, allowing you to execute a Python callable function from your DAG.
Passing context to tasks Airflow supports this use case by providing access to the task context. When providing provide_context=True to an operator, we pass along the Airflow context variables to be used inside the operator.
Add **kwargs to your operator parameters list after your threshold param
This is how you can pass arguments for a Python operator in Airflow.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from time import sleep
from datetime import datetime
def my_func(*op_args):
print(op_args)
return op_args[0]
with DAG('python_dag', description='Python DAG', schedule_interval='*/5 * * * *', start_date=datetime(2018, 11, 1), catchup=False) as dag:
dummy_task = DummyOperator(task_id='dummy_task', retries=3)
python_task = PythonOperator(task_id='python_task', python_callable=my_func, op_args=['one', 'two', 'three'])
dummy_task >> python_task
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With