
How to retrieve Metrics like Output Size and Records Written from Spark UI?

How do I collect these metrics on a console (Spark shell or a spark-submit job) right after the task or job is done?

We are using Spark to load data from MySQL to Cassandra, and the dataset is quite large (e.g. ~200 GB and 600M rows). When the task is done, we want to verify exactly how many rows Spark processed. We can get the number from the Spark UI, but how can we retrieve that number ("Output Records Written") from the Spark shell or in a spark-submit job?

Sample command to load from MySQL to Cassandra:

val pt = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "payment_types").option("user", "hadoop").option("password", "...").load()

pt.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "payment_types", "keyspace" -> "test"))

I want to retrieve all the Spark UI metrics for the above task, mainly Output Size and Records Written.

Please help.

Thanks for your time!

Ajay Guyyala asked Apr 27 '16 18:04


1 Answer

Found the answer: you can get the stats by registering a SparkListener.

If your job has no input or output metrics, calling .get on the missing metric throws a None.get exception. Guard each access with an if statement to avoid it.

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    // In this Spark 1.x API both metrics are Options; skip them when absent.
    if (metrics.inputMetrics.isDefined) {
      inputRecords += metrics.inputMetrics.get.recordsRead
    }
    if (metrics.outputMetrics.isDefined) {
      outputWritten += metrics.outputMetrics.get.recordsWritten
    }
  }
})

Here is a complete example:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._
import org.apache.spark.sql._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

val conf = new SparkConf()
.set("spark.cassandra.connection.host", "...")
.set("spark.driver.allowMultipleContexts","true")
.set("spark.master","spark://....:7077")
.set("spark.driver.memory","1g")
.set("spark.executor.memory","10g")
.set("spark.shuffle.spill","true")
.set("spark.shuffle.memoryFraction","0.2")
.setAppName("CassandraTest")
sc.stop
val sc = new SparkContext(conf)
val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

// Running totals updated by the listener below.
var inputRecords = 0L
var outputWritten = 0L

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    // In this Spark 1.x API both metrics are Options; skip them when absent.
    if (metrics.inputMetrics.isDefined) {
      inputRecords += metrics.inputMetrics.get.recordsRead
    }
    if (metrics.outputMetrics.isDefined) {
      outputWritten += metrics.outputMetrics.get.recordsWritten
    }
  }
})

val bp = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "bucks_payments").option("partitionColumn","id").option("lowerBound","1").option("upperBound","14596").option("numPartitions","10").option("fetchSize","100000").option("user", "hadoop").option("password", "...").load()
bp.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "bucks_payments", "keyspace" -> "test"))

println("outputWritten",outputWritten)

Result:

scala> println("outputWritten",outputWritten)
(outputWritten,16383)
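
The listener above targets the Spark 1.x API, where inputMetrics and outputMetrics are Options. On Spark 2.0 and later they are plain fields, and the output size asked about in the question is available as bytesWritten. The following is a minimal sketch under those assumptions, not a tested replacement: OutputMetricsListener is a hypothetical helper name, and sc is assumed to be the SparkContext that spark-shell provides.

```scala
import java.util.concurrent.atomic.LongAdder
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical helper (not part of Spark): sums output metrics over all task attempts.
class OutputMetricsListener extends SparkListener {
  // LongAdder makes totals written on the listener thread visible to the driver thread.
  val recordsWritten = new LongAdder
  val bytesWritten = new LongAdder

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics may be null if the task has failed.
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      recordsWritten.add(metrics.outputMetrics.recordsWritten)
      bytesWritten.add(metrics.outputMetrics.bytesWritten)
    }
  }
}

// Assumes `sc` is the SparkContext created by spark-shell.
val listener = new OutputMetricsListener
sc.addSparkListener(listener)

// ... run the write (for example the save to Cassandra) here ...

println(s"records written: ${listener.recordsWritten.sum}, bytes written: ${listener.bytesWritten.sum}")
```

Two caveats apply. Listener events are delivered asynchronously, so totals read immediately after an action may lag slightly. The sums also cover every task attempt that reports metrics, so retried or speculative tasks can inflate them. Whether a given sink reports output metrics at all depends on the connector.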
Ajay Guyyala answered Sep 20 '22 17:09