I'm trying to run a sample like StructuredKafkaWordCount. I started with the Spark Structured Streaming Programming guide.
My code is
package io.boontadata.spark.job1

import org.apache.spark.sql.SparkSession

object DirectKafkaAggregateEvents {
  val FIELD_MESSAGE_ID = 0
  val FIELD_DEVICE_ID = 1
  val FIELD_TIMESTAMP = 2
  val FIELD_CATEGORY = 3
  val FIELD_MEASURE1 = 4
  val FIELD_MEASURE2 = 5

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(s"""
        |Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
        | <brokers> is a list of one or more Kafka brokers
        | <subscribeType> sample value: subscribe
        | <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(bootstrapServers, subscribeType, topics) = args

    val spark = SparkSession
      .builder
      .appName("boontadata-spark-job1")
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of input lines from kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Generate running word count
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
I added the following sbt files:
build.sbt:
name := "boontadata-spark-job1"
version := "0.1"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.1"
// META-INF discarding
assemblyMergeStrategy in assembly := {
  {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
  }
}
I also added project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
This creates an uber-jar containing the non-provided jars.
I submit with the following line:
spark-submit boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic
but I get this runtime error:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41)
at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132)
... 18 more
16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook
Is there a way to know which class is not found, so that I can search the maven.org repo for that class?
The lookupDataSource source code seems to be at line 543 at https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala but I couldn't find a direct link with the Kafka data source...
Complete source code is here: https://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f
The issue is the following section in build.sbt:
// META-INF discarding
assemblyMergeStrategy in assembly := {
  {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
  }
}
It says that all META-INF entries should be discarded, including the "code" that makes data source aliases (e.g. kafka) work.
But the META-INF files are very important for kafka (and other aliases of streaming data sources) to work.
For the kafka alias to work, Spark SQL uses META-INF/services/org.apache.spark.sql.sources.DataSourceRegister with the following entry:
org.apache.spark.sql.kafka010.KafkaSourceProvider
KafkaSourceProvider is responsible for registering the kafka alias with the proper streaming data source, i.e. KafkaSource.
To check that the real code is indeed available and only the "code" that registers the alias is missing, you could use the kafka data source by its fully-qualified name (not the alias) as follows:
spark.readStream.
format("org.apache.spark.sql.kafka010.KafkaSourceProvider").
load
You will see other problems due to missing options like kafka.bootstrap.servers, but... we're digressing.
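As for finding out what is actually missing from the jar: one way to check is to look inside the assembled jar for the service registration file. Below is a minimal sketch that uses only the JDK's java.util.jar API. The object and method names (ListRegisteredDataSources, registeredProviders) are made up for illustration and are not part of Spark; the path of the jar to inspect is whatever your build produced.

```scala
import java.util.jar.JarFile
import scala.io.Source

// Illustrative helper (not part of Spark): lists the data source providers
// that a jar registers for Spark SQL's alias lookup.
object ListRegisteredDataSources {
  // Service file that Spark SQL consults to resolve aliases such as "kafka".
  val ServiceEntry = "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister"

  /** Returns the provider class names listed in the jar, or an empty list if the entry is missing. */
  def registeredProviders(jarPath: String): Seq[String] = {
    val jar = new JarFile(jarPath)
    try {
      Option(jar.getJarEntry(ServiceEntry)) match {
        case Some(entry) =>
          val in = jar.getInputStream(entry)
          try {
            Source.fromInputStream(in, "UTF-8").getLines()
              .map(_.trim)
              .filter(line => line.nonEmpty && !line.startsWith("#")) // skip blanks and comments
              .toList // force evaluation before the stream is closed
          } finally in.close()
        case None => Seq.empty
      }
    } finally jar.close()
  }

  def main(args: Array[String]): Unit = {
    val providers = registeredProviders(args(0))
    if (providers.isEmpty) println("No DataSourceRegister entries found in " + args(0))
    else providers.foreach(println)
  }
}
```

If this prints nothing for the uber-jar from the question, that would be consistent with the META-INF entries having been discarded during assembly.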
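A solution is to use MergeStrategy.concat for all META-INF/services/org.apache.spark.sql.sources.DataSourceRegister files (that would create an uber-jar with all data sources, incl. the kafka data source). Add the following case before the catch-all META-INF case, because the first matching case wins: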
case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
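Put together with the merge strategy from the question, the whole block might look like the sketch below. It assumes the same sbt-assembly 0.14.x syntax as the question; everything except the first case is copied from the original build.sbt.

```scala
// build.sbt (sketch): keep the data source registrations, discard the rest of META-INF
assemblyMergeStrategy in assembly := {
  // Must come first: concatenate the service files so that every data source
  // alias (incl. kafka) stays registered in the uber-jar.
  case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
  // Unchanged from the question: drop all other META-INF entries.
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
```

The order matters here: if the PathList("META-INF", ...) case were listed first, it would match the service file as well and the concat case would never be reached.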
I tried it like this and it works for me. Submit like this and let me know if you run into any issues:
./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 --class com.inndata.StructuredStreaming.Kafka --master local[*] /Users/apple/.m2/repository/com/inndata/StructuredStreaming/0.0.1SNAPSHOT/StructuredStreaming-0.0.1-SNAPSHOT.jar
In my case I also got this error with an sbt build, and the cause was that sbt assembly was not including the spark-sql-kafka-0-10_2.11 artifact in the fat jar.
(Comments are very welcome here. No scope was specified for the dependency, so it should not have been treated as "provided".)
So I changed to deploying a normal (slim) jar and passing the dependencies to spark-submit with the --jars parameter.
In order to gather all dependencies in one place, you can add retrieveManaged := true to your sbt project settings, or you can issue the following in the sbt console:
> set retrieveManaged := true
> package
That should bring all dependencies into the lib_managed folder.
Then you can pass all those files to spark-submit. With bash you can, for example, use something like this (note that --jars has to come before the application jar, otherwise it is handed to your application as an argument):
cd /path/to/your/project
JARLIST=$(find lib_managed -name '*.jar' | paste -sd , -)
spark-submit [other-args] --jars "$JARLIST" target/your-app-1.0-SNAPSHOT.jar
Those of you building your project with Maven can try this out. Add the line below as an AppendingTransformer resource in your maven-shade-plugin configuration.
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
I've put down the plugin code for the pom file as an example to show where to add the line.
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.1.0</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
            <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
          </transformer>
        </transformers>
        <finalName>${project.artifactId}-${project.version}-uber</finalName>
      </configuration>
    </execution>
  </executions>
</plugin>
Please excuse my formatting skills.
I'm using Spark 2.1 and faced the very same problem. My workaround is:
1) spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
2) cd ~/.ivy2/jars (all the needed jars are in this folder now)
3) Copy all the jars in this folder to all the nodes (you can create a specific folder to hold them)
4) Add the folder name to spark.driver.extraClassPath and spark.executor.extraClassPath, e.g. spark.driver.extraClassPath=/opt/jars/*:your_other_jars
5) spark-submit --class ClassNm --Other-Options YourJar.jar
It works fine now.
I am using Gradle as a build tool and the shadowJar plugin to create the uber-jar. The solution was simply to add the file src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister to the project.
In this file you need to put, line by line, the class names of the data sources you use; in this case that would be org.apache.spark.sql.kafka010.KafkaSourceProvider (find that class name for example here).
The reason is that Spark uses the Java ServiceLoader in its internal dependency management mechanisms.
Full example here.
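To illustrate why the stack trace in the question ends with kafka.DefaultSource, here is a simplified model of the lookup. It is not Spark's actual code. It assumes, as I understand the Spark 2.x behaviour, that the format string is first matched against the short names of the registered providers, then tried as a class name, and finally tried with ".DefaultSource" appended. The names DataSourceLookupSketch, Registered and resolve are hypothetical.

```scala
// Simplified, hypothetical model of the alias lookup (not Spark's implementation).
object DataSourceLookupSketch {
  // Stand-in for a provider registered through META-INF/services.
  final case class Registered(shortName: String, className: String)

  /**
   * Resolves a format string to a class name.
   * `registered` models the entries found via ServiceLoader,
   * `loadable` models the classes that are present on the classpath.
   */
  def resolve(format: String,
              registered: Seq[Registered],
              loadable: Set[String]): Either[String, String] =
    registered.find(_.shortName.equalsIgnoreCase(format)) match {
      case Some(r) => Right(r.className) // alias found in the service file
      case None =>
        // No alias registered: fall back to treating the string as a class name.
        val candidates = Seq(format, s"$format.DefaultSource")
        candidates.find(loadable.contains)
          .toRight(s"Failed to find data source: $format (last tried ${candidates.last})")
    }
}
```

In this model, resolve("kafka", Nil, ...) fails after trying kafka.DefaultSource, which matches the error in the question when the service file is missing from the uber-jar, while passing the fully-qualified KafkaSourceProvider class name still resolves as long as the class itself is in the jar.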