Custom state store provider for Apache Spark on Mesos

I've written a custom state store and state store provider for Apache Spark 2.3.0 and tried to deploy the job using the additional argument:

--conf spark.sql.streaming.stateStore.providerClass=com.sample.state.CustomStateStoreProvider
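
For reference, the shape Spark 2.3 expects from a provider looks roughly like the sketch below. It is written against Spark's internal StateStoreProvider trait, so the exact method signatures may differ between Spark versions, and the body here is a stub rather than my real implementation. One thing worth noting: Spark instantiates the provider reflectively, so the class needs a public zero-argument constructor.

    package com.sample.state

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
    import org.apache.spark.sql.types.StructType

    class CustomStateStoreProvider extends StateStoreProvider {
      @volatile private var id: StateStoreId = _

      // Called once to set up the backing storage for one operator/partition.
      override def init(
          stateStoreId: StateStoreId,
          keySchema: StructType,
          valueSchema: StructType,
          keyIndexOrdinal: Option[Int],
          storeConfs: StateStoreConf,
          hadoopConf: Configuration): Unit = {
        id = stateStoreId
        // initialize the custom backing store here
      }

      override def stateStoreId: StateStoreId = id

      // Must return a store holding the state as of the given version.
      override def getStore(version: Long): StateStore = ???

      override def close(): Unit = {
        // release any resources held by the backing store
      }
    }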

For running Spark jobs I use Marathon and Mesos, and the job fails right after starting with the following exception:

java.lang.ClassNotFoundException: com.sample.state.CustomStateStoreProvider 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:235)
    at org.apache.spark.sql.execution.streaming.state.StateStoreProvider$.create(StateStore.scala:213)
    at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.stateStoreCustomMetrics(statefulOperators.scala:121)
    at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.metrics(statefulOperators.scala:86)
    at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.metrics$lzycompute(statefulOperators.scala:251)
    at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.metrics(statefulOperators.scala:251)
    at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:58)
    at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
    at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan$1.apply(SparkPlanInfo.scala:62)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
    ... (the fromSparkPlan/map frames above repeat several more times while the plan tree is traversed) ...
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

Here is the command to run the job:

/spark/bin/spark-submit \
    --repositories "http://127.0.0.1:80/sbt-all" \
    --packages com.sample:pipelines:0.1.0 \
    --class com.sample.TestApplication \
    --conf spark.sql.streaming.stateStore.providerClass=com.sample.state.CustomStateStoreProvider \
    /spark/examples/jars/spark-examples_2.11-2.3.0.jar

Both classes, com.sample.TestApplication and com.sample.state.CustomStateStoreProvider, are located in the com.sample:pipelines:0.1.0 package; I have already checked this several times. Without the spark.sql.streaming.stateStore.providerClass parameter, the application starts and runs well.

I have already tried submitting the job with additional classpath entries for the driver and executors, and with the --jars parameter pointing at a JAR located in HDFS or served over HTTP (see the variants sketched below).
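
For example, variants along these lines (the paths are purely illustrative, not my real ones):

    # extra classpath entries for driver and executors (illustrative paths)
    --conf spark.driver.extraClassPath=/mnt/jars/pipelines-0.1.0.jar \
    --conf spark.executor.extraClassPath=/mnt/jars/pipelines-0.1.0.jar

    # or shipping the JAR explicitly
    --jars hdfs:///jars/pipelines-0.1.0.jar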

P.S.: I don't have any problem when I run the job locally; in that case everything works well.

asked by Alex Chermenin

1 Answer

Well, in general, the value of the spark.sql.streaming.stateStore.providerClass parameter needs to be enclosed in quotes: --conf spark.sql.streaming.stateStore.providerClass="com.sample.state.CustomStateStoreProvider". Without the quotes, the space after the value becomes part of the value itself, so Spark looks for a class named com.sample.state.CustomStateStoreProvider (with a trailing space at the end) and cannot find it. With the quotes in place, everything works fine :)
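
With the quotes added, the submit command from the question becomes:

    /spark/bin/spark-submit \
        --repositories "http://127.0.0.1:80/sbt-all" \
        --packages com.sample:pipelines:0.1.0 \
        --class com.sample.TestApplication \
        --conf spark.sql.streaming.stateStore.providerClass="com.sample.state.CustomStateStoreProvider" \
        /spark/examples/jars/spark-examples_2.11-2.3.0.jar

You can see why this matters in the stack trace: StateStoreProvider$.create (StateStore.scala:213) hands the configured string straight to the class loader, roughly like this (paraphrased from the Spark 2.3 internals, not the exact code):

    // the raw config value, trailing space and all, goes to classForName,
    // so "com.sample.state.CustomStateStoreProvider " can never resolve
    val providerClass = Utils.classForName(providerClassName)
    providerClass.newInstance().asInstanceOf[StateStoreProvider]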

answered by Alex Chermenin