How to clean up other resources when Spark is stopped

In my Spark application, there is an object ResourceFactory which contains an Akka ActorSystem for providing resource clients. When I run this Spark application, every worker node creates an ActorSystem. The problem is that when the Spark application finishes its work and shuts down, the ActorSystem stays alive on every worker node and prevents the whole application from terminating; it just hangs.

Is there a way to register some listener on the SparkContext so that when the sc is shut down, the ActorSystem on every worker node is notified to shut itself down?
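
For reference, Spark does expose a driver-side listener API; a minimal sketch of an application-end callback is below. Note that it only fires on the driver JVM, not on the executors where each worker's ActorSystem lives, so by itself it does not reach the worker-side systems.

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

// Driver-side cleanup only: this callback never runs on the executors.
sc.addSparkListener(new SparkListener {
  override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit = {
    ResourceFactory.actorSystem.shutdown() // terminate() on Akka 2.4+
  }
})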


UPDATE:

Following is the simplified skeleton:

There is a ResourceFactory, which is an object containing an actor system; it also provides a fetchData method.

object ResourceFactory{
  val actorSystem = ActorSystem("resource-akka-system")
  def fetchData(): SomeData = ...
}
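
One possible mitigation, sketched under the assumption that nothing in the job relies on the Akka threads being non-daemon: Akka's akka.daemonic setting runs the system on daemon threads, so an idle ActorSystem cannot by itself keep an executor JVM alive.

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object ResourceFactory {
  // akka.daemonic = on runs the system's dispatcher threads as daemon
  // threads, so they do not block JVM exit on their own.
  private val daemonicConfig =
    ConfigFactory.parseString("akka.daemonic = on").withFallback(ConfigFactory.load())
  val actorSystem = ActorSystem("resource-akka-system", daemonicConfig)
  def fetchData(): SomeData = ??? // as in the skeleton above
}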

And then there is a user-defined RDD class; in its compute method, it needs to fetch data from the ResourceFactory.

class MyRDD extends RDD[SomeClass] {
  override def compute(...) {
    ...
    ResourceFactory.fetchData()
    ...
    someIterator
  }
}

So on every node there will be one ActorSystem named "resource-akka-system", and the MyRDD instances distributed across those worker nodes can get data from the "resource-akka-system".

The problem is that when the SparkContext is shut down, those "resource-akka-system"s are no longer needed, but I don't know how to notify the ResourceFactory to shut down the "resource-akka-system" on each worker node. So for now the "resource-akka-system" stays alive on each worker node and prevents the whole program from exiting.
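
Another hedged option is a JVM shutdown hook inside ResourceFactory, sketched below under one big caveat: a hook only runs once something initiates JVM exit, so it helps when an external manager (e.g. YARN) triggers the shutdown, not when the actor system's own non-daemon threads are exactly what blocks the exit.

import akka.actor.ActorSystem

object ResourceFactory {
  val actorSystem = ActorSystem("resource-akka-system")

  // Hypothetical cleanup: runs once the JVM begins an orderly exit.
  sys.addShutdownHook {
    actorSystem.shutdown() // terminate() on Akka 2.4+
  }

  def fetchData(): SomeData = ??? // as in the skeleton above
}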


UPDATE2:

With some more experiments, I find that in local mode the program hangs, but in yarn-cluster mode it exits successfully. Maybe this is because YARN kills the threads on the worker nodes when the sc is shut down?


UPDATE3:

To check whether every node contains an ActorSystem, I changed the code as follows (this is the real skeleton, as I added another class definition):

object ResourceFactory{
  println("creating resource factory")
  val actorSystem = ActorSystem("resource-akka-system")
  def fetchData(): SomeData = ...
}

class MyRDD extends RDD[SomeClass] {
  println("creating my rdd")
  override def compute(...) {
    new RDDIterator(...)
  }
}

class RDDIterator(...) extends Iterator[SomeClass] {
  println("creating rdd iterator")
  ...
  lazy val reader = {
    ...
    ResourceFactory.fetchData()
    ...
  }
  ...
  override def next() = {
    ...
    reader.xx()
  }
}

After adding those printlns, I ran the code on Spark in yarn-cluster mode. I find that on the driver I get the following prints:

creating my rdd
creating resource factory
creating my rdd
...

While on some of the workers, I get the following prints:

creating rdd iterator
creating resource factory

And on some of the workers, nothing is printed (those workers were not assigned any tasks).

Based on the above, I think the object is initialized eagerly on the driver, since "creating resource factory" is printed on the driver even though nothing refers to it there, while the object is initialized lazily on the workers, because "creating resource factory" is printed after "creating rdd iterator": the ResourceFactory is only referenced when the first RDDIterator is created.

And I find that in my use case the MyRDD class is only created in the driver.

I am not very sure about the laziness of the object's initialization on the driver versus the workers; it's my guess, and it may be that some other part of the program makes it look this way. But I think it should be right that there is one actor system on each worker node once it is needed.
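
For what it's worth, a plain-Scala sketch (independent of Spark) showing that an object's body runs on first reference:

object LazyDemo {
  println("LazyDemo initialized") // body runs on first reference, not at JVM start
  val x = 42
}

object Main extends App {
  println("before first reference")
  println(LazyDemo.x) // "LazyDemo initialized" prints just before this line's output
}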

asked Apr 13 '16 by 宇宙人




1 Answer

I don't think there is a way to tap into each worker's lifecycle.

Also, I have some questions regarding your implementation:

  1. If you have an object that contains a val which is used from a function run on a worker, my understanding is that this val gets serialized and broadcast to the worker. Can you confirm that you have one ActorSystem running per worker?

  2. An actor system usually terminates immediately if you don't explicitly wait for its termination. Are you calling something like system.awaitTermination, or blocking on system.whenTerminated? (See the sketch just below.)
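
A sketch of both blocking styles, assuming system is an ActorSystem already in scope; awaitTermination is the pre-2.4 Akka API and whenTerminated is the 2.4+ one:

import scala.concurrent.Await
import scala.concurrent.duration.Duration

// Akka < 2.4: block the calling thread until the system has fully stopped.
system.awaitTermination()

// Akka 2.4+: whenTerminated is a Future[Terminated]; block on it explicitly.
Await.result(system.whenTerminated, Duration.Inf)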



Anyway, there is another way to shut down the actor systems on remote workers:

  1. Make the ActorSystem on each node part of an Akka cluster. Here are some docs on how to do that programmatically.
  2. Broadcast the address of a "coordination" actor on the driver node (where your sc is) to each worker. In simple words, just have a val with that address.
  3. When your Akka system starts on each worker, use that "coordination" actor address to register this particular actor system (send a corresponding message to the coordination actor).
  4. The coordination actor keeps track of all registered "worker" actors.
  5. When your computation is completed and you want to shut down the Akka system on every worker, send messages to all registered actors from the coordination actor on the driver node.
  6. Shut down the worker Akka systems when the "shutdown" message is received (a minimal sketch of this protocol follows the list).
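
A minimal sketch of steps 3-6 in classic Akka, with all names (CoordinationActor, WorkerLifecycleActor, Register, ShutdownAll) hypothetical and the remoting/cluster configuration needed for actorSelection to resolve across machines omitted:

import akka.actor.{Actor, ActorRef, Props}

// Hypothetical protocol messages.
case object Register
case object ShutdownAll

// Runs on the driver: tracks every worker-side actor that registers (step 4)
// and fans the shutdown message out to all of them (step 5).
class CoordinationActor extends Actor {
  private var workers = Set.empty[ActorRef]
  def receive = {
    case Register    => workers += sender()
    case ShutdownAll => workers.foreach(_ ! ShutdownAll)
  }
}

// Runs inside each worker's "resource-akka-system": registers on start (step 3)
// and terminates its own actor system when told to (step 6).
class WorkerLifecycleActor(coordinatorPath: String) extends Actor {
  override def preStart(): Unit =
    context.actorSelection(coordinatorPath) ! Register
  def receive = {
    case ShutdownAll => context.system.shutdown() // terminate() on Akka 2.4+
  }
}

On the driver you would create the coordinator with driverSystem.actorOf(Props[CoordinationActor], "coordinator"), broadcast its full remote path (the val from step 2), and send it ShutdownAll once the Spark job is done.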
answered Oct 28 '22 by Aivean