Airflow Custom Metrics and/or Result Object with custom fields

While running PySpark SQL pipelines via Airflow, I am interested in getting out some business stats like:

  • source read count
  • target write count
  • sizes of DFs during processing
  • error records count

One idea is to push these values directly to the metrics, so they get automatically consumed by monitoring tools like Prometheus. Another idea is to obtain them via some DAG result object, but I wasn't able to find anything about that in the docs.

Please post at least some pseudocode if you have a solution.

asked Apr 29 '19 by kensai

1 Answer

I would look to reuse Airflow's statistics and monitoring support in the airflow.stats.Stats class. Maybe something like this:

import logging
from airflow.stats import Stats

PYSPARK_LOG_PREFIX = "airflow_pyspark"


def your_python_operator(**context):
    [...]

    try:
        # Emit counters through Airflow's Stats facade (StatsD under the hood).
        Stats.incr(f"{PYSPARK_LOG_PREFIX}_read_count", src_read_count)
        Stats.incr(f"{PYSPARK_LOG_PREFIX}_write_count", tgt_write_count)
        # So on and so forth
    except Exception:
        # Never let metrics emission break the task itself.
        logging.exception("Caught exception during statistics logging")

    [...]
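
Note that Stats counters only leave the Airflow process if StatsD emission is enabled in airflow.cfg (statsd_on, statsd_host, statsd_port, statsd_prefix); from there something like the Prometheus statsd_exporter can turn them into metrics Prometheus can scrape.

If you also want the counts available inside Airflow itself (the "DAG result object" idea from the question), the closest built-in mechanism is XCom: a task pushes values that downstream tasks (or the XCom view in the UI) can read. A minimal sketch, assuming PythonOperator tasks with the context passed in; the task_id, keys and count values below are made up for illustration:

import logging


def your_pyspark_task(**context):
    # Placeholders: the real values come out of your Spark job.
    src_read_count = 1000
    tgt_write_count = 990

    # Push the business counts to XCom, keyed per task instance.
    ti = context["ti"]
    ti.xcom_push(key="src_read_count", value=src_read_count)
    ti.xcom_push(key="tgt_write_count", value=tgt_write_count)


def downstream_report(**context):
    # Pull the values pushed by the upstream task.
    ti = context["ti"]
    read_count = ti.xcom_pull(task_ids="your_pyspark_task", key="src_read_count")
    write_count = ti.xcom_pull(task_ids="your_pyspark_task", key="tgt_write_count")
    logging.info("source read=%s, target write=%s", read_count, write_count)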
answered Oct 02 '22 by joebeeson