In my Spark application there is an object ResourceFactory which contains an Akka ActorSystem for providing resource clients. So when I run this Spark application, every worker node will create an ActorSystem. The problem is that when the Spark application finishes its work and gets shut down, the ActorSystem on every worker node stays alive and prevents the whole application from terminating; it just hangs.
Is there a way to register some listener on the SparkContext so that when the sc gets shut down, the ActorSystem on every worker node gets notified to shut itself down?
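For what it's worth, the driver side does expose a listener hook through the SparkListener API; a minimal sketch is below, but note that it fires on the driver only, so by itself it does not reach the ActorSystems living in the executor JVMs.
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

// Driver-side only: runs when the application ends, not on the executors,
// so it can only clean up a driver-local ActorSystem.
sc.addSparkListener(new SparkListener {
  override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit = {
    ResourceFactory.actorSystem.terminate()   // or actorSystem.shutdown() on Akka 2.3
  }
})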
UPDATE:
Following is the simplified skeleton:
There is a ResourceFactory, which is an object that contains an actor system. It also provides a fetchData method.
object ResourceFactory {
  val actorSystem = ActorSystem("resource-akka-system")
  def fetchData(): SomeData = ...
}
Then there is a user-defined RDD class; in its compute method, it needs to fetch data from the ResourceFactory.
class MyRDD extends RDD[SomeClass] {
  override def compute(...) = {
    ...
    ResourceFactory.fetchData()
    ...
    someIterator
  }
}
So on every node there will be one ActorSystem named "resource-akka-system", and the MyRDD instances distributed on the worker nodes can get data from that "resource-akka-system".
The problem is that when the SparkContext gets shut down there is no need for those "resource-akka-system"s any more, but I don't know how to notify the ResourceFactory to shut down the "resource-akka-system" when the SparkContext gets shut down. So right now the "resource-akka-system" stays alive on each worker node and prevents the whole program from exiting.
UPDATE2:
With some more experiments, I find that in local mode the program hangs, but in yarn-cluster mode the program exits successfully. Maybe this is because YARN kills the threads on the worker nodes when the sc is shut down?
UPDATE3:
To check whether every node contains an ActorSystem, I changed the code as follows (this is the real skeleton, since I add another class definition):
object ResourceFactory {
  println("creating resource factory")
  val actorSystem = ActorSystem("resource-akka-system")
  def fetchData(): SomeData = ...
}
class MyRDD extends RDD[SomeClass] {
  println("creating my rdd")
  override def compute(...) = {
    new RDDIterator(...)
  }
}
class RDDIterator(...) extends Iterator[SomeClass] {
  println("creating rdd iterator")
  ...
  lazy val reader = {
    ...
    ResourceFactory.fetchData()
    ...
  }
  ...
  override def next() = {
    ...
    reader.xx()
  }
}
After adding those printlns, I ran the code on Spark in yarn-cluster mode. I find that on the driver I have the following prints:
creating my rdd
creating resource factory
creating my rdd
...
While on some of the workers, I have the following prints:
creating rdd iterator
creating resource factory
And on some of the workers nothing is printed at all (none of them were assigned any tasks).
Based on the above, I think the object is initialized eagerly on the driver, since it prints creating resource factory on the driver even when nothing refers to it, while the object is initialized lazily on the workers, because it prints creating resource factory after printing creating rdd iterator, since the resource factory is lazily referenced by the first created RDDIterator.
And I find that in my use case the MyRDD class is only created on the driver.
I am not very sure about the laziness of the object initialization on the driver and the workers; it's my guess, because maybe other parts of the program make it look like that. But I think it should be right that there is one actor system on each worker node when it is needed.
I don't think that there is a way to tap into each Worker's lifecycle.
Also, I have some questions regarding your implementation:
If you have an object that contains a val which is used from a function run on a worker, my understanding is that this val gets serialized and broadcast to the worker. Can you confirm that you have one ActorSystem running per worker?
An actor system is usually terminated immediately if you don't explicitly wait for its termination. Are you calling something like system.awaitTermination or blocking on system.whenTerminated?
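For completeness, blocking on termination looks roughly like this; the exact call depends on the Akka version in use.
import scala.concurrent.Await
import scala.concurrent.duration.Duration

// Akka 2.4+: block the calling thread until the actor system has fully terminated.
Await.result(actorSystem.whenTerminated, Duration.Inf)

// Akka 2.3.x equivalent:
// actorSystem.awaitTermination()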
Anyway, there is another way you can shut down the actor systems on the remote workers: have the address of the driver's actor system (where the sc is) broadcast to each worker. In simple words, just have a val with that address.
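A rough sketch of that idea; every name below (the beacon actor, the driver-system name, host and port) is hypothetical, and it assumes Akka remoting is configured on both the driver and the workers. The driver exposes an actor whose only job is to exist, its path is broadcast as a plain val, and each worker's actor system watches it and terminates itself when the driver goes away.
import akka.actor.{Actor, ActorIdentity, ActorSystem, Identify, Props, Terminated}

// Driver side: an actor that exists only so that workers can watch its lifetime.
class ShutdownBeacon extends Actor {
  def receive = Actor.emptyBehavior
}

// Worker side: resolve the driver's beacon, watch it, and shut the local
// "resource-akka-system" down once the driver's actor system disappears.
class ShutdownWatcher(driverBeaconPath: String) extends Actor {
  context.actorSelection(driverBeaconPath) ! Identify("beacon")

  def receive = {
    case ActorIdentity("beacon", Some(ref)) => context.watch(ref)
    case Terminated(_)                      => context.system.terminate()
  }
}

// On the driver:
//   val driverSystem = ActorSystem("driver-system")   // remoting enabled in its config
//   driverSystem.actorOf(Props[ShutdownBeacon], "beacon")
//   val beaconPath = "akka.tcp://driver-system@driver-host:2552/user/beacon"   // hypothetical host/port
//   val beaconPathBc = sc.broadcast(beaconPath)        // "just have a val with that address"
//
// On each worker, e.g. the first time ResourceFactory is used:
//   ResourceFactory.actorSystem.actorOf(Props(new ShutdownWatcher(beaconPathBc.value)))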