How do I collect these metrics on the console (Spark shell or a spark-submit job) right after the task or job is done?
We are using Spark to load data from MySQL into Cassandra, and the data set is quite large (e.g. ~200 GB and 600M rows). When the task is done, we want to verify exactly how many rows Spark processed. We can get the number from the Spark UI, but how can we retrieve that number ("Output Records Written") from the Spark shell or in a spark-submit job?
Sample command to load from MySQL into Cassandra:
val pt = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "payment_types").option("user", "hadoop").option("password", "...").load()
pt.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "payment_types", "keyspace" -> "test"))
I want to retrieve all the Spark UI metrics for the above task, mainly Output Size and Records Written.
Please help.
Thanks for your time!
Apache Spark provides a set of Web UIs (Jobs, Stages, Tasks, Storage, Environment, Executors, and SQL) to monitor the status of your Spark/PySpark application, the resource consumption of the Spark cluster, and the Spark configuration.
Hi @jamiet, Input Size / Records DOES refer to the records in each partition. For file-based input, Spark partitions the data based on the block size set in core-site.xml. For example, if your data size is 2000 MB and the block size is set to 100 MB, Spark will split the data into 2000/100 = 20 partitions (each partition of 100 MB); see the quick check below.
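A quick way to confirm the partition count from the shell is to look at the input RDD directly; a minimal sketch, assuming a file-based input (the HDFS path here is hypothetical):

val paymentsRdd = sc.textFile("hdfs:///data/payments")  // hypothetical path
println(paymentsRdd.partitions.length)                  // e.g. ~20 partitions for 2000 MB with a 100 MB block size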
"Shuffle Write" is actually meant as the sum of all written serialized data on all executors before transmitting (normally at the end of a stage) and "Shuffle Read" means the sum of read serialized data on all executors at the beginning of a stage.
Found the answer. You can get the stats by using a SparkListener.
If your job has no input or output metrics, you might get None.get exceptions; you can safely avoid them by guarding with an if statement (or an Option check), as below.
var inputRecords = 0L
var outputWritten = 0L

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val metrics = taskEnd.taskMetrics
    // The metrics are Options in Spark 1.x, so guard before calling .get
    if (metrics.inputMetrics != None) {
      inputRecords += metrics.inputMetrics.get.recordsRead
    }
    if (metrics.outputMetrics != None) {
      outputWritten += metrics.outputMetrics.get.recordsWritten
    }
  }
})
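Note that the listener must be registered before the job that produces the output runs; it only receives events fired after it has been added.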
Please find the complete example below.
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._
import org.apache.spark.sql._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
val conf = new SparkConf()
.set("spark.cassandra.connection.host", "...")
.set("spark.driver.allowMultipleContexts","true")
.set("spark.master","spark://....:7077")
.set("spark.driver.memory","1g")
.set("spark.executor.memory","10g")
.set("spark.shuffle.spill","true")
.set("spark.shuffle.memoryFraction","0.2")
.setAppName("CassandraTest")
sc.stop // stop the shell's default SparkContext before creating one with the new conf
val sc = new SparkContext(conf)
val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
var inputRecords = 0L
var outputWritten = 0L
sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val metrics = taskEnd.taskMetrics
    if (metrics.inputMetrics != None) {
      inputRecords += metrics.inputMetrics.get.recordsRead
    }
    if (metrics.outputMetrics != None) {
      outputWritten += metrics.outputMetrics.get.recordsWritten
    }
  }
})
val bp = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "bucks_payments").option("partitionColumn","id").option("lowerBound","1").option("upperBound","14596").option("numPartitions","10").option("fetchSize","100000").option("user", "hadoop").option("password", "...").load()
bp.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "bucks_payments", "keyspace" -> "test"))
println("outputWritten",outputWritten)
Result:
scala> println("outputWritten",outputWritten)
(outputWritten,16383)
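For reference, on newer Spark versions (2.x and later) taskMetrics.outputMetrics is no longer an Option, so the None checks go away; a minimal sketch, assuming the spark-shell SparkSession spark:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var recordsWrittenTotal = 0L   // counter name is just for this sketch
spark.sparkContext.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // outputMetrics is a plain object here, so no Option/.get handling is needed
    recordsWrittenTotal += taskEnd.taskMetrics.outputMetrics.recordsWritten
  }
})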