
Spark Listener EventLoggingListener threw an exception / ConcurrentModificationException

Tags:

apache-spark

In our application (Spark 2.0.1) we frequently see this exception. I can't find anything about it. What could be the cause?

16/10/27 11:18:24 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
    at java.util.ArrayList$Itr.next(ArrayList.java:851)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
    at scala.collection.TraversableLike$class.to(TraversableLike.scala:590)
    at scala.collection.AbstractTraversable.to(Traversable.scala:104)
    at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:294)
    at scala.collection.AbstractTraversable.toList(Traversable.scala:104)
    at org.apache.spark.util.JsonProtocol$.accumValueToJson(JsonProtocol.scala:314)
    at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291)
    at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.util.JsonProtocol$.accumulableInfoToJson(JsonProtocol.scala:291)
    at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283)
    at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.util.JsonProtocol$.taskInfoToJson(JsonProtocol.scala:283)
    at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:145)
    at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:76)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:137)
    at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:157)
    at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45)
    at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
    at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
    at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1249)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)

EDIT: One more piece of information: our application is long-running, and to recover from a potentially failed Spark context, we call SparkBuilder.getOrCreate() between two "jobs". Could this mess up the listeners?
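The top frames of the trace (ArrayList$Itr.checkForComodification) are Java's fail-fast iterator check: a java.util.ArrayList holding an accumulator value was modified, presumably by another thread, while JsonProtocol.accumValueToJson was iterating over it to build a Scala list. Python has no ConcurrentModificationException, but its dict iterators perform a similar check when the dict changes size during iteration, so the single-threaded sketch below is only an analogy for the failure and for the usual remedy of iterating over a snapshot. The function names are hypothetical.

```python
def iterate_while_modifying(values):
    """Add keys to a dict while iterating over it and report what happens.

    Python dict iterators detect a size change on the next step, much like
    java.util.ArrayList's iterator detects a structural modification.
    """
    try:
        for key in values:
            values[key + "_copy"] = values[key]  # changes the dict's size mid-iteration
    except RuntimeError as exc:
        return str(exc)
    return "no error"


def iterate_snapshot(values):
    """Iterate over a snapshot of the keys instead, so modifying the dict is safe."""
    for key in list(values):  # list() copies the keys before the loop starts
        values[key + "_copy"] = values[key]
    return sorted(values)
```

For example, `iterate_while_modifying({"a": 1, "b": 2})` returns the RuntimeError message instead of finishing the loop, while `iterate_snapshot` completes because it never iterates over the object being modified.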

mathieu asked Oct 27 '16 13:10


2 Answers

It's a known problem in Spark 2.0.1 (SPARK-17816) and will be fixed with Spark 2.0.2/2.1.0 (related pull request).
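The trace in the question looks like a read/write race on a list: one thread serializes it while another modifies it. A common remedy for this class of bug is to have the owner of the list hand out a copy taken under a lock, so readers never iterate over the live object. The Python sketch below illustrates that general pattern only; ListAccumulator and run_demo are hypothetical names, and this is not the actual SPARK-17816 patch.

```python
import threading


class ListAccumulator:
    """Hypothetical list-backed accumulator (an analogy, not Spark's class)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._items = []

    def add(self, item):
        # Writers (think: threads producing updates) append under the lock.
        with self._lock:
            self._items.append(item)

    def value(self):
        # Readers (think: the event-logging thread) receive a copy taken under
        # the lock, so later appends cannot change the list they iterate over.
        with self._lock:
            return list(self._items)


def run_demo(writers=4, items_per_writer=1000):
    """Append from several threads while the main thread keeps reading snapshots."""
    acc = ListAccumulator()

    def write(offset):
        for i in range(items_per_writer):
            acc.add(offset + i)

    threads = [
        threading.Thread(target=write, args=(w * items_per_writer,))
        for w in range(writers)
    ]
    for t in threads:
        t.start()
    # Read snapshots while the writers are still running.
    snapshot_sizes = [len(acc.value()) for _ in range(100)]
    for t in threads:
        t.join()
    return acc, snapshot_sizes
```

The design choice is to pay for a copy on every read in exchange for never sharing a mutable list with a reader; that is usually acceptable when reads (such as logging an event) are much rarer than writes.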

To get rid of the exception without waiting for Spark 2.0.2/2.1.0, clone the latest (unstable) Spark source and build Apache Spark yourself.

Update: Spark 2.0.2 has been released!
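Since this answer names 2.0.1 as the affected release, a quick check after upgrading is to look at the version string the running application reports (PySpark exposes it as `spark.version` on a SparkSession and `sc.version` on a SparkContext). The helper below is a hypothetical convenience that flags exactly the release named here; the SPARK-17816 JIRA entry remains the authoritative source for affected versions.

```python
def affected_by_spark_17816(version):
    """Return True if a Spark version string refers to 2.0.1.

    Only 2.0.1 is treated as affected, because it is the only version this
    answer names; check the JIRA issue for the authoritative list.
    """
    # Drop suffixes such as "-SNAPSHOT", then compare major.minor.patch.
    numeric = version.split("-", 1)[0]
    return tuple(int(part) for part in numeric.split(".")[:3]) == (2, 0, 1)
```

For example, `affected_by_spark_17816(spark.version)` should return False once the application runs on 2.0.2 or later.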

Sascha Vetter answered Nov 08 '22 12:11


We also just upgraded to Spark 2.0.1 and started seeing the same exception. We narrowed the cause down to a section of Python code containing the following idiom:

a = spark_context.textFile('..')
a = a.map(stuff)
b = a.filter(stuff).map(stuff)

I've had issues with variable self-assignment in Spark in the past, but after upgrading to 2.0.1 the problem became much more acute and we started seeing ConcurrentModificationExceptions.

The fix for us was simply changing the code to not do any self-assignments.
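The workaround described above can be sketched as a function in which every intermediate RDD gets its own name. This is a minimal sketch: parse, keep and transform are hypothetical placeholders for `stuff`, and the function relies only on the standard `textFile`, `map` and `filter` methods. In Python, rebinding a name does not modify the RDD it previously referred to, so treat this as the workaround the poster reported rather than a confirmed fix for SPARK-17816.

```python
def build_pipeline(spark_context, path, parse, keep, transform):
    """Same steps as the idiom above, without reassigning any variable.

    parse, keep and transform are hypothetical stand-ins for the `stuff`
    functions in the original snippet.
    """
    lines = spark_context.textFile(path)         # source RDD
    parsed = lines.map(parse)                    # was: a = a.map(stuff)
    result = parsed.filter(keep).map(transform)  # was: b = a.filter(stuff).map(stuff)
    return result
```

With a live SparkContext `sc`, `build_pipeline(sc, path, parse, keep, transform).collect()` returns the results to the driver.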

Christo Wilson answered Nov 08 '22 13:11