 

Using Airflow to run Spark streaming jobs?

We have Spark batch jobs and Spark streaming jobs in our Hadoop cluster.

We would like to schedule and manage them both on the same platform.

We came across Airflow, which fits our need for a "platform to author, schedule, and monitor workflows".

I just want to be able to stop and start Spark streaming jobs. Airflow's graphs and profiling are less of an issue.

My question is: besides losing some functionality (graphs, profiling), why shouldn't I use Airflow to run Spark streaming jobs?

I came across this question: Can Airflow be used to run a never-ending task?

It says it's possible, but not why you shouldn't.

asked Feb 20 '19 by Gilad

People also ask

Can Airflow be used for Streaming jobs?

Airflow allows you to manage your data pipelines by authoring workflows as task-based Directed Acyclic Graphs (DAGs). Does Apache Airflow stream data? No, it is not a streaming solution. Tasks do not transfer data from one to the other (though they can exchange metadata!).
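To make the metadata point concrete, here is a minimal sketch using Airflow 2.x's TaskFlow API (the DAG id, values, and path are made up for illustration): the first task's small return value is passed to the second task as an XCom, while any bulk data would stay in external storage.

from datetime import datetime
from airflow.decorators import dag, task

@dag(start_date=datetime(2019, 2, 20), schedule_interval="@daily", catchup=False)
def metadata_exchange():
    @task
    def produce():
        # The return value is stored as XCom metadata, not bulk data.
        return {"rows_written": 42, "path": "/out/latest"}

    @task
    def consume(meta: dict):
        print(f"upstream wrote {meta['rows_written']} rows to {meta['path']}")

    consume(produce())

metadata_exchange()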

Can you use Airflow with Databricks?

You define a workflow in a Python file, and Airflow manages the scheduling and execution. The Airflow Databricks integration lets you take advantage of the optimized Spark engine offered by Databricks along with Airflow's scheduling features.
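As a rough illustration, a DAG using the Databricks provider's DatabricksSubmitRunOperator might look like the sketch below; the connection id, cluster spec, and file path are assumptions, not details from the question.

from datetime import datetime
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

with DAG("databricks_spark_job", start_date=datetime(2019, 2, 20),
         schedule_interval="@daily", catchup=False) as dag:
    # Submits a one-time run to Databricks via the runs/submit API.
    DatabricksSubmitRunOperator(
        task_id="run_spark_job",
        databricks_conn_id="databricks_default",
        json={
            "new_cluster": {
                "spark_version": "7.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
            },
            "spark_python_task": {"python_file": "dbfs:/jobs/my_job.py"},
        },
    )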

How do I create a Spark connection in Airflow?

On the Spark downloads page you can download the tgz file and unzip it on the machine that hosts Airflow. Set SPARK_HOME in your .bashrc and add it to the system PATH. Finally, add the pyspark package to the environment where Airflow runs.


2 Answers

@mMorozonv's answer looks good. You could have one DAG start the stream if it does not exist, then a second DAG as a health checker to track its progress. If the health check fails, you could trigger the first DAG again.
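A minimal sketch of that two-DAG setup, assuming Airflow 2.x import paths; the DAG ids, schedules, application name my_stream, and the spark-submit command are all illustrative:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG("start_stream", start_date=datetime(2019, 2, 20),
         schedule_interval=None, catchup=False) as start_dag:
    # Submit the streaming app; returns right after submission in cluster
    # mode with spark.yarn.submit.waitAppCompletion=false (see the other answer).
    BashOperator(
        task_id="submit_stream",
        bash_command="spark-submit --master yarn --deploy-mode cluster "
                     "--conf spark.yarn.submit.waitAppCompletion=false "
                     "--name my_stream /jobs/stream_job.py",
    )

with DAG("check_stream", start_date=datetime(2019, 2, 20),
         schedule_interval="*/10 * * * *", catchup=False) as check_dag:
    # Fails when the app is not in YARN's RUNNING list.
    health_check = BashOperator(
        task_id="health_check",
        bash_command="yarn application -list -appStates RUNNING | grep my_stream",
    )
    # Runs only if the check failed, and restarts the stream.
    restart = TriggerDagRunOperator(
        task_id="restart_stream",
        trigger_dag_id="start_stream",
        trigger_rule=TriggerRule.ONE_FAILED,
    )
    health_check >> restart

Note that a failed check still marks the check_stream run as failed, which conveniently doubles as an alert.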

Alternatively, you can run the stream with a run-once trigger [1].

# Load the streaming DataFrame (my_schema is defined elsewhere)
sdf = spark.readStream.load(path="data/", format="json", schema=my_schema)
# Perform transformations, then write with a run-once trigger:
# process everything new since the last run, then terminate.
sdf.writeStream.trigger(once=True) \
    .option("checkpointLocation", "/out/checkpoint") \
    .start(path="/out/path", format="parquet")

This gives you all the benefits of Spark streaming, with the flexibility of batch processing.

You can simply point the stream at your data, and the job will detect all the new files since the last iteration (using checkpointing), run a streaming batch, then terminate. You can set your Airflow DAG's schedule to suit whatever lag you'd like to process data at (every minute, hour, etc.).

I wouldn't recommend this for low-latency requirements, but it's very well suited to running every minute.
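For the scheduling side, a sketch of the Airflow DAG might look like this, using the Spark provider's SparkSubmitOperator; the connection id, application path, and one-minute schedule are assumptions:

from datetime import datetime
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG("stream_as_batch", start_date=datetime(2019, 2, 20),
         schedule_interval="* * * * *",  # every minute
         catchup=False, max_active_runs=1) as dag:
    # Runs the trigger-once job above; it processes new files and exits.
    SparkSubmitOperator(
        task_id="run_stream_batch",
        application="/jobs/stream_batch.py",
        conn_id="spark_default",
    )

Setting max_active_runs=1 keeps runs from piling up if one batch takes longer than the schedule interval.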

[1] https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html

answered Nov 01 '22 by Ryan


Using Airflow's branching functionality, we can have one DAG that does both the scheduling and the monitoring of our streaming job. The DAG does a status check of the application, and if the application is not running, the DAG submits the streaming job. Otherwise the DAG run can finish, or you can add a sensor which will check the streaming job's status after some time, with alerts and whatever else you need. A minimal sketch of such a DAG follows the two caveats below.

There are two main problems:

  1. Submit the streaming application without waiting until it finishes; otherwise our operator will run until it reaches execution_timeout.

This can be solved by submitting the streaming job in cluster mode with the spark.yarn.submit.waitAppCompletion configuration parameter set to false.

  2. Check the status of our streaming application.

We can check the streaming application's status using YARN. For example, we can use the command yarn application -list -appStates RUNNING. If our application appears in the list of running applications, we should not trigger our streaming job again. The only requirement is to make the streaming job's name unique.
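Putting it together, here is a minimal sketch of such a branching DAG, assuming Airflow 2.3+ import paths; the application name, schedule, and spark-submit arguments are illustrative:

from datetime import datetime
from subprocess import PIPE, run
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator

APP_NAME = "my_stream"  # assumed unique YARN application name

def choose_branch():
    # Ask YARN for running applications and branch on the result.
    apps = run(["yarn", "application", "-list", "-appStates", "RUNNING"],
               stdout=PIPE, check=True).stdout.decode()
    return "already_running" if APP_NAME in apps else "submit_stream"

with DAG("manage_stream", start_date=datetime(2019, 2, 20),
         schedule_interval="*/10 * * * *", catchup=False) as dag:
    check_status = BranchPythonOperator(
        task_id="check_status", python_callable=choose_branch)
    submit_stream = BashOperator(
        task_id="submit_stream",
        # Cluster mode + waitAppCompletion=false returns right after submit.
        bash_command="spark-submit --master yarn --deploy-mode cluster "
                     "--conf spark.yarn.submit.waitAppCompletion=false "
                     f"--name {APP_NAME} /jobs/stream_job.py",
    )
    already_running = EmptyOperator(task_id="already_running")
    check_status >> [submit_stream, already_running]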

answered Nov 01 '22 by Aleksejs R