RDD CountApproximate taking far longer than requested timeout

In an attempt to reduce the time spent gathering counts of DataFrame rows, RDD.countApprox() is being invoked. It has the following signature:

def countApprox(
    timeout: Long,
    confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {

I have attempted to limit the calculation to sixty seconds. Notice also the very low confidence requirement of 0.10:

val waitSecs = 60
val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).getFinalValue.mean

However, the actual time is... 17 minutes!?

That is almost the same as the time required to generate the data in the first place (19 minutes)!

So then, what is the use of this API? Is there any way to get it to save some meaningful fraction of the exact-calculation time?

TL;DR (See accepted answer): use initialValue instead of getFinalValue

asked Jul 09 '18 by WestCoastProjects

1 Answer

Please note the return type in the countApprox definition: it is a PartialResult.

def countApprox(
    timeout: Long,
    confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
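
For reference, here is roughly what that return type gives you (paraphrased from the Spark 2.x API; the println is just illustrative):

// PartialResult[BoundedDouble] offers a non-blocking initialValue and a
// blocking getFinalValue; BoundedDouble carries the estimate and its bounds.
val pr  = inputDf.rdd.countApprox(60 * 1000, 0.10)  // PartialResult[BoundedDouble]
val est = pr.initialValue                           // whatever estimate exists right now
println(s"~${est.mean} in [${est.low}, ${est.high}] at confidence ${est.confidence}")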

Now, pay attention to how it is being used:

val waitSecs = 60
val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).getFinalValue.mean  // note: getFinalValue

According to the Spark Scala docs, getFinalValue is a blocking method, which means it will wait for the whole operation to finish.

initialValue, by contrast, can be fetched within the specified timeout. So the following snippet will not block further operations beyond the timeout:

val waitSecs = 60
val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).initialValue.mean
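
If you eventually want the exact value but do not want to park the driver thread on getFinalValue, PartialResult also lets you register a completion callback. This is a sketch using PartialResult.onComplete (part of Spark's API, though not used in the original answer):

// Non-blocking alternative: the handler fires with the final BoundedDouble
// once all tasks finish, while the driver thread continues immediately.
val partial = inputDf.rdd.countApprox(waitSecs * 1000, 0.10)
partial.onComplete { bd => println(s"final count: ${bd.mean}") }
// ... the driver is free to do other work here ...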

Note the downside of using countApprox(timeout, confidence).initialValue: even after a value has been returned, Spark continues counting until it reaches the final count that getFinalValue would have given you, and it holds on to the resources until the operation is complete.
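
If holding those resources is a problem, one possible workaround (my sketch, not part of the original answer; it assumes a SparkSession named spark, as in the shell session below) is to run the count inside a job group and cancel whatever is still running once the initial value has been read. setJobGroup and cancelJobGroup are real SparkContext methods; the group id "approx-count" is just an arbitrary label chosen here:

// Sketch: give up on the exact count after the timeout and free the executors.
val sc = spark.sparkContext
sc.setJobGroup("approx-count", "approximate row count", interruptOnCancel = true)
val estimate = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).initialValue.mean
sc.cancelJobGroup("approx-count")  // cancel the tasks still refining the count
sc.clearJobGroup()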

The use of this API, then, is to avoid blocking on the count operation, not to reduce the total work done.

Reference: https://mail-archives.apache.org/mod_mbox/spark-user/201505.mbox/%[email protected]%3E

Now let's validate the non-blocking behavior in spark2-shell. We'll create a random DataFrame and run a plain count, countApprox with getFinalValue, and countApprox with initialValue.
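
For completeness, the transcript below assumes the following imports, which the original answer did not show:

scala> import scala.util.Random
scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types.{StructField, StructType, StringType}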

scala> val schema = StructType((0 to 10).map(n => StructField(s"column_$n", StringType)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(column_0,StringType,true), StructField(column_1,StringType,true), StructField(column_2,StringType,true), StructField(column_3,StringType,true), StructField(column_4,StringType,true), StructField(column_5,StringType,true), StructField(column_6,StringType,true), StructField(column_7,StringType,true), StructField(column_8,StringType,true), StructField(column_9,StringType,true), StructField(column_10,StringType,true))

//Create 100 partitions of 100,000 random rows each (10,000,000 rows in total)
scala> val rows = spark.sparkContext.parallelize(Seq[Row](), 100).mapPartitions { _ => { Range(0, 100000).map(m => Row(schema.map(_ => Random.alphanumeric.filter(_.isLower).head.toString).toList: _*)).iterator } }
rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[1] at mapPartitions at <console>:32

scala> val inputDf = spark.sqlContext.createDataFrame(rows, schema)
inputDf: org.apache.spark.sql.DataFrame = [column_0: string, column_1: string ... 9 more fields]

//Note that cnt will be displayed only when all tasks have completed
scala> val cnt = inputDf.rdd.count
cnt: Long = 10000000

scala> val waitSecs = 60
waitSecs: Int = 60

//cntApprxFinal will be displayed only when all tasks are completed.
scala> val cntApprxFinal = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).getFinalValue.mean
[Stage 1:======================================================> (98 + 2) / 100]cntApprxFinal: Double = 1.0E7

scala> val waitSecs = 60
waitSecs: Int = 60

//Note that cntApprxInitial is displayed as soon as the timeout expires. In this case 80 tasks had completed within the timeout when the value was displayed. Even after displaying the value, Spark continued with all the remaining tasks
scala> val cntApprxInitial = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).initialValue.mean
[Stage 2:============================================>           (80 + 4) / 100]cntApprxInitial: Double = 1.0E7

[Stage 2:=======================================================>(99 + 1) / 100]

Let's have a look at the Spark UI and the spark-shell output: all three operations took the same total time. (Screenshot: all 3 operations took the same time.)

cntApprxInitial, however, was available before all tasks had completed, as the shell output above shows.

Hope this helps!

answered Oct 27 '22 by m-bhole