I have configured Airflow and created some DAGs and subDAGs that call several operators.
My problem is that when an operator runs and finishes its job, I'd like to receive the results back in some Python structure. For instance:
File1.py
  ...
    ...
    sub_dag_one = SubDagOperator(
        subdag=subdag_callHive(
            PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, STEP, macros, path,
        ),
        task_id=DELP_DAG_NAME,
        dag=dag,
    )
File2.py
    from airflow import DAG
    from airflow.operators import HiveOperator

    def subdag_callHive(parent, child, args, step,
                        user_defined_macros, path):
        dag_subdag = DAG(
            dag_id='%s.%s' % (parent, child),
            default_args=args,
            schedule_interval="@daily",
            template_searchpath=path,
            user_defined_macros=user_defined_macros,
        )
        # some work...
        HiveOperator(
            task_id='some_id',
            hiveconf_jinja_translate=True,
            hql='select field1 from public.mytable limit 4;',
            trigger_rule='all_done',
            dag=dag_subdag,
        )
        return dag_subdag
The function subdag_callHive is called from another Python script, where the main DAG and all the other required parameters are defined.
I would just like to get the result of the HiveOperator query (select field1 from public.mytable limit 4;), which would be 4 values in this case.
The returned dag_subdag is an object of class 'airflow.models.DAG' and contains all the attributes and data passed to the call, but no information about what the HiveOperator did.
Is this possible? If so, how can it be accomplished?
You can use hooks for this. The HiveOperator does essentially the same thing: it calls the Hive hooks, which have multiple methods for working with results.
Use a PythonOperator to call a function that creates a Hive hook and runs the query through it.
The following example might help:
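# Airflow 1.x import paths (hive_hooks.py is the module linked below)
from airflow.hooks.hive_hooks import HiveServer2Hook
from airflow.operators.python_operator import PythonOperator

def do_work():
    # Query Hive through the hook; get_records returns the result rows.
    hiveserver = HiveServer2Hook()
    hql = "SELECT COUNT(*) FROM foo.bar"
    row_count = hiveserver.get_records(hql, schema='foo')
    print(row_count[0][0])

# The callable must be defined before the operator that references it.
callHook = PythonOperator(
    task_id='foo',
    python_callable=do_work,
    dag=dag
)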
All available methods can be found here: https://github.com/apache/incubator-airflow/blob/master/airflow/hooks/hive_hooks.py
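To get the rows back as a Python structure rather than just printing them, the callable can return them. A PythonOperator pushes the return value of its callable to XCom, and a downstream task can read it with xcom_pull. Below is a minimal sketch of that flow, not a definitive implementation. FakeHiveHook and FakeTaskInstance are hypothetical stand-ins (not Airflow classes) so the example runs without Airflow or a Hive server, and fetch_field1 and use_result are illustrative names. The sketch assumes the hook exposes get_records(hql, schema=...) as in the snippet above.

```python
class FakeHiveHook:
    """Hypothetical stand-in for HiveServer2Hook so the sketch runs offline."""

    def get_records(self, hql, schema="default"):
        # A real hook would execute hql; here we return four canned rows.
        return [("a",), ("b",), ("c",), ("d",)]


class FakeTaskInstance:
    """Hypothetical stand-in for the XCom part of Airflow's TaskInstance."""

    def __init__(self):
        self._xcom = {}

    def record_return_value(self, task_id, value):
        # Mimics Airflow storing a PythonOperator callable's return value.
        self._xcom[task_id] = value

    def xcom_pull(self, task_ids):
        return self._xcom.get(task_ids)


def fetch_field1(hook):
    """Upstream callable: query through the hook and return a plain list."""
    hql = "select field1 from public.mytable limit 4"
    rows = hook.get_records(hql, schema="public")
    return [row[0] for row in rows]


def use_result(ti):
    """Downstream callable: read what the upstream task returned."""
    values = ti.xcom_pull(task_ids="fetch_field1")
    return len(values)


# Simulate the two tasks running in order.
ti = FakeTaskInstance()
ti.record_return_value("fetch_field1", fetch_field1(FakeHiveHook()))
print(ti.xcom_pull(task_ids="fetch_field1"))
print(use_result(ti))
```

In a real DAG you would pass HiveServer2Hook() instead of the fake hook and wire both functions in as python_callable of two PythonOperator tasks. The downstream callable receives the task instance through the task context (in Airflow 1.x this requires provide_context=True on the operator).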