Airflow - call an operator inside a function

Tags: python, airflow

I'm trying to run an operator that is created inside a function by calling that function from a PythonOperator. It seems I've missed something; can someone help me find out what?

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago


dd = datetime(2018, 1, 1)
args = {
    'owner': 'airflow',
    'start_date': dd,
    'retries': 0

}

def postgres_to_gcs():
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo "task1"',
        xcom_push=True,
        dag=dag)
    return t1



with DAG('python_dag', description='Python DAG', schedule_interval='*/15 * * * *', start_date=dd, catchup=False) as dag:
    python_task = PythonOperator(task_id='python_task', python_callable=postgres_to_gcs)
 
    python_task

Error:

[2020-10-10 09:34:10,700] {baseoperator.py:351} WARNING - start_date for <Task(BashOperator): ttest-task> isn't datetime.datetime
[2020-10-10 09:34:10,700] {taskinstance.py:1150} ERROR - '>' not supported between instances of 'Pendulum' and 'str'
Traceback (most recent call last):
  File "/root/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/root/airflow/dags/estdag.py", line 19, in postgres_to_gcs
    dag=dag)
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 101, in __init__
    super(BashOperator, self).__init__(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 423, in __init__
    self.dag = dag
  File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 549, in dag
    dag.add_task(self)
  File "/root/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 1325, in add_task
    task.start_date = max(task.start_date, self.start_date)
TypeError: '>' not supported between instances of 'Pendulum' and 'str'
[2020-10-10 09:34:10,702] {taskinstance.py:1194} INFO - Marking task as FAILED. dag_id=python_dag, task_id=python_task, execution_date=20201010T093407, start_date=20201010T093410, end_date=20201010T093410

Update: following a workaround suggested by Racooneer (thanks!), removing default_args made the error go away, but the underlying issue remains: I'm still not able to see the bash command's output.

asked Oct 10 '20 07:10 by TheDataGuy



1 Answer

I'm not exactly sure what you are trying to do, but the code in your Python function only creates the operator; it never actually executes it.

This should work just fine:

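from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def postgres_to_gcs():
    # Build the operator without attaching it to the DAG.
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo task1',
        xcom_push=True            # Note: there is no dag=dag here!
    )
    # Instantiating the operator runs nothing; execute() runs the command.
    # An empty dict is passed as the context.
    t1.execute(dict())


with DAG(
        'python_dag',
        description='Python DAG',
        schedule_interval='*/15 * * * *',
        start_date=datetime(2018, 1, 1),
        catchup=False
) as dag:
    python_task = PythonOperator(
        task_id='python_task',
        python_callable=postgres_to_gcs
    )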

Note that operators are Python classes. Instantiating an operator inside a Python function only runs the class constructor; to actually run the operator, you need to call its execute method.
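To make the difference between constructing and executing concrete, here is a minimal sketch that does not use Airflow at all. `EchoOperator` and both callables are hypothetical stand-ins invented for illustration; they only mimic the constructor/execute split described above and are not part of Airflow's API.

```python
class EchoOperator:
    """Hypothetical stand-in for an Airflow operator (not a real Airflow class)."""

    def __init__(self, task_id, message):
        # The constructor only stores configuration; no work happens here.
        self.task_id = task_id
        self.message = message
        self.executed = False

    def execute(self, context):
        # The actual work happens only when execute() is called.
        self.executed = True
        return self.message


def callable_without_execute():
    # Mirrors the question: the operator is built and returned, never run.
    return EchoOperator(task_id="count_lines", message="task1")


def callable_with_execute():
    # Mirrors the answer: the operator is built and then explicitly executed.
    op = EchoOperator(task_id="count_lines", message="task1")
    return op.execute(context={})
```

Calling `callable_without_execute()` hands back an object whose `executed` flag is still `False`, while `callable_with_execute()` returns the message produced by `execute`.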

Note: Using an operator inside another operator is not good practice. Use hooks or create a custom operator instead. You can read more about why in the following answer.
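As one possible alternative to nesting operators, the callable can do the work itself. The sketch below assumes the goal is simply to run a shell command and see its output; `run_shell` is a hypothetical helper built on the standard library's `subprocess` module, not an Airflow API, and it is not a substitute for a proper hook when talking to Postgres or GCS.

```python
import subprocess


def run_shell(command):
    """Hypothetical helper: run a shell command and return its stripped stdout."""
    completed = subprocess.run(
        command,
        shell=True,           # run through the system shell, similar to bash_command
        check=True,           # raise CalledProcessError on a non-zero exit code
        capture_output=True,  # collect stdout and stderr instead of inheriting them
        text=True,            # decode the output as text
    )
    return completed.stdout.strip()


def postgres_to_gcs():
    # Do the work directly instead of building a BashOperator inside the callable.
    output = run_shell("echo task1")
    print(output)
    return output
```

Used as the `python_callable` of a PythonOperator, a function like this should make the command output visible in the task log through the `print` call, and the returned value is what the PythonOperator would push to XCom.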

answered Oct 19 '22 15:10 by Elad Kalif