 

Airflow SparkSubmitOperator push value to xcom

In my Airflow Spark jobs, I need to pass the Spark job stats to other tasks in the workflow. How can I push a value from SparkSubmitOperator to XCom?

task1 = SparkSubmitOperator(
    task_id='spark_task',
    conn_id='spark_default',
    java_class='com.example',
    application='example.jar',
    name='spark-job',
    verbose=True,
    application_args=["10"],  
    conf={'master':'yarn'},
    dag=dag,
)


# Pass a value from task1 to task2 via XCom

def somefunc(**kwargs):
    # Pull the value pushed by task1
    value = kwargs["ti"].xcom_pull(task_ids='spark_task')
    print(value)

task2 = PythonOperator(task_id='task2',
                       python_callable=somefunc,
                       provide_context=True,
                       dag=dag)
DevEx asked Nov 30 '25

1 Answer

Currently SparkSubmitOperator/SparkSubmitHook aren't designed to return job stats to XCom. You can easily subclass the operator to accommodate your needs:

from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

class SparkSubmitOperatorXCom(SparkSubmitOperator):

    def execute(self, context):
        super().execute(context)
        # _driver_status is a private attribute of SparkSubmitHook
        return self._hook._driver_status

Then instantiate the operator with do_xcom_push=True, so the return value of the execute method is pushed to XCom:

task1 = SparkSubmitOperatorXCom(
    do_xcom_push=True,
    ...
)

Note: in this case we are accessing a private attribute, since that is the only way SparkSubmitHook exposes the driver status. For more complex job stats you will have to implement your own solution, as the hook isn't flexible enough to collect everything for you.
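Once the driver status is in XCom, a downstream PythonOperator callable can pull it and fail fast if the driver didn't finish cleanly. A minimal sketch, assuming the task_id 'spark_task' from the question and that the pulled value is one of Spark's driver status strings (e.g. 'FINISHED'); the function name check_spark_status is just illustrative:

```python
def check_spark_status(**kwargs):
    # With do_xcom_push=True, the value returned by execute() is stored
    # under the default XCom key 'return_value' for task 'spark_task'.
    status = kwargs["ti"].xcom_pull(task_ids='spark_task')
    if status != 'FINISHED':
        raise ValueError("Spark driver ended with status: %s" % status)
    return status
```

Wire it up the same way as task2 in the question (python_callable=check_spark_status, provide_context=True).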

Gorka answered Dec 01 '25