I'm trying to call a Python operator which is inside a function, using another Python operator. It seems I missed something; can someone help me find out what?
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago

dd = datetime(2018, 1, 1)
args = {
    'owner': 'airflow',
    'start_date': dd,
    'retries': 0
}

def postgres_to_gcs():
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo "task1"',
        xcom_push=True,
        dag=dag)
    return t1

with DAG('python_dag', description='Python DAG', schedule_interval='*/15 * * * *', start_date=dd, catchup=False) as dag:
    python_task = PythonOperator(task_id='python_task', python_callable=postgres_to_gcs)
    python_task
[2020-10-10 09:34:10,700] {baseoperator.py:351} WARNING - start_date for <Task(BashOperator): ttest-task> isn't datetime.datetime
[2020-10-10 09:34:10,700] {taskinstance.py:1150} ERROR - '>' not supported between instances of 'Pendulum' and 'str'
Traceback (most recent call last):
  File "/root/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/root/airflow/dags/estdag.py", line 19, in postgres_to_gcs
    dag=dag)
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 101, in __init__
    super(BashOperator, self).__init__(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 423, in __init__
    self.dag = dag
  File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 549, in dag
    dag.add_task(self)
  File "/root/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 1325, in add_task
    task.start_date = max(task.start_date, self.start_date)
TypeError: '>' not supported between instances of 'Pendulum' and 'str'
[2020-10-10 09:34:10,702] {taskinstance.py:1194} INFO - Marking task as FAILED. dag_id=python_dag, task_id=python_task, execution_date=20201010T093407, start_date=20201010T093410, end_date=20201010T093410
One workaround was suggested by Racooneer (but the issue is still there).
Thanks, Racooneer!
Removing default_args helped to solve it, but I'm still not able to see the bash command output.
Using operators the way you did is not allowed in Airflow: all operators must live in the DAG context. As mentioned in the official Airflow tutorial, the DAG definition "needs to evaluate quickly (seconds, not minutes) since the scheduler will execute it periodically to reflect the changes if any".
The Airflow PythonOperator does exactly what you are looking for. It is a very simple but powerful operator, allowing you to execute a Python callable function from your DAG.
op_kwargs (dict) – a dict of keyword arguments that will get unpacked in your function.
provide_context (bool) – if set to True, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs corresponds exactly to what you can use in your Jinja templates.
An Operator is conceptually a template for a predefined Task, that you can just define declaratively inside your DAG: Airflow has a very extensive set of operators available, with some built-in to the core or pre-installed providers.
Note that operators are Python classes. When you call an operator inside a Python function, remember that you are only invoking the class constructor. To actually run the operator you would need to call its execute method.
Use the PythonOperator to execute Python callables. Use the op_args and op_kwargs arguments to pass additional arguments to the Python callable. When you set the provide_context argument to True, Airflow passes in an additional set of keyword arguments: one for each of the Jinja template variables and a templates_dict argument.
I'm not exactly sure what you are trying to do, but the code you posted in the Python function doesn't actually execute the operator.
This should work just fine:
def postgres_to_gcs():
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo task1',
        xcom_push=True  # Note: there is no dag=dag here!
    )
    t1.execute(dict())

with DAG(
    'python_dag',
    description='Python DAG',
    schedule_interval='*/15 * * * *',
    start_date=datetime(2018, 1, 1),
    catchup=False
) as dag:
    python_task = PythonOperator(
        task_id='python_task',
        python_callable=postgres_to_gcs
    )
Note that operators are Python classes. When you call an operator inside a Python function, remember that you only initialize the class constructor. To run the operator you need to call its execute method.
Note: using an operator inside an operator is not good practice. You should use hooks or create custom operators instead. You can read more about why in the following answer.