I want to use an accumulator to gather some stats about the data I'm manipulating in a Spark job. Ideally, I would do that while the job computes the required transformations, but since Spark would re-compute tasks in certain cases, the accumulators would not reflect true metrics. Here is how the documentation describes this:
For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.
This is confusing, since most actions do not allow running custom code (where accumulators can be used); they mostly take the results of previous transformations (lazily). The documentation also shows this:
val acc = sc.accumulator(0)
data.map { x => acc += x; f(x) }
// Here, acc is still 0 because no actions have caused the `map` to be computed.
But if we add data.count() at the end, would this be guaranteed to be correct (have no duplicates) or not? Clearly acc is not used "inside actions only", as map is a transformation, so it should not be guaranteed.
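Concretely (assuming data is an RDD of numbers, as in the documentation example), the snippet I have in mind is:

val acc = sc.accumulator(0)
data.map { x => acc += x; f(x) }.count()
// Is acc now guaranteed to hold the sum of data, with no update applied twice?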
On the other hand, discussions on related Jira tickets talk about "result tasks" rather than "actions". For instance here and here. This seems to indicate that the result would indeed be guaranteed to be correct, since we are using acc immediately before an action and thus it should be computed in a single stage.
I'm guessing that this concept of a "result task" has to do with the type of operation involved: the last one in the chain, which includes an action, like in this example, which shows how several operations are divided into stages (in magenta; image taken from here):
So hypothetically, a count() action at the end of that chain would be part of the same final stage, and I would be guaranteed that accumulators used in the last map will not include any duplicates?
Clarification around this issue would be great! Thanks.
Accumulators are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.
To answer the question "When are accumulators truly reliable?"
Answer: When they are updated inside an action operation.
As per the documentation, for a task that is part of an action, the accumulator will be updated only once, even if the task is restarted.
For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.
And actions do allow running custom code.
For example:
val accNotEmpty = sc.accumulator(0)
ip.foreach { x =>
  if (x != "") {
    accNotEmpty += 1
  }
}
But why are Map + Action, viz. Result Task, operations not reliable for an accumulator update?
A task may be re-executed because it fails with an exception, because the node running it goes down, or because Spark launches a speculative copy of a slow task. So the same function may run multiple times on the same data, and Spark does not provide any guarantee that accumulator updates coming from the map operation are applied only once.
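A minimal sketch (hypothetical data, not from the documentation) of one common way this shows up: every action re-computes the RDD lineage unless it is cached, so an accumulator updated in a map is incremented again when a second action runs.

val acc = sc.accumulator(0)
val mapped = sc.parallelize(1 to 10, 2).map { x => acc += 1; x } // update in a transformation
mapped.count()   // acc.value is 10 after the first action
mapped.collect() // the map is re-computed for the second action, so acc.value is now 20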
So it is better to use accumulators inside an action operation in Spark.
To know more about accumulators and their issues, refer to this blog post by Imran Rashid.
Accumulator updates are sent back to the driver when a task is successfully completed. So your accumulator results are guaranteed to be correct when you are certain that each task will have been executed exactly once and each task did as you expected.
I prefer relying on reduce and aggregate instead of accumulators, because it is fairly hard to enumerate all the ways tasks can be executed.
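For example, the non-empty count from accNotEmpty above could be written with aggregate instead of an accumulator (a minimal sketch, assuming ip is an RDD[String] as in the earlier example). Because the count is the return value of the action itself, a retried task cannot inflate it:

val notEmpty = ip.aggregate(0)(
  (count, x) => if (x != "") count + 1 else count, // fold within a partition
  (a, b) => a + b                                  // merge per-partition counts
)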
That said, there are many simple cases where accumulators can be fully trusted.
val acc = sc.accumulator(0)
val rdd = sc.parallelize(1 to 10, 2)
val accumulating = rdd.map { x => acc += 1; x }
accumulating.count
assert(acc.value == 10)
Would this be guaranteed to be correct (have no duplicates)?
Yes, if speculative execution is disabled. The map and the count will be a single stage, so, like you say, there is no way a task can be successfully executed more than once.
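Speculative execution is controlled by the spark.speculation setting, which is false by default; a minimal sketch of pinning it down explicitly when creating the context (the app name is just a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("accumulator-example") // placeholder name
  .set("spark.speculation", "false") // make sure speculative task copies are off
val sc = new SparkContext(conf)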
But an accumulator is updated as a side effect, so you have to be very careful when thinking about how the code will be executed. Consider this instead of accumulating.count:
// Same setup as before.
accumulating.mapPartitions(p => Iterator(p.next)).collect
assert(acc.value == 2)
This will also create one task for each partition, and each task will be guaranteed to execute exactly once. But the code in map will not get executed on all elements, just the first one in each partition.
The accumulator is like a global variable. If you share a reference to the RDD that can increment the accumulator, then other code (other threads) can cause it to increment too.
// Same setup as before.
val x = new X(accumulating) // We don't know what X does.
                            // It may trigger the calculation
                            // any number of times.
accumulating.count
assert(acc.value >= 10)