In our Hadoop cluster we have Spark batch jobs and Spark streaming jobs.
We would like to schedule and manage them both on the same platform.
We came across Airflow, which fits our need for a "platform to author, schedule, and monitor workflows".
I just want to be able to stop and start Spark streaming jobs. Airflow's graphs and profiling matter less to us.
My question is: besides losing some functionality (graphs, profiling), why shouldn't I use Airflow to run Spark streaming jobs?
I came across this question: Can Airflow be used to run a never-ending task?
It says that it's possible, but not why you shouldn't.
Airflow allows you to manage your data pipelines by authoring workflows as task-based Directed Acyclic Graphs (DAGs). Does Apache Airflow stream data? No, it is not a streaming solution. Tasks do not pass data to one another (though they can exchange metadata!).
You define a workflow in a Python file, and Airflow manages its scheduling and execution. Airflow also integrates tightly with Databricks: the Airflow Databricks integration lets you combine the optimized Spark engine offered by Databricks with Airflow's scheduling features.
On the Spark download page you can get the .tgz file and unpack it on the machine that hosts Airflow. In the .bashrc file, set SPARK_HOME and add $SPARK_HOME/bin to the system PATH. Finally, you must install the pyspark package in the environment where Airflow runs.
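As a rough check of the setup steps above, the following stdlib-only sketch verifies that SPARK_HOME is set, that $SPARK_HOME/bin is on PATH, and (when run as a script) that pyspark can be imported in the current environment. The function name check_spark_env is hypothetical and not part of Airflow or Spark; run it as the same user and in the same virtualenv that executes your Airflow tasks.

```python
import importlib.util
import os
from typing import List, Mapping


def check_spark_env(env: Mapping[str, str]) -> List[str]:
    """Return the problems found in a Spark setup (an empty list means it looks fine)."""
    spark_home = env.get("SPARK_HOME", "")
    if not spark_home:
        return ["SPARK_HOME is not set"]
    problems = []
    # spark-submit and the other Spark launch scripts live in $SPARK_HOME/bin
    spark_bin = os.path.normpath(os.path.join(spark_home, "bin"))
    path_entries = [os.path.normpath(p) for p in env.get("PATH", "").split(os.pathsep) if p]
    if spark_bin not in path_entries:
        problems.append(f"{spark_bin} is not on PATH")
    return problems


if __name__ == "__main__":
    # Check the environment of the current process
    problems = check_spark_env(os.environ)
    if importlib.util.find_spec("pyspark") is None:
        problems.append("pyspark cannot be imported in this environment")
    print("\n".join(problems) or "Spark environment looks OK")
```

Passing the environment in as a mapping keeps the check easy to test without touching the real environment variables.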
@mMorozonv's answer looks good. You could have one DAG that starts the stream if it is not already running, and a second DAG that acts as a health checker to track its progress. If the health check fails, you could trigger the first DAG again.
Alternatively, you can run the stream with a trigger interval of once [1]:
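```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("trigger-once-stream").getOrCreate()
# Load your streaming DataFrame (file sources need an explicit schema; define my_schema yourself)
sdf = spark.readStream.load(path="data/", format="json", schema=my_schema)
# Perform transformations, then write; the checkpoint records which input files were already processed
query = sdf.writeStream.trigger(once=True).option("checkpointLocation", "/out/checkpoint").start(path="/out/path", format="parquet")
query.awaitTermination()  # returns once the single batch has been written
```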
This gives you all the same benefits of Spark Streaming, with the flexibility of batch processing.
You can simply point the stream at your data, and each run will detect all new files since the previous run (using checkpointing), process them as a single streaming batch, and then terminate. You can then set your Airflow DAG's schedule to match whatever lag you are willing to accept (every minute, every hour, etc.).
I wouldn't recommend this for low-latency requirements, but it's very suitable for running every minute.
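To make the checkpointing idea concrete, here is a toy, stdlib-only model of what each scheduled run does: it lists the input directory, picks only the files that earlier runs have not recorded, records them, and exits. This illustrates the concept only; Spark's real checkpoint directory has its own internal format and also tracks offsets and state. The function name and the JSON checkpoint file are hypothetical.

```python
import json
import os
import tempfile
from typing import List


def new_files_since_checkpoint(input_dir: str, checkpoint_file: str) -> List[str]:
    """Return input files not recorded by earlier runs, then record them."""
    seen = set()
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file) as f:
            seen = set(json.load(f))
    current = sorted(
        name for name in os.listdir(input_dir)
        if os.path.isfile(os.path.join(input_dir, name))
    )
    new_files = [name for name in current if name not in seen]
    # A real job would process new_files as one batch here, then "commit" by persisting them
    with open(checkpoint_file, "w") as f:
        json.dump(sorted(seen | set(new_files)), f)
    return new_files


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        data_dir = os.path.join(tmp, "data")
        os.mkdir(data_dir)
        ckpt = os.path.join(tmp, "checkpoint.json")
        open(os.path.join(data_dir, "a.json"), "w").close()
        print(new_files_since_checkpoint(data_dir, ckpt))  # first run picks up a.json
        print(new_files_since_checkpoint(data_dir, ckpt))  # second run finds nothing new
```

Because every run starts from the recorded state and exits when it is done, the schedule interval alone decides how often new data is picked up.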
[1] https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
Using Airflow's branching functionality, we can have one DAG that does both the scheduling and the monitoring of our streaming job. The DAG checks the status of the application and, if the application is not running, submits the streaming job. Otherwise, the DAG run can simply finish, or you can add a sensor that checks the streaming job's status after some time and sends alerts or does whatever else you need.
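The branching decision itself can be kept as a plain function. The sketch below assumes two downstream tasks with the hypothetical ids submit_streaming_job and streaming_job_running; Airflow's BranchPythonOperator follows whichever task_id its callable returns and skips the other branch.

```python
from typing import Iterable

# Hypothetical task ids of the two downstream branches in the DAG
SUBMIT_TASK_ID = "submit_streaming_job"
RUNNING_TASK_ID = "streaming_job_running"


def choose_branch(running_app_names: Iterable[str], app_name: str) -> str:
    """Return the task_id the DAG run should follow next."""
    if app_name in set(running_app_names):
        # Already running: nothing to submit; a sensor or alerting task could follow here
        return RUNNING_TASK_ID
    # Not running (never started, failed, or killed): submit it again
    return SUBMIT_TASK_ID
```

In a DAG you would pass a small wrapper as the python_callable that looks up the running application names at run time (not at DAG-parse time) and then calls choose_branch; keeping the decision separate makes it easy to unit test.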
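There are two main problems.
First, we need to submit the streaming application without waiting for it to finish; otherwise the submitting operator keeps running until it reaches its execution_timeout. That problem can be solved by submitting our streaming job in cluster mode with the spark.yarn.submit.waitAppCompletion configuration parameter set to false.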
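A sketch of the resulting submit command, built as an argument list so it can be run from a task (for example with subprocess). The application name and script path are placeholders; the same --conf setting can also be passed through the conf argument of the Spark provider's SparkSubmitOperator.

```python
from __future__ import annotations

import shlex


def build_submit_command(app_name: str, app_file: str, extra_conf: dict[str, str] | None = None) -> list[str]:
    """Build a spark-submit argv that returns once YARN has accepted the application."""
    conf = {
        # Don't keep the client alive until the (never-ending) streaming app finishes
        "spark.yarn.submit.waitAppCompletion": "false",
        **(extra_conf or {}),
    }
    # Cluster mode: the driver runs inside YARN, so the client process can exit
    cmd = ["spark-submit", "--master", "yarn", "--deploy-mode", "cluster", "--name", app_name]
    for key, value in conf.items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(app_file)
    return cmd


if __name__ == "__main__":
    print(shlex.join(build_submit_command("my-stream", "stream_job.py")))
```

Passing --name explicitly matters for the status check, which identifies the streaming job by its application name.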
Second, we need to check the status of our streaming application. We can do this using YARN, for example with the command yarn application -list -appStates RUNNING.
If our application is among the running applications, we should not trigger the streaming job. The only requirement is that the streaming job's name be unique.
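The status check can be sketched in Python as well. This assumes the yarn CLI is on the PATH of the Airflow worker and that yarn application -list prints tab-separated data rows that start with the application id, followed by the application name; check the output on your own cluster before relying on it. Both function names are hypothetical.

```python
import subprocess
from typing import List


def parse_running_app_names(yarn_list_output: str) -> List[str]:
    """Extract Application-Name values from `yarn application -list` output."""
    names = []
    for line in yarn_list_output.splitlines():
        line = line.strip()
        if not line.startswith("application_"):
            continue  # skip the "Total number of applications" line and the column header
        columns = [col.strip() for col in line.split("\t")]
        if len(columns) > 1:
            names.append(columns[1])
    return names


def running_app_names() -> List[str]:
    """Ask YARN for RUNNING applications and return their names."""
    result = subprocess.run(
        ["yarn", "application", "-list", "-appStates", "RUNNING"],
        capture_output=True, text=True, check=True,
    )
    return parse_running_app_names(result.stdout)
```

Keeping the parsing separate from the subprocess call means the parser can be tested against captured output without a cluster.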