I'm trying to use a custom log4j appender inside the Spark executors, in order to forward all logs to Apache Kafka.
The problem is that log4j is initialized before the fat jar's classloader (the one that can actually see the appender class) is registered, so I end up with the following:
log4j:ERROR Could not instantiate class [kafka.producer.KafkaLog4jAppender].
java.lang.ClassNotFoundException: kafka.producer.KafkaLog4jAppender
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:260)
at org.apache.log4j.helpers.Loader.loadClass(Loader.java:198)
at org.apache.log4j.helpers.OptionConverter.instantiateByClassName(OptionConverter.java:327)
at org.apache.log4j.helpers.OptionConverter.instantiateByKey(OptionConverter.java:124)
at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:785)
at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
at org.apache.log4j.PropertyConfigurator.configureRootCategory(PropertyConfigurator.java:648)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:514)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
at org.apache.spark.Logging$class.initializeLogging(Logging.scala:122)
at org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:107)
at org.apache.spark.Logging$class.log(Logging.scala:51)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.log(CoarseGrainedExecutorBackend.scala:126)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:137)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:235)
at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
log4j:ERROR Could not instantiate appender named "KAFKA".
2015-09-29 13:10:43 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger: Slf4jLogger started
2015-09-29 13:10:43 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO Remoting: Starting remoting
2015-09-29 13:10:43 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:36918]
2015-09-29 13:10:43 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO Remoting: Remoting now listens on addresses: [akka.tcp://[email protected]:36918]
2015-09-29 13:10:44 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
2015-09-29 13:10:44 [driverPropsFetcher-akka.actor.default-dispatcher-4] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
2015-09-29 13:10:44 [sparkExecutor-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger: Slf4jLogger started
2015-09-29 13:10:44 [sparkExecutor-akka.actor.default-dispatcher-2] INFO Remoting: Starting remoting
2015-09-29 13:10:44 [sparkExecutor-akka.actor.default-dispatcher-2] INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:40067]
2015-09-29 13:10:44 [sparkExecutor-akka.actor.default-dispatcher-2] INFO Remoting: Remoting now listens on addresses: [akka.tcp://[email protected]:40067]
2015-09-29 13:10:44 [driverPropsFetcher-akka.actor.default-dispatcher-5] INFO Remoting: Remoting shut down
....
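For context, the appender is wired into my log4j.properties roughly like this (a sketch; "stdout" stands in for whatever console appender is configured, and the appender name matches the "KAFKA" in the error above):

log4j.rootLogger=INFO, stdout, KAFKA
log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender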
The problem seems to be right here: https://github.com/apache/spark/blob/v1.3.1/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L126
Is there any easy way to solve this? We are currently using Spark 1.3.x.
Thanks
David
I ended up submitting an extra jar containing the logging dependencies and loading it before the user classpath:
# Extra jar containing only the logging dependencies (the custom appender
# and its transitive deps); LOG4J_CONF points at the log4j.properties file
# and is assumed to be set elsewhere in the launch script.
LOG_JAR="${THISDIR}/../lib/logging.jar"

spark-submit ...... \
    --files "${LOG4J_CONF},${LOG_JAR}" \
    --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=`basename ${LOG4J_CONF}`" \
    --conf "spark.driver.extraClassPath=`basename ${LOG_JAR}`" \
    --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=`basename ${LOG4J_CONF}`" \
    --conf "spark.executor.extraClassPath=`basename ${LOG_JAR}`" \
    ...
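This works because files passed via --files are copied into each executor's working directory (so the bare basename paths resolve there), and jars listed on spark.driver.extraClassPath / spark.executor.extraClassPath are prepended to the JVM classpath ahead of the application jar, meaning the appender class is already visible when log4j initializes.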
Related JIRA ticket: https://issues.apache.org/jira/browse/SPARK-10881
I was facing the same issue, so I'll post what worked for me. It turns out the KafkaLog4jAppender class's package name changed in Kafka 0.9. Here is what I did: I added the following dependency in my pom
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <version>0.9.0.0</version>
</dependency>
and changed my log4j.properties from
log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
to
log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender
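For completeness, a minimal stanza with the new class looks something like this (the broker list and topic values are placeholders for your own setup):

log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.KAFKA.brokerList=broker1:9092
log4j.appender.KAFKA.topic=spark-logs
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%d [%t] %-5p %c: %m%n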