
How to get the number of records written (using DataFrameWriter's save operation)?

Is there any way to get the number of records written when using Spark to save records? While I know it isn't in the spec currently, I'd like to be able to do something like:

val count = df.write.csv(path)

Alternatively, being able to do an inline count of the results of a step (preferably without just using a standard accumulator) would be (almost) as effective, e.g.:

dataset.countTo(count_var).filter({function}).countTo(filtered_count_var).collect()
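
For reference, a rough sketch of what such a countTo helper could look like today, built on a plain LongAccumulator (exactly what I'd prefer to avoid). The CountToOps helper and its name are hypothetical, not part of Spark:

import org.apache.spark.sql.{Dataset, Encoder}
import org.apache.spark.util.LongAccumulator

// Hypothetical helper: counts rows as they flow past, into an accumulator.
// Define it inside an object, or paste it directly into spark-shell.
implicit class CountToOps[T](ds: Dataset[T]) {
  def countTo(acc: LongAccumulator)(implicit enc: Encoder[T]): Dataset[T] =
    ds.map { row => acc.add(1L); row }
}

// val rowCount = spark.sparkContext.longAccumulator("rowCount")
// import spark.implicits._   // provides the Encoder
// dataset.countTo(rowCount).filter({function}).collect()
// println(rowCount.value)    // populated only after an action; task retries can over-count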

Any ideas?

asked May 12 '17 by Loki


1 Answer

I'd use a SparkListener that intercepts onTaskEnd or onStageCompleted events, which you can use to access task metrics.

Task metrics give you the accumulators Spark uses to display metrics in the SQL tab (under Details for Query) of the web UI.

[Screenshot: web UI, Details for Query]
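
Here's a minimal, untested sketch of such a listener that sums recordsWritten from the output metrics of finished tasks; the RecordsWrittenListener name is just a placeholder, and the total covers every write in the application unless you reset or scope it:

import java.util.concurrent.atomic.LongAdder
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class RecordsWrittenListener extends SparkListener {
  val recordsWritten = new LongAdder

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for failed tasks
    Option(taskEnd.taskMetrics).foreach { m =>
      recordsWritten.add(m.outputMetrics.recordsWritten)
    }
  }
}

// Usage (assuming an active SparkSession called spark):
// val listener = new RecordsWrittenListener
// spark.sparkContext.addSparkListener(listener)
// df.write.csv(path)
// println(s"records written so far: ${listener.recordsWritten.sum}")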

For example, the following query:

spark.
  read.
  option("header", true).
  csv("../datasets/people.csv").
  limit(10).
  write.
  csv("people")

gives exactly 10 output rows, so Spark knows the number (and you can too).



You could also explore Spark SQL's QueryExecutionListener:

The interface of query execution listener that can be used to analyze execution metrics.

You can register a QueryExecutionListener using ExecutionListenerManager that's available as spark.listenerManager.

scala> :type spark.listenerManager
org.apache.spark.sql.util.ExecutionListenerManager

scala> spark.listenerManager.
clear   clone   register   unregister
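
A minimal, untested sketch of such a listener (the OutputRowsListener name is just a placeholder) that reads numOutputRows off the executed plan when a query succeeds:

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

class OutputRowsListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // Not every physical plan exposes a numOutputRows metric at the root
    qe.executedPlan.metrics.get("numOutputRows").foreach { metric =>
      println(s"$funcName produced ${metric.value} output rows")
    }
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

// spark.listenerManager.register(new OutputRowsListener)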

I think it's closer to the "bare metal", but I haven't used it before.


@D3V (in the comments section) mentioned accessing the numOutputRows SQL metric using the QueryExecution of a structured query. Something worth considering.

scala> :type q
org.apache.spark.sql.DataFrame

scala> :type q.queryExecution.executedPlan.metrics
Map[String,org.apache.spark.sql.execution.metric.SQLMetric]

q.queryExecution.executedPlan.metrics("numOutputRows").value

answered Oct 21 '22 by Jacek Laskowski