
KafkaUtils class not found in Spark streaming

I have just begun working with Spark Streaming and I am trying to build a sample application that counts words from a Kafka stream. It compiles with sbt package, but when I run it I get a NoClassDefFoundError. This post seems to describe the same problem, but the solution there is for Maven and I have not been able to reproduce it with sbt.

KafkaApp.scala:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaApp {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("kafkaApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val kafkaParams = Map(
        "zookeeper.connect" -> "localhost:2181",
        "zookeeper.connection.timeout.ms" -> "10000",
        "group.id" -> "sparkGroup"
    )

    val topics = Map(
        "test" -> 1
    )

    // stream of (topic, ImpressionLog)
    val messages = KafkaUtils.createStream(ssc, kafkaParams, topics, storage.StorageLevel.MEMORY_AND_DISK)
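    // count() yields a DStream of per-batch counts; print() registers it as an output operation
    messages.count().print()

    // nothing runs until the streaming context is started
    ssc.start()
    ssc.awaitTermination()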
  }
}

build.sbt:

name := "Simple Project"

version := "1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.1.1",
    "org.apache.spark" %% "spark-streaming" % "1.1.1",
    "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

And I submit it with:

bin/spark-submit \
  --class "KafkaApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.1.jar

Error:

14/12/30 19:44:57 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://[email protected]:65077/user/HeartbeatReceiver
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
    at KafkaApp$.main(KafkaApp.scala:28)
    at KafkaApp.main(KafkaApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
kahlo asked Dec 30 '14 18:12


3 Answers

Try passing all the dependency jars with --jars when you submit the application:

./spark-submit \
  --name "SampleApp" \
  --deploy-mode client \
  --master spark://host:7077 \
  --class com.stackexchange.SampleApp \
  --jars $SPARK_INSTALL_DIR/spark-streaming-kafka_2.10-1.3.0.jar,$KAFKA_INSTALL_DIR/libs/kafka_2.10-0.8.2.0.jar,$KAFKA_INSTALL_DIR/libs/metrics-core-2.2.0.jar,$KAFKA_INSTALL_DIR/libs/zkclient-0.3.jar \
  spark-example-1.0-SNAPSHOT.jar

Sandeep answered Nov 13 '22 17:11


spark-submit does not automatically put the package containing KafkaUtils on the classpath. You need to include it in your project JAR. To do that, build an all-inclusive uber-jar with sbt assembly. Here is an example build.sbt:

https://github.com/tdas/spark-streaming-external-projects/blob/master/kafka/build.sbt

You also need to add the sbt-assembly plugin to your sbt build:

https://github.com/tdas/spark-streaming-external-projects/tree/master/kafka/project
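Applied to the build.sbt from the question, the uber-jar approach might look roughly like the sketch below. This is not the linked example. The "provided" scope on spark-core and spark-streaming, the coarse merge strategy, and the sbt-assembly version mentioned in the comment are assumptions made for illustration and may need adjusting for your setup.

```scala
// build.sbt (a sketch; names and versions are copied from the question)
// Assumes project/plugins.sbt contains a line such as:
//   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

name := "Simple Project"

version := "1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // Assumption: spark-submit already supplies these at runtime,
  // so "provided" keeps them out of the uber-jar.
  "org.apache.spark" %% "spark-core" % "1.1.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.1.1" % "provided",
  // Not supplied by spark-submit, so it has to be bundled into the jar.
  "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
)

// Coarse conflict handling for duplicate files across dependency jars.
// Dropping META-INF wholesale is a simplification, not a recommendation.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}
```

With this in place, running sbt assembly instead of sbt package produces a single jar under target/scala-2.10/, and that jar is the one to pass to spark-submit.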

Tathagata Das answered Nov 13 '22 17:11


The following build.sbt worked for me. It also requires the sbt-assembly plugin to be declared in a file under the project/ directory.

build.sbt

name := "NetworkStreaming" // https://github.com/sbt/sbt-assembly/blob/master/Migration.md#upgrading-with-bare-buildsbt

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.4.1",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.1",         // kafka
  "org.apache.hbase" % "hbase" % "0.92.1",
  "org.apache.hadoop" % "hadoop-core" % "1.0.2",
  "org.apache.spark" % "spark-mllib_2.10" % "1.3.0"
)

assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")          => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")      => MergeStrategy.discard
  case "log4j.properties"                                  => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf"                                    => MergeStrategy.concat
  case _                                                   => MergeStrategy.first
}

project/plugins.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
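To confirm that the assembled jar really carries the Kafka classes, a small check along these lines can help before the streaming code runs. It is only a sketch: ClasspathCheck and isOnClasspath are hypothetical names, not part of Spark, and the check uses nothing beyond the standard JVM class-loading API.

```scala
object ClasspathCheck {
  // Returns true when the named class can be located by the class loader
  // that loaded this object (in a submitted app, the one holding the app jar).
  def isOnClasspath(className: String): Boolean =
    try {
      // initialize = false: only locate the class, do not run its static initializer
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      case _: ClassNotFoundException => false
    }

  def main(args: Array[String]): Unit = {
    // The class named in the stack trace from the question
    val kafkaUtils = "org.apache.spark.streaming.kafka.KafkaUtils$"
    if (isOnClasspath(kafkaUtils))
      println(s"$kafkaUtils is available")
    else
      println(s"$kafkaUtils is missing: submit the assembly jar or pass the Kafka jars with --jars")
  }
}
```

Calling ClasspathCheck.isOnClasspath at the top of main in the application turns the NoClassDefFoundError into a readable message about what is missing from the submitted jar.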

Vibhuti answered Nov 13 '22 18:11