
Run a local cluster under a nondefault classloader

A Local Cluster from a web classloader

I'm trying to run a local cluster from a web container (yes, it's only for dev & testing purposes) and am having difficulty with classloaders.

Direct approach

When I do it the easy and recommended way,

ILocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(topologyName, stormConf, topology);

I get rewarded with

Async loop died!: java.lang.ClassCastException: my.company.storm.bolt.SomeFilteringBolt cannot be cast to org.apache.storm.task.IBolt
    at org.apache.storm.daemon.executor$fn__7953$fn__7966.invoke(executor.clj:787)
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:482)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)

This happens because the StormTopology is loaded and instantiated by a Jetty WebAppClassLoader, while the (sub)process spawned by LocalCluster.submitTopology() apparently uses the system classloader. I confirmed this by logging the classloader in a static initializer block of SomeFilteringBolt: the class is indeed loaded twice, and a bolt loaded by the WebAppClassLoader cannot later be cast to a type loaded by the system classloader.
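To illustrate the failure mode outside of Storm and Jetty, here is a minimal sketch using only the JDK. All names in it (DuplicateClassDemo, Task, FilteringTask, IsolatingLoader) are hypothetical stand-ins, not Storm or Jetty classes. It assumes the compiled classes are readable as resources from the application classloader, and it defines the same class a second time in a loader that does not delegate to its parent, which is roughly what happens when a webapp classloader and the system classloader both load the same type.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class DuplicateClassDemo {

    /** Hypothetical stand-in for an interface such as IBolt. */
    public interface Task { }

    /** Hypothetical stand-in for a user class such as SomeFilteringBolt. */
    public static class FilteringTask implements Task { }

    /** Defines this demo's nested types itself instead of asking its parent first. */
    static class IsolatingLoader extends ClassLoader {
        private static final String PREFIX = DuplicateClassDemo.class.getName() + "$";

        IsolatingLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.startsWith(PREFIX)) {
                return super.loadClass(name, resolve); // JDK classes: normal delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    byte[] bytes = readClassBytes(name);
                    c = defineClass(name, bytes, 0, bytes.length); // second copy of the class
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String path = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(path)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Returns true if an instance created under the second loader fails the cast. */
    static boolean castFailsAcrossLoaders() {
        ClassLoader appLoader = DuplicateClassDemo.class.getClassLoader();
        ClassLoader isolated = new IsolatingLoader(appLoader);
        try {
            Class<?> duplicate = isolated.loadClass(FilteringTask.class.getName());
            Object instance = duplicate.getDeclaredConstructor().newInstance();
            Task task = (Task) instance; // Task here is the application loader's copy
            return task == null;         // not reached when the cast throws
        } catch (ClassCastException e) {
            return true;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("cast fails across loaders: " + castFailsAcrossLoaders());
    }
}
```

The two FilteringTask classes share a name and bytecode, but the JVM treats a class as the pair of its name and its defining loader, so the cast is rejected.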

Expected behaviour

Now, this is surprising to me, as I thought Storm would serialize the StormTopology instance, "send" it locally, deserialize it and run it. If it did that, it would definitely work. Instead, it seems to use the provided StormTopology instance directly, which is problematic when that instance comes from a different classloader.
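For reference, the kind of round trip I had in mind looks roughly like the sketch below. This is not Storm's actual code; LoaderAwareRoundTrip and LoaderAwareObjectInputStream are hypothetical names, and the example uses plain Java serialization from the JDK. The point is only that a deserializing side can pick the classloader it resolves classes against, so the resulting objects belong to that loader rather than to the one that created the originals.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;

public class LoaderAwareRoundTrip {

    /** ObjectInputStream that resolves classes against an explicitly chosen loader. */
    static class LoaderAwareObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Falls back to the default lookup, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    /** Serializes a value to bytes with standard Java serialization. */
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Rebuilds the value, resolving its classes through the given target loader. */
    static Object deserialize(byte[] data, ClassLoader target) {
        try (ObjectInputStream in =
                 new LoaderAwareObjectInputStream(new ByteArrayInputStream(data), target)) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ArrayList<String> original = new ArrayList<>(Arrays.asList("spout", "bolt"));
        Object copy = deserialize(serialize(original), ClassLoader.getSystemClassLoader());
        System.out.println("equal after round trip: " + original.equals(copy));
    }
}
```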

What I have tried since

I tried setting the following options to true to force Storm to serialize my topology locally, with no change:

  • TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE
  • STORM_LOCAL_MODE_ZMQ

I tried running the LocalCluster under the system classloader:

ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
try {
    Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());

    Config topologyConf = createTopologyConfig();
    Map<String, Object> stormConf = createStormConfig(topologyConf);
    StormTopology topology = createTopology(topologyConf);

    ILocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology(topologyName, stormConf, topology);
} finally {
    Thread.currentThread().setContextClassLoader(originalClassloader);
}

This actually got me a bit further:

Thread  died: java.lang.ExceptionInInitializerError
    at clojure.core__init.__init0(Unknown Source)
    at clojure.core__init.<clinit>(Unknown Source)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at clojure.lang.RT.classForName(RT.java:2154)
    at clojure.lang.RT.classForName(RT.java:2163)
    at clojure.lang.RT.loadClassForName(RT.java:2182)
    at clojure.lang.RT.load(RT.java:436)
    at clojure.lang.RT.load(RT.java:412)
    at clojure.lang.RT.doInit(RT.java:454)
    at clojure.lang.RT.<clinit>(RT.java:330)
    at clojure.lang.Namespace.<init>(Namespace.java:34)
    at clojure.lang.Namespace.findOrCreate(Namespace.java:176)
    at clojure.lang.Var.internPrivate(Var.java:151)
    at org.apache.storm.LocalCluster.<clinit>(Unknown Source)
    at my.company.storm.LocalTopologyRunner.startTopology(LocalTopologyRunner.java:146)
    ... 10 more
Caused by: java.lang.IllegalStateException: Attempting to call unbound fn: #'clojure.core/refer
    at clojure.lang.Var$Unbound.throwArity(Var.java:43)
    at clojure.lang.AFn.invoke(AFn.java:32)
    at clojure.lang.Var.invoke(Var.java:379)
    at clojure.lang.RT.doInit(RT.java:467)
    at clojure.lang.RT.<clinit>(RT.java:330)
    ... 18 more

Wat?!

The question

How can I run a Storm topology in local mode safely from a classloader other than the system classloader?

I'm running on Apache Storm 1.0.1, Jetty 8.1, Java 8u112 x64, Windows 7 x64.

Petr Janeček asked Oct 29 '22 13:10


1 Answer

I'm not a Storm expert at all, but this reminds me of a class "identity crisis" problem I had in the past.

Two things to try:

  • Give the system (parent) classloader priority by calling org.eclipse.jetty.webapp.WebAppContext.setParentLoaderPriority(true).

  • If that doesn't work, you can call org.eclipse.jetty.webapp.WebAppContext.setSystemClasses or org.eclipse.jetty.webapp.WebAppContext.addSystemClass to control which classes are treated as system classes inside the webapp.

Do it for the whole Storm package (package prefixes such as "org.apache.storm." are allowed) before those classes are loaded, i.e. during webapp initialization.
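The idea behind both suggestions is parent-first delegation: if the webapp loader asks its parent before loading a class itself, both sides end up with the very same Class object and the cast succeeds. Here is a small JDK-only sketch of that behaviour. It does not use Jetty; ParentFirstDemo and SharedType are hypothetical names, and a plain URLClassLoader (which delegates parent-first by default) stands in for a webapp loader configured with setParentLoaderPriority(true).

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ParentFirstDemo {

    /** Hypothetical stand-in for a type that both loaders need to agree on. */
    public static class SharedType { }

    /** Returns true if a parent-first child loader yields the parent's Class object. */
    static boolean childSeesParentsClass() {
        ClassLoader parent = ParentFirstDemo.class.getClassLoader();
        // The child has no URLs of its own, so every lookup is answered by the parent.
        try (URLClassLoader child = new URLClassLoader(new URL[0], parent)) {
            Class<?> viaChild = child.loadClass(SharedType.class.getName());
            return viaChild == SharedType.class;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("same class via child loader: " + childSeesParentsClass());
    }
}
```

With a single Class object shared by both loaders, an instance created on the webapp side can be cast on the system side without a ClassCastException.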

Worth a shot! Good luck.

Fabien Benoit-Koch answered Nov 15 '22 06:11