airflow get result after executing an operator

I have configured Airflow and created some DAGs and SubDAGs that call several operators.

My trouble is that when an operator runs and finishes the job, I'd like to receive the results back in some Python structure. For instance:

File1.py

    ...
    sub_dag_one = SubDagOperator(
        subdag=subdag_callHive(
            PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, STEP,
            macros, path,
        ),
        task_id=DELP_DAG_NAME,
        dag=dag,
    )

File2.py

    from airflow import DAG
    from airflow.operators import HiveOperator

    def subdag_callHive(parent, child, args, step,
                        user_defined_macros, path):
        dag_subdag = DAG(
            dag_id='%s.%s' % (parent, child),
            default_args=args,
            schedule_interval="@daily",
            template_searchpath=path,
            user_defined_macros=user_defined_macros,
        )

        # some work...

        HiveOperator(
            task_id='some_id',
            hiveconf_jinja_translate=True,
            hql='select field1 from public.mytable limit 4;',
            trigger_rule='all_done',
            dag=dag_subdag,
        )

        return dag_subdag

The function subdag_callHive is called from another Python script, where the main DAG and all the other required parameters are defined.

I would just like to be able to get the result of the HiveOperator (*select field1 from public.mytable limit 4;*), which would be 4 values in this case.

The returned dag_subdag is an object of class 'airflow.models.DAG' and contains all the attributes/data passed to the call, but no information about what the HiveOperator did.

Is this possible? If so, how can it be accomplished?

asked Feb 07 '23 by Alg_D
1 Answer

You can use hooks for this. The HiveOperator basically does the same thing internally: it calls the Hive hooks, which provide several methods for working with query results.

Use a PythonOperator to call a function which then starts a Hive hook.

The following example might help you.

Code Snippet:

from airflow.hooks.hive_hooks import HiveServer2Hook
from airflow.operators.python_operator import PythonOperator


def do_work():
    # Connect to HiveServer2 and fetch the query results
    # as a list of rows (tuples).
    hiveserver = HiveServer2Hook()
    hql = "SELECT COUNT(*) FROM foo.bar"
    row_count = hiveserver.get_records(hql, schema='foo')
    print(row_count[0][0])


callHook = PythonOperator(
    task_id='foo',
    python_callable=do_work,
    dag=dag
)

All available methods can be found here: https://github.com/apache/incubator-airflow/blob/master/airflow/hooks/hive_hooks.py
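If you need the rows back "in some python structure" in another task, one option is a minimal sketch like the one below (not from the original answer): it assumes an Airflow version where a PythonOperator's return value is automatically pushed to XCom, and the task ids 'foo' and 'consume' are illustrative.

def do_work():
    hiveserver = HiveServer2Hook()
    # Returning the records pushes them to XCom under the key 'return_value'.
    return hiveserver.get_records("SELECT COUNT(*) FROM foo.bar", schema='foo')


def consume(**kwargs):
    # Pull the rows that do_work returned; 'ti' is the TaskInstance,
    # available in kwargs because of provide_context=True below.
    rows = kwargs['ti'].xcom_pull(task_ids='foo')
    print(rows)


consume_task = PythonOperator(
    task_id='consume',     # hypothetical downstream task
    python_callable=consume,
    provide_context=True,  # needed on older Airflow to get 'ti' in kwargs
    dag=dag,
)
consume_task.set_upstream(callHook)  # run after the hook task

Keep in mind that XCom is intended for small payloads; the 4 values from the question are fine, but for large result sets you would write to a table or file and pass a reference instead.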

answered Feb 11 '23 by Thees