I am trying to resolve a spark-submit classpath runtime issue for an Apache Tika (> v1.14) parsing job. The problem seems to involve the spark-submit classpath versus my uber-jar.
Platforms: CDH 5.15 (Spark 2.3 added via CDH docs) and CDH 6 (Spark 2.2 bundled in CDH 6)
I've tried / reviewed:
(Cloudera) Where does spark-submit look for Jar files?
(stackoverflow) resolving-dependency-problems-in-apache-spark
(stackoverflow) Apache Tika ArchiveStreamFactory.detect error
Highlights:
Results if I include --conf flag(s) to spark-submit:
$ spark-submit --master local[*] --class com.example.App --conf spark.executor.userClassPathFirst=true ./target/uber-tikaTest-1.19.jar
18/09/25 13:35:55 ERROR util.Utils: Exception encountered
java.lang.NullPointerException
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:72)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1307)
at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:312)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/09/25 13:35:55 ERROR util.Utils: Exception encountered
java.lang.NullPointerException
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:72)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1307)
at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:312)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Below the following error message are the files for: build-and-run.sh, the sample app, pom.xml, and the mvn dependency tree.
The error at runtime:
18/09/25 11:47:39 ERROR executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NoSuchMethodError: org.apache.commons.compress.archivers.ArchiveStreamFactory.detect(Ljava/io/InputStream;)Ljava/lang/String;
at org.apache.tika.parser.pkg.ZipContainerDetector.detectArchiveFormat(ZipContainerDetector.java:160)
at org.apache.tika.parser.pkg.ZipContainerDetector.detect(ZipContainerDetector.java:104)
at org.apache.tika.detect.CompositeDetector.detect(CompositeDetector.java:84)
at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:116)
at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:159)
at com.example.App$.tikaAutoDetectParser(App.scala:55)
at com.example.App$$anonfun$1.apply(App.scala:69)
at com.example.App$$anonfun$1.apply(App.scala:69)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1799)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/09/25 11:47:39 ERROR executor.Executor: Exception in task 5.0 in stage 0.0 (TID 5)
java.lang.NoSuchMethodError: org.apache.commons.compress.archivers.ArchiveStreamFactory.detect(Ljava/io/InputStream;)Ljava/lang/String;
at org.apache.tika.parser.pkg.ZipContainerDetector.detectArchiveFormat(ZipContainerDetector.java:160)
at org.apache.tika.parser.pkg.ZipContainerDetector.detect(ZipContainerDetector.java:104)
at org.apache.tika.detect.CompositeDetector.detect(CompositeDetector.java:84)
at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:116)
at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:159)
at com.example.App$.tikaAutoDetectParser(App.scala:55)
at com.example.App$$anonfun$1.apply(App.scala:69)
at com.example.App$$anonfun$1.apply(App.scala:69)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1799)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
build-and-run.sh:
# package (rather than compile) so the shade plugin, bound to the package phase, builds the uber-jar
mvn package
if true
then
    # quote local[*] so the shell does not treat it as a glob pattern
    spark-submit --master 'local[*]' --class com.example.App ./target/uber-tikaTest-1.19.jar
fi
# tried using the userClassPathFirst flags for driver and executor for the above and below calls to spark-submit
# --conf spark.driver.userClassPathFirst=true \
# --conf spark.executor.userClassPathFirst=true \
if false
then
    spark-submit --class com.example.App \
        --master yarn \
        --packages org.apache.commons:commons-compress:1.18 \
        --jars ./target/uber-tikaTest-1.19.jar \
        --num-executors 2 \
        --executor-memory 1024m \
        --executor-cores 2 \
        --driver-memory 2048m \
        --driver-cores 1 \
        ./target/uber-tikaTest-1.19.jar
fi
Sample App:
package com.example

////////// Tika Imports
import org.apache.tika.metadata.Metadata
import org.apache.tika.parser.AutoDetectParser
import org.apache.tika.sax.BodyContentHandler
////////// Java HTTP Imports
import java.net.URL
import java.net.HttpURLConnection
import scala.collection.JavaConverters._
import scala.collection.mutable._
////////// Spark Imports
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.{Row, SparkSession}

object App {

  // Source URL, HTTP response headers, and the open response body stream (null if the request failed).
  case class InputStreamData(sourceURL: String, headerFields: Map[String, List[String]], inputStream: java.io.InputStream)

  // Opens sourceURL over HTTP and returns its headers and body stream; apiKey is currently unused.
  def openUrlStream(sourceURL: String, apiKey: String): InputStreamData = {
    try {
      val url = new URL(sourceURL)
      val urlConnection = url.openConnection().asInstanceOf[HttpURLConnection]
      urlConnection.setInstanceFollowRedirects(true)
      val headerFields = urlConnection.getHeaderFields()
      val input = urlConnection.getInputStream()
      InputStreamData(sourceURL, headerFields.asScala.map(x => (x._1, x._2.asScala.toList)), input)
    } catch {
      case e: Exception =>
        println("**********************************************************************************************")
        println("PARSEURL: INVALID URL: " + sourceURL)
        println(e.toString())
        println("**********************************************************************************************")
        InputStreamData(sourceURL, Map("ERROR" -> List("ERROR")), null)
    }
  }

  // Lets Tika detect the content type and returns the extracted body text (-1 disables the write limit).
  def tikaAutoDetectParser(inputStream: java.io.InputStream): String = {
    val parser = new AutoDetectParser()
    val handler = new BodyContentHandler(-1)
    val metadata = new Metadata()
    parser.parse(inputStream, handler, metadata)
    handler.toString()
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("tika-1.19-test")
    val sc = new SparkContext(sparkConf)
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    println("HELLO!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
    val urls = List("http://www.pdf995.com/samples/pdf.pdf", "https://www.amd.com/en", "http://jeroen.github.io/images/testocr.png")
    val rdd = sc.parallelize(urls)
    // Fetch and parse each URL on the executors; count forces the evaluation.
    val parsed = rdd.map(x => tikaAutoDetectParser(openUrlStream(x, "").inputStream))
    println(parsed.count)
  }
}
pom.xml (builds uber-jar):
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>tikaTest</artifactId>
<version>1.19</version>
<name>${project.artifactId}</name>
<description>Testing tika 1.19 with CDH 6 and 5.x, Spark 2.x, Scala 2.11.x</description>
<inceptionYear>2018</inceptionYear>
<licenses>
<license>
<name>My License</name>
<url>http://....</url>
<distribution>repo</distribution>
</license>
</licenses>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<profiles>
<profile>
<id>scala-2.11.12</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<scalaVersion>2.11.12</scalaVersion>
<scalaBinaryVersion>2.11</scalaBinaryVersion>
</properties>
<dependencies>
<!-- ************************************************************************** -->
<!-- GOOD DEPENDENCIES +++++++++++++++++++++++++++++++++++++ -->
<!-- ************************************************************************** -->
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.18</version>
</dependency>
<!-- *************** CDH flavored dependencies ***********************************************-->
<!-- https://www.cloudera.com/documentation/spark2/latest/topics/spark2_packaging.html#versions -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0.cloudera3</version>
<!-- have tried scope provided / compile -->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0.cloudera3</version>
<!-- have tried scope provided / compile -->
<!--<scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.tika/tika-core -->
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-core</artifactId>
<version>1.19</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.tika/tika-parsers -->
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-parsers</artifactId>
<version>1.19</version>
</dependency>
<!-- https://mvnrepository.com/artifact/javax.ws.rs/javax.ws.rs-api -->
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>
<!-- **************************************************************************************************************************
**************************** alternative dependencies that have been tried and yield same Tika error***************************
*******************************************************************************************************************************-->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<!--
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
-->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<!--
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>
-->
</dependencies>
</profile>
</profiles>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<args>
<!-- work-around for https://issues.scala-lang.org/browse/SI-8358 -->
<arg>-nobootcp</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<finalName>uber-${project.artifactId}-${project.version}</finalName>
</configuration>
</plugin>
</plugins>
</build>
</project>
mvn dependency tree:
$ mvn dependency:tree -Ddetail=true | grep compress
[INFO] +- org.apache.commons:commons-compress:jar:1.18:compile
[INFO] | +- com.ning:compress-lzf:jar:1.0.3:compile
$ mvn dependency:tree -Ddetail=true | grep commons
[INFO] +- org.apache.commons:commons-compress:jar:1.18:compile
[INFO] | | | \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO] | | | +- commons-cli:commons-cli:jar:1.2:compile
[INFO] | | | +- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] | | | +- commons-configuration:commons-configuration:jar:1.6:compile
[INFO] | | | | +- commons-digester:commons-digester:jar:1.8:compile
[INFO] | | | | | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO] | | | | \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
[INFO] | +- org.apache.commons:commons-lang3:jar:3.5:compile
[INFO] | +- org.apache.commons:commons-math3:jar:3.4.1:compile
[INFO] | +- commons-net:commons-net:jar:2.2:compile
[INFO] | +- org.apache.commons:commons-crypto:jar:1.0.0:compile
[INFO] | | +- org.codehaus.janino:commons-compiler:jar:3.0.8:compile
[INFO] | | \- commons-lang:commons-lang:jar:2.6:compile
[INFO] | +- commons-codec:commons-codec:jar:1.11:compile
[INFO] | | \- org.apache.commons:commons-collections4:jar:4.2:compile
[INFO] | +- org.apache.commons:commons-exec:jar:1.3:compile
[INFO] | +- commons-io:commons-io:jar:2.6:compile
[INFO] | +- org.apache.commons:commons-csv:jar:1.5:compile
This is the result of a dependency conflict. It is not an internal conflict within your jar file; it is a conflict with Apache Spark. Spark 2.x distributions include old versions of commons-compress, while the Tika library depends on version 1.18 of the commons-compress library.
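One way to check this on a given machine is to list the commons-compress jars that ship with the Spark installation. The following is only a minimal sketch: `list_compress_jars` and `compress_jar_version` are hypothetical helper names, and the directory to scan is an assumption (for a plain Spark 2.x install it would typically be `$SPARK_HOME/jars`; CDH parcels use a different layout).

```shell
#!/bin/sh
# Hypothetical helper: print every commons-compress jar found below a directory.
# The directory is an assumption; pass wherever your Spark jars actually live.
list_compress_jars() {
    dir="$1"
    find "$dir" -type f -name 'commons-compress-*.jar' 2>/dev/null | sort
}

# Hypothetical helper: extract the version from a file name
# such as commons-compress-1.4.1.jar.
compress_jar_version() {
    basename "$1" .jar | sed 's/^commons-compress-//'
}
```

Something like `list_compress_jars "$SPARK_HOME/jars"` followed by `compress_jar_version` on each result shows which version Spark puts on the classpath. Anything older than the 1.18 declared in the pom is a likely source of the NoSuchMethodError.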
Use the --driver-class-path argument in your spark-shell or spark-submit to point to the right version of the commons-compress library.
spark-submit \
--driver-class-path ~/.m2/repository/org/apache/commons/commons-compress/1.18/commons-compress-1.18.jar \
--class {your.main.class} \
....
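The command above only sets the driver's classpath. Here is a rough sketch of resolving the jar path once and failing early if it is missing, assuming the default local Maven repository layout. `compress_jar_path` and `compress_classpath_args` are hypothetical helpers. Adding `spark.executor.extraClassPath` for the executors is an assumption about what a cluster run might also need, not something confirmed in this thread.

```shell
#!/bin/sh
# Hypothetical helper: build the path of a commons-compress jar in a Maven repository.
# $1 = version (e.g. 1.18), $2 = repository root (defaults to ~/.m2/repository).
compress_jar_path() {
    version="$1"
    repo="${2:-$HOME/.m2/repository}"
    printf '%s/org/apache/commons/commons-compress/%s/commons-compress-%s.jar\n' \
        "$repo" "$version" "$version"
}

# Hypothetical helper: print spark-submit class-path arguments for that jar,
# or fail if the jar is missing so a typo does not silently fall back to Spark's copy.
compress_classpath_args() {
    jar="$(compress_jar_path "$@")"
    if [ ! -f "$jar" ]; then
        echo "commons-compress jar not found: $jar" >&2
        return 1
    fi
    printf '%s\n' "--driver-class-path $jar --conf spark.executor.extraClassPath=$jar"
}
```

It could be used as `spark-submit $(compress_classpath_args 1.18) --class com.example.App ./target/uber-tikaTest-1.19.jar`. The unquoted substitution relies on the path containing no spaces. On YARN, the executor path would also have to exist on every node.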
I was facing the exact same problem. I found the answer in this post.