
Child thread not seeing updates made by main thread

I'm implementing a ClusterHealthListener by extending the SparkListener class.

import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, SparkListenerExecutorAdded}
import org.springframework.stereotype.Component

@Component
class ClusterHealthListener extends SparkListener with Logging {
  // Updated by Spark's listener callbacks, read by the reporter thread
  val appRunning = new AtomicBoolean(false)
  val executorCount = new AtomicInteger(0)

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart) = {
    logger.info("Application Start called .. ")
    this.appRunning.set(true)
    logger.info(s"[appRunning = ${appRunning.get}]")
  }

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) = {
    logger.info("Executor add called .. ")
    this.executorCount.incrementAndGet()
    logger.info(s"[executorCount = ${executorCount.get}]")
  }
}

appRunning and executorCount are two variables declared in the ClusterHealthListener class. ClusterHealthReporterThread only reads their values.

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component

@Component
class ClusterHealthReporterThread @Autowired() (healthListener: ClusterHealthListener) extends Logging {
  new Thread {
    override def run(): Unit = {
      while (true) {
        Thread.sleep(10 * 1000)
        logger.info("Checking range health")
        // Read-only access to the listener's values
        logger.info(s"[appRunning = ${healthListener.appRunning.get}] [executorCount = ${healthListener.executorCount.get}]")
      }
    }
  }.start()
}

ClusterHealthReporterThread always reports the initialized values, regardless of the changes made to the variables by the main thread. What am I doing wrong? Is this because I inject healthListener into ClusterHealthReporterThread?

Update

I played around a bit, and it looks like it has something to do with the way I register the Spark listener.

If I add the spark listener like this

val sparkContext = SparkContext.getOrCreate(sparkConf)
sparkContext.addSparkListener(healthListener)

The parent thread always shows appRunning as 'false' but reports the executor count correctly. The child thread (health reporter) also shows the proper executor counts, but appRunning is always 'false', just like in the main thread.

Then I stumbled across "Why is SparkListenerApplicationStart never fired?" and tried setting the listener at the Spark config level,

.set("spark.extraListeners", "HealthListener class path")

If I do this, the main thread reports 'true' for appRunning and the correct executor counts, but the child thread always reports 'false' for appRunning and '0' for executors.
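
For reference, a minimal sketch of that config-level registration, assuming a hypothetical fully qualified class name com.example.ClusterHealthListener (substitute your listener's actual package):

import org.apache.spark.{SparkConf, SparkContext}

// spark.extraListeners takes a comma-separated list of fully qualified
// class names of listeners to register when the SparkContext starts
val sparkConf = new SparkConf()
  .setAppName("cluster-health-demo") // hypothetical app name
  .set("spark.extraListeners", "com.example.ClusterHealthListener")

val sparkContext = SparkContext.getOrCreate(sparkConf)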

Nithin Chandy asked Sep 14 '18


1 Answer

I can't immediately see what's wrong here, you might have found an interesting edge case.

I think @m4gic's comment might be correct: perhaps the logging library is caching that interpolated string? It looks like you are using https://github.com/lightbend/scala-logging, which claims that this interpolation "has no effect on behavior", so maybe not. Could you follow his suggestion to retry without using that feature and report back?
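
For example, a minimal sketch of that retry, keeping onApplicationStart unchanged except for building the log message without the interpolator (plain concatenation, purely for the experiment):

override def onApplicationStart(applicationStart: SparkListenerApplicationStart) = {
  logger.info("Application Start called .. ")
  this.appRunning.set(true)
  // No s"..." interpolation: the logging call only ever receives a fully built String
  logger.info("[appRunning = " + appRunning.get + "]")
}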

A second possibility: I wonder if there is only one ClusterHealthListener in the system? Perhaps the autowiring is causing a second instance to be created? Can you log the object ids of the ClusterHealthListener reference in both locations and verify that they are the same object?
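
Something like the following sketch would do it, using System.identityHashCode purely as an instance fingerprint (any stable per-object marker works just as well):

// Inside ClusterHealthListener, e.g. in onApplicationStart:
logger.info(s"listener instance id = ${System.identityHashCode(this)}")

// Inside ClusterHealthReporterThread's run() loop:
logger.info(s"listener instance id = ${System.identityHashCode(healthListener)}")

If the two ids differ, Spark and Spring are each holding their own ClusterHealthListener instance, which would explain why updates made through one are never visible through the other.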

If neither of those suggestions fix this, are you able to post a working example that I can play with?

Rich answered Nov 15 '22