I've written a custom state store and state store provider for Apache Spark 2.3.0 and tried to deploy the job using the additional argument:
--conf spark.sql.streaming.stateStore.providerClass=com.sample.state.CustomStateStoreProvider
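For context, a custom provider in Spark 2.3 implements the org.apache.spark.sql.execution.streaming.state.StateStoreProvider trait. The sketch below is a minimal skeleton of that shape (the storage internals of my real implementation are elided; the method bodies here are placeholders):

package com.sample.state

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
import org.apache.spark.sql.types.StructType

// Spark instantiates the provider reflectively by class name,
// so the class must have a zero-argument constructor.
class CustomStateStoreProvider extends StateStoreProvider {

  private var id: StateStoreId = _

  override def init(
      stateStoreId: StateStoreId,
      keySchema: StructType,
      valueSchema: StructType,
      keyIndexOrdinal: Option[Int],
      storeConfs: StateStoreConf,
      hadoopConf: Configuration): Unit = {
    id = stateStoreId
    // set up the backing storage here
  }

  override def stateStoreId: StateStoreId = id

  // Return a StateStore giving access to the state data as of `version`
  override def getStore(version: Long): StateStore = ???

  override def close(): Unit = ()
}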
I run Spark jobs with Marathon and Mesos, and the job fails just after starting with the following exception:
java.lang.ClassNotFoundException: com.sample.state.CustomStateStoreProvider
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:235)
at org.apache.spark.sql.execution.streaming.state.StateStoreProvider$.create(StateStore.scala:213)
at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.stateStoreCustomMetrics(statefulOperators.scala:121)
at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.metrics(statefulOperators.scala:86)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.metrics$lzycompute(statefulOperators.scala:251)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.metrics(statefulOperators.scala:251)
at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:58)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
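The StateStoreProvider$.create frame near the top of the trace is where Spark turns the configured provider class name into an instance; in Spark 2.3 it boils down to something like this (paraphrased from the Spark source, so treat it as a sketch):

def create(providerClassName: String): StateStoreProvider = {
  val providerClass = Utils.classForName(providerClassName)
  providerClass.newInstance().asInstanceOf[StateStoreProvider]
}

The class name string is passed to classForName exactly as configured.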
Here is the command to run the job:
/spark/bin/spark-submit \
--repositories "http://127.0.0.1:80/sbt-all" \
--packages com.sample:pipelines:0.1.0 \
--class com.sample.TestApplication \
--conf spark.sql.streaming.stateStore.providerClass=com.sample.state.CustomStateStoreProvider \
/spark/examples/jars/spark-examples_2.11-2.3.0.jar
Both classes, com.sample.TestApplication and com.sample.state.CustomStateStoreProvider, are located in the com.sample:pipelines:0.1.0 package; I have already checked this several times. Without the spark.sql.streaming.stateStore.providerClass parameter, the application starts and runs well.
I have also tried submitting the job with additional classpath entries for the driver and the executors, and with the --jars parameter pointing to the JAR in HDFS or over HTTP, as shown below.
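Concretely, the attempts looked roughly like the following (the paths are placeholders, not my real ones):

--conf spark.driver.extraClassPath=/path/to/pipelines-0.1.0.jar \
--conf spark.executor.extraClassPath=/path/to/pipelines-0.1.0.jar

# or, with the JAR served from HDFS or HTTP:
--jars hdfs:///jars/pipelines-0.1.0.jar
--jars http://repo.example.com/jars/pipelines-0.1.0.jar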
P.S.: Locally the job starts without any problems, and everything works well in that case.
Well, in general, the value of the spark.sql.streaming.stateStore.providerClass parameter needs to be enclosed in quotes: --conf spark.sql.streaming.stateStore.providerClass="com.sample.state.CustomStateStoreProvider". Without them, the space after the value is included in the value itself, so Spark looks for a class named com.sample.state.CustomStateStoreProvider with a trailing space at the end and, of course, cannot find it. With the quotes added, everything works well :)
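With the quoting fixed, the full submit command from above becomes:

/spark/bin/spark-submit \
--repositories "http://127.0.0.1:80/sbt-all" \
--packages com.sample:pipelines:0.1.0 \
--class com.sample.TestApplication \
--conf spark.sql.streaming.stateStore.providerClass="com.sample.state.CustomStateStoreProvider" \
/spark/examples/jars/spark-examples_2.11-2.3.0.jar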