
Spark Streaming throughput monitoring

Is there a way to monitor the input and output throughput of a Spark cluster, to make sure the cluster is not flooded and overflowed by incoming data?

In my case, I set up a Spark cluster on AWS EC2, so I'm thinking of using AWS CloudWatch to monitor NetworkIn and NetworkOut for each node in the cluster.

But this approach does not seem accurate: network traffic is not only incoming data for Spark, so other data would be counted too.

Is there a tool or a way to monitor the status of streaming data specifically for a Spark cluster? Or is there already a built-in tool in Spark that I missed?


Update: with the release of Spark 1.4, the monitoring UI on port 4040 is significantly enhanced with a graphical display.

asked May 01 '15 00:05 by keypoint



1 Answer

Spark has a configurable metrics subsystem. By default it publishes a JSON version of the registered metrics on <driver>:<port>/metrics/json. Other metrics sinks, such as Ganglia, CSV files, or JMX, can be configured.

You will need some external monitoring system that collects the metrics on a regular basis and helps you make sense of them. (N.B. we use Ganglia, but there are other open source and commercial options.)
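As a stand-in for a full monitoring system, the "collect on a regular basis" part can be sketched in a few lines of Scala. This is only an illustration: `MetricsPoller`, `httpGet`, and `start` are hypothetical names, not part of Spark, and the polling period is an arbitrary choice.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.io.Source

// Hypothetical helper, not part of Spark: polls a metrics endpoint periodically.
object MetricsPoller {
  /** Plain HTTP GET of the given URL, returning the response body as a string. */
  def httpGet(url: String): String = {
    val src = Source.fromURL(url, "UTF-8")
    try src.mkString finally src.close()
  }

  /** Calls `fetch` every `periodSeconds` and hands each payload to `handle`. */
  def start(periodSeconds: Long, fetch: () => String)(handle: String => Unit): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      def run(): Unit =
        // Catch failures here: an uncaught exception would cancel all later runs.
        try handle(fetch())
        catch { case e: Exception => Console.err.println(s"metrics poll failed: ${e.getMessage}") }
    }
    scheduler.scheduleAtFixedRate(task, 0L, periodSeconds, TimeUnit.SECONDS)
    scheduler
  }
}
```

Something like `MetricsPoller.start(10, () => MetricsPoller.httpGet("http://localhost:4040/metrics/json"))(println)` would print the payload every 10 seconds; call `shutdown()` on the returned scheduler to stop it.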

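Spark Streaming publishes several metrics that can be used to monitor the performance of your job. To estimate throughput, you would combine the three gauges below. The ratio gives the processing time per record of the last received batch (in milliseconds, since the timestamps are epoch milliseconds); throughput in records per millisecond is its inverse:

(lastReceivedBatch_processingEndTime-lastReceivedBatch_processingStartTime)/lastReceivedBatch_records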
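The ratio above divides elapsed processing time by the record count. A minimal sketch of turning the same three gauges into both milliseconds per record and records per second follows; `BatchSample` and `Throughput` are hypothetical names, and the timestamps are assumed to be epoch milliseconds, as the sample values in this answer suggest.

```scala
// Hypothetical helper, not part of Spark: derives rates from three gauge values.
final case class BatchSample(records: Long, processingStartMs: Long, processingEndMs: Long)

object Throughput {
  /** Records per second for one batch, or None if no processing time elapsed. */
  def recordsPerSecond(s: BatchSample): Option[Double] = {
    val elapsedMs = s.processingEndMs - s.processingStartMs
    if (elapsedMs <= 0) None else Some(s.records * 1000.0 / elapsedMs)
  }

  /** Milliseconds per record (the ratio above), or None for an empty batch. */
  def millisPerRecord(s: BatchSample): Option[Double] = {
    val elapsedMs = s.processingEndMs - s.processingStartMs
    if (s.records <= 0) None else Some(elapsedMs.toDouble / s.records)
  }
}
```

Both functions return an `Option` so that an empty batch or a zero-length interval does not cause a division by zero.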

For the full list of supported metrics, have a look at StreamingSource.

Example: after starting a local REPL with Spark 1.3.1 and executing a trivial streaming application:

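import org.apache.spark.streaming._
// sc is the SparkContext provided by the Spark shell; batches are 10 seconds long
val ssc = new StreamingContext(sc, Seconds(10))
val queue = scala.collection.mutable.Queue(1,2,3,45,6,6,7,18,9,10,11)
// wrap each element in its own single-element RDD
val q = queue.map(elem => sc.parallelize(Seq(elem)))
// queueStream emits one queued RDD per batch interval by default
val dstream = ssc.queueStream(q)
dstream.print()
ssc.start()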

One can then GET localhost:4040/metrics/json, which returns:

{
version: "3.0.0",
gauges: {
local-1430558777965.<driver>.BlockManager.disk.diskSpaceUsed_MB: {
value: 0
},
local-1430558777965.<driver>.BlockManager.memory.maxMem_MB: {
value: 2120
},
local-1430558777965.<driver>.BlockManager.memory.memUsed_MB: {
value: 0
},
local-1430558777965.<driver>.BlockManager.memory.remainingMem_MB: {
value: 2120
},
local-1430558777965.<driver>.DAGScheduler.job.activeJobs: {
value: 0
},
local-1430558777965.<driver>.DAGScheduler.job.allJobs: {
value: 6
},
local-1430558777965.<driver>.DAGScheduler.stage.failedStages: {
value: 0
},
local-1430558777965.<driver>.DAGScheduler.stage.runningStages: {
value: 0
},
local-1430558777965.<driver>.DAGScheduler.stage.waitingStages: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingDelay: {
value: 44
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingEndTime: {
value: 1430559950044
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingStartTime: {
value: 1430559950000
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_schedulingDelay: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_submissionTime: {
value: 1430559950000
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_totalDelay: {
value: 44
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingEndTime: {
value: 1430559950044
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingStartTime: {
value: 1430559950000
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_records: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_submissionTime: {
value: 1430559950000
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.receivers: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.retainedCompletedBatches: {
value: 2
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.runningBatches: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalCompletedBatches: {
value: 2
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalProcessedRecords: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalReceivedRecords: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.unprocessedBatches: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.waitingBatches: {
value: 0
}
},
counters: { },
histograms: { },
meters: { },
timers: { }
}
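To pick the streaming gauges out of a payload like the one above, a rough sketch follows. It assumes the endpoint returns standard JSON with quoted keys (the listing above shows them unquoted) and uses a simple regular expression where a real JSON library would be more robust; `StreamingGauges` is a hypothetical name.

```scala
// Hypothetical helper, not part of Spark: extracts StreamingMetrics gauges from the JSON text.
object StreamingGauges {
  // Matches entries of the form "name": { "value": 123 }; non-integer values are skipped.
  private val Gauge = """"([^"]+)"\s*:\s*\{\s*"value"\s*:\s*(-?\d+)\s*\}""".r

  /** Returns the streaming gauges, keyed by the last dot-separated segment of the metric name. */
  def parse(json: String): Map[String, Long] =
    Gauge.findAllMatchIn(json)
      .filter(_.group(1).contains(".StreamingMetrics.streaming."))
      .map(m => m.group(1).split('.').last -> m.group(2).toLong)
      .toMap
}
```

For example, `StreamingGauges.parse(body).get("lastReceivedBatch_records")` would give the record count of the last received batch, if that gauge is present.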
answered Nov 11 '22 21:11 by maasg