I'm working on a Spark Streaming program that retrieves a Kafka stream, does a very basic transformation on the stream, and then inserts the data into a DB (VoltDB, if it's relevant). I'm trying to measure the rate at which I insert rows into the DB. I think metrics could be useful here (via JMX), but I can't find out how to add custom metrics to Spark. I've looked at Spark's source code and also found this thread, but it doesn't work for me. I also enabled the JMX sink in the metrics configuration file. What's not working: I don't see my custom metrics in JConsole.
Could someone explain how to add custom metrics (preferably via JMX) to Spark Streaming? Or alternatively, how can I measure the insertion rate into my DB (specifically VoltDB)? I'm using Spark with Java 8.
Below is a working example in Java. It's tested with StreamingQuery (unfortunately, StreamingQuery does not have out-of-the-box metrics like StreamingContext does, at least up to Spark 2.3.1).
Steps:
Define a custom source in the same package as Spark's Source trait (org.apache.spark.metrics.source) — the trait is private to Spark's own packages, so the class has to live there:
package org.apache.spark.metrics.source;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.spark.sql.streaming.StreamingQueryProgress;

/**
 * Metrics source for a structured streaming query.
 */
public class StreamingQuerySource implements Source {

    private final String appName;
    private final MetricRegistry metricRegistry = new MetricRegistry();
    private final Progress progress = new Progress();

    public StreamingQuerySource(String appName) {
        this.appName = appName;
        registerGauge("batchId", () -> progress.batchId());
        registerGauge("numInputRows", () -> progress.numInputRows());
        registerGauge("inputRowsPerSecond", () -> progress.inputRowsPerSecond());
        registerGauge("processedRowsPerSecond", () -> progress.processedRowsPerSecond());
    }

    private <T> Gauge<T> registerGauge(String name, Gauge<T> metric) {
        return metricRegistry.register(MetricRegistry.name(name), metric);
    }

    @Override
    public String sourceName() {
        return String.format("%s.streaming", appName);
    }

    @Override
    public MetricRegistry metricRegistry() {
        return metricRegistry;
    }

    // Called from a StreamingQueryListener to push the latest query progress
    // into the gauges registered above.
    public void updateProgress(StreamingQueryProgress queryProgress) {
        progress.batchId(queryProgress.batchId())
                .numInputRows(queryProgress.numInputRows())
                .inputRowsPerSecond(queryProgress.inputRowsPerSecond())
                .processedRowsPerSecond(queryProgress.processedRowsPerSecond());
    }

    // Mutable holder for the most recent progress values (fluent setters via Lombok).
    @Data
    @Accessors(fluent = true)
    private static class Progress {
        private long batchId = -1;
        private long numInputRows = 0;
        private double inputRowsPerSecond = 0;
        private double processedRowsPerSecond = 0;
    }
}
Register the source right after SparkContext is created
querySource = new StreamingQuerySource(getSparkSession().sparkContext().appName());
SparkEnv.get().metricsSystem().registerSource(querySource);
Update the progress data from StreamingQueryListener.onQueryProgress(event):
querySource.updateProgress(event.progress());
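For completeness, here is a rough sketch of how such a listener could be registered on the SparkSession. This wiring is not part of the original answer; it assumes `spark` is your SparkSession and `querySource` is the source registered in the previous step:

import org.apache.spark.sql.streaming.StreamingQueryListener;

// Assumed wiring: `spark` is the SparkSession, `querySource` the StreamingQuerySource
// already registered with Spark's metrics system.
spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent event) {
        // nothing to do here
    }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        // push the latest batch statistics into the custom metrics source
        querySource.updateProgress(event.progress());
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) {
        // nothing to do here
    }
});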
Configure metrics.properties:
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=xxx
*.sink.graphite.port=9109
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
# Enable jvm source for instance master, worker, driver and executor
master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
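If the file is not picked up from Spark's default conf directory, you can point Spark at it explicitly when submitting; the file name, class, and jar below are placeholders:

spark-submit \
  --files metrics.properties \
  --conf spark.metrics.conf=metrics.properties \
  --class com.example.MyStreamingApp \
  my-streaming-app.jar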
Sample output in the Graphite exporter (mapped to Prometheus format):
streaming_query{application="local-1538032184639",model="model1",qty="batchId"} 38
streaming_query{application="local-1538032184639",model="model1",qty="inputRowsPerSecond"} 2.5
streaming_query{application="local-1538032184639",model="model1",qty="numInputRows"} 5
streaming_query{application="local-1538032184639",model="model1",qty="processedRowsPerSecond"} 0.81
OK, after digging through the source code I found out how to add my own custom metrics. It requires three things, one of which is this line in metrics.properties:
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
which enables the JmxSink for all instances.
I'm still struggling with how to actually count the number of insertions into VoltDB, because the code runs on the executors, but that's a subject for a different topic :)
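For illustration, a minimal driver-side source with a Codahale Meter gives an insertion rate once the JmxSink is enabled. This is only a sketch, not the original code; the class, metric, and method names are made up:

package org.apache.spark.metrics.source;  // must live in Spark's Source package

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;

// Hypothetical source exposing a "voltdb.inserts" meter; with the JmxSink enabled it
// appears under the metrics MBeans of the driver JVM in JConsole.
public class InsertRateSource implements Source {
    private final MetricRegistry registry = new MetricRegistry();
    private final Meter inserts = registry.meter(MetricRegistry.name("voltdb", "inserts"));

    @Override
    public String sourceName() {
        return "voltdb";
    }

    @Override
    public MetricRegistry metricRegistry() {
        return registry;
    }

    // Call from driver-side code with the number of rows written in a batch;
    // only counts that actually reach the driver show up here.
    public void markInserted(long rows) {
        inserts.mark(rows);
    }
}

// Registration, right after the SparkContext is up:
// InsertRateSource insertSource = new InsertRateSource();
// SparkEnv.get().metricsSystem().registerSource(insertSource);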
I hope this will help others.
Groupon has a library called spark-metrics that lets you use a simple (Codahale-like) API on your executors and have the results collated back in the driver and automatically registered in Spark's existing metrics registry. These then get exported automatically along with Spark's built-in metrics when you configure a metrics sink as per the Spark docs.
To count the rows you insert into VoltDB, use accumulators - and then from your driver you can create a listener - maybe something like this to get you started:
sparkContext.addSparkListener(new SparkListener() {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    stageCompleted.stageInfo.accumulables.foreach { case (_, acc) =>
      // here you have access to the combined accumulator values, which you can send to your sink
    }
  }
})
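To complement that, here is a rough Java sketch of the executor side, since the question uses Java 8. The names `jsc`, `stream`, and insertIntoVoltDb(...) are placeholders for your own context, DStream, and write logic:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.util.LongAccumulator;

// Hypothetical wiring: jsc is the JavaSparkContext, stream the transformed DStream.
LongAccumulator insertedRows = jsc.sc().longAccumulator("voltdbInsertedRows");

stream.foreachRDD(rdd ->
    rdd.foreachPartition(rows -> {
        long count = 0;
        while (rows.hasNext()) {
            insertIntoVoltDb(rows.next());   // placeholder for the actual VoltDB insert
            count++;
        }
        insertedRows.add(count);             // executors add; Spark merges the totals on the driver
    })
);

// On the driver, insertedRows.value() (or the stage listener above) then reflects the
// total number of rows inserted, which you can expose through a metrics gauge or meter.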