I'm trying to use the spark-cassandra-connector via spark-shell on Dataproc, but I am unable to connect to my cluster. There appears to be a version mismatch: the classpath includes a much older Guava version from somewhere else, even when I specify the proper version on startup. I suspect this is caused by all the Hadoop dependencies put into the classpath by default.
Is there any way to have spark-shell use only the proper version of Guava, without getting rid of all the Hadoop-related jars that Dataproc includes?
Relevant Data:
Starting spark-shell, showing it having the proper version of Guava:
$ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3
:: loading settings :: url = jar:file:/usr/lib/spark/lib/spark-assembly-1.5.2-hadoop2.7.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.datastax.spark#spark-cassandra-connector_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found com.datastax.spark#spark-cassandra-connector_2.10;1.5.0-M3 in central
found org.apache.cassandra#cassandra-clientutil;2.2.2 in central
found com.datastax.cassandra#cassandra-driver-core;3.0.0-alpha4 in central
found io.netty#netty-handler;4.0.27.Final in central
found io.netty#netty-buffer;4.0.27.Final in central
found io.netty#netty-common;4.0.27.Final in central
found io.netty#netty-transport;4.0.27.Final in central
found io.netty#netty-codec;4.0.27.Final in central
found com.codahale.metrics#metrics-core;3.0.2 in central
found org.slf4j#slf4j-api;1.7.5 in central
found org.apache.commons#commons-lang3;3.3.2 in central
found com.google.guava#guava;16.0.1 in central
found org.joda#joda-convert;1.2 in central
found joda-time#joda-time;2.3 in central
found com.twitter#jsr166e;1.1.0 in central
found org.scala-lang#scala-reflect;2.10.5 in central
:: resolution report :: resolve 502ms :: artifacts dl 10ms
:: modules in use:
com.codahale.metrics#metrics-core;3.0.2 from central in [default]
com.datastax.cassandra#cassandra-driver-core;3.0.0-alpha4 from central in [default]
com.datastax.spark#spark-cassandra-connector_2.10;1.5.0-M3 from central in [default]
com.google.guava#guava;16.0.1 from central in [default]
com.twitter#jsr166e;1.1.0 from central in [default]
io.netty#netty-buffer;4.0.27.Final from central in [default]
io.netty#netty-codec;4.0.27.Final from central in [default]
io.netty#netty-common;4.0.27.Final from central in [default]
io.netty#netty-handler;4.0.27.Final from central in [default]
io.netty#netty-transport;4.0.27.Final from central in [default]
joda-time#joda-time;2.3 from central in [default]
org.apache.cassandra#cassandra-clientutil;2.2.2 from central in [default]
org.apache.commons#commons-lang3;3.3.2 from central in [default]
org.joda#joda-convert;1.2 from central in [default]
org.scala-lang#scala-reflect;2.10.5 from central in [default]
org.slf4j#slf4j-api;1.7.5 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 16 | 0 | 0 | 0 || 16 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 16 already retrieved (0kB/12ms)
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.5.2
/_/
Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.8.0_66-internal)
Type in expressions to have them evaluated.
Type :help for more information.
15/12/10 17:38:46 WARN org.apache.spark.metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
Spark context available as sc.
ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
15/12/10 17:38:54 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/12/10 17:38:54 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
SQL context available as sqlContext.
Stack Trace when doing initial connection:
java.io.IOException: Failed to open native connection to Cassandra at {10.240.0.7}:9042
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:162)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:249)
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:51)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:59)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:59)
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:146)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:143)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
at org.apache.spark.rdd.RDD.count(RDD.scala:1125)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
at $iwC$$iwC$$iwC.<init>(<console>:53)
at $iwC$$iwC.<init>(<console>:55)
at $iwC.<init>(<console>:57)
at <init>(<console>:59)
at .<init>(<console>:63)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$pasteCommand(SparkILoop.scala:825)
at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345)
at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345)
at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
at scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:809)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurrent.Futures.withFallback(Lcom/google/common/util/concurrent/ListenableFuture;Lcom/google/common/util/concurrent/FutureFallback;Ljava/util/concurrent/Executor;)Lcom/google/common/util/concurrent/ListenableFuture;
at com.datastax.driver.core.Connection.initAsync(Connection.java:178)
at com.datastax.driver.core.Connection$Factory.open(Connection.java:742)
at com.datastax.driver.core.ControlConnection.tryConnect(ControlConnection.java:240)
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:187)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1393)
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:402)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:155)
... 70 more
Unfortunately, Hadoop's dependency on Guava 11 (which doesn't have the Futures.withFallback method mentioned) is a longstanding issue and indeed Hadoop 2.7.1 still depends on Guava 11.
Spark core uses Guava 14, but it works around the conflict by shading Guava inside the Spark assembly:
$ jar tf /usr/lib/spark/lib/spark-assembly.jar | grep concurrent.Futures
org/spark-project/guava/util/concurrent/Futures$1.class
org/spark-project/guava/util/concurrent/Futures$2.class
org/spark-project/guava/util/concurrent/Futures$3.class
org/spark-project/guava/util/concurrent/Futures$4.class
org/spark-project/guava/util/concurrent/Futures$5.class
org/spark-project/guava/util/concurrent/Futures$6.class
org/spark-project/guava/util/concurrent/Futures$ChainingListenableFuture$1.class
org/spark-project/guava/util/concurrent/Futures$ChainingListenableFuture.class
org/spark-project/guava/util/concurrent/Futures$CombinedFuture$1.class
org/spark-project/guava/util/concurrent/Futures$CombinedFuture$2.class
org/spark-project/guava/util/concurrent/Futures$CombinedFuture.class
org/spark-project/guava/util/concurrent/Futures$FallbackFuture$1$1.class
org/spark-project/guava/util/concurrent/Futures$FallbackFuture$1.class
org/spark-project/guava/util/concurrent/Futures$FallbackFuture.class
org/spark-project/guava/util/concurrent/Futures$FutureCombiner.class
org/spark-project/guava/util/concurrent/Futures$ImmediateCancelledFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateFailedCheckedFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateFailedFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateSuccessfulCheckedFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateSuccessfulFuture.class
org/spark-project/guava/util/concurrent/Futures$MappingCheckedFuture.class
org/spark-project/guava/util/concurrent/Futures.class
$ javap -cp /usr/lib/spark/lib/spark-assembly.jar org.spark-project.guava.util.concurrent.Futures
Compiled from "Futures.java"
public final class org.spark-project.guava.util.concurrent.Futures {
public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> makeChecked(org.spark-project.guava.util.concurrent.ListenableFuture<V>, com.google.common.base.Function<java.lang.Exception, X>);
public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateFuture(V);
public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> immediateCheckedFuture(V);
public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateFailedFuture(java.lang.Throwable);
public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateCancelledFuture();
public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> immediateFailedCheckedFuture(X);
public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> withFallback(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>, org.spark-project.guava.util.concurrent.FutureFallback<? extends V>);
public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> withFallback(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>, org.spark-project.guava.util.concurrent.FutureFallback<? extends V>, java.util.concurrent.Executor);
public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, org.spark-project.guava.util.concurrent.AsyncFunction<? super I, ? extends O>);
public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, org.spark-project.guava.util.concurrent.AsyncFunction<? super I, ? extends O>, java.util.concurrent.Executor);
public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, com.google.common.base.Function<? super I, ? extends O>);
public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, com.google.common.base.Function<? super I, ? extends O>, java.util.concurrent.Executor);
public static <I, O> java.util.concurrent.Future<O> lazyTransform(java.util.concurrent.Future<I>, com.google.common.base.Function<? super I, ? extends O>);
public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> dereference(org.spark-project.guava.util.concurrent.ListenableFuture<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>);
public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> allAsList(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>...);
public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> allAsList(java.lang.Iterable<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>);
public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> successfulAsList(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>...);
public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> successfulAsList(java.lang.Iterable<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>);
public static <V> void addCallback(org.spark-project.guava.util.concurrent.ListenableFuture<V>, org.spark-project.guava.util.concurrent.FutureCallback<? super V>);
public static <V> void addCallback(org.spark-project.guava.util.concurrent.ListenableFuture<V>, org.spark-project.guava.util.concurrent.FutureCallback<? super V>, java.util.concurrent.Executor);
public static <V, X extends java.lang.Exception> V get(java.util.concurrent.Future<V>, java.lang.Class<X>) throws X;
public static <V, X extends java.lang.Exception> V get(java.util.concurrent.Future<V>, long, java.util.concurrent.TimeUnit, java.lang.Class<X>) throws X;
public static <V> V getUnchecked(java.util.concurrent.Future<V>);
static {};
}
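To check whether a jar carries Guava's Futures class under its original package or under a relocated one, the same kind of `jar tf` listing can be filtered. This is only a rough sketch, assuming a POSIX shell with awk. The function name `guava_futures_location` is made up for illustration, and the match on `guava/util/concurrent/Futures.class` is a heuristic for relocated copies, not a guarantee:

```shell
#!/bin/sh
# Hypothetical helper: reads a class listing on stdin (one entry per line,
# as printed by `jar tf`) and reports where Guava's Futures class lives.
guava_futures_location() {
  awk '
    # Original, unrelocated Guava package.
    $0 == "com/google/common/util/concurrent/Futures.class" {
      print "unshaded: " $0
    }
    # Heuristic: any other path ending in guava/util/concurrent/Futures.class
    # is treated as a relocated (shaded) copy.
    $0 != "com/google/common/util/concurrent/Futures.class" &&
      /guava\/util\/concurrent\/Futures\.class$/ {
      print "relocated: " $0
    }
  '
}
```

It could be used as `jar tf /usr/lib/spark/lib/spark-assembly.jar | guava_futures_location`. A jar that reports an "unshaded" entry is a candidate for the copy that the Cassandra driver ends up linking against.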
You can follow the instructions at https://arjon.es/2015/making-hadoop-2.6-spark-cassandra-driver-play-nice-together/ to shade Guava yourself at build time. With spark-shell, you may be able to get away with some changes to spark.driver.extraClassPath, though collisions may then continue to arise at various points.
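As a sketch of the spark.driver.extraClassPath idea, the snippet below prints a spark-shell command line that puts the Guava 16.0.1 jar on the driver's extra classpath. It is an illustration under assumptions, not a verified fix: the jar path is a guess at where `--packages` caches the artifact, the function name `spark_shell_command` is hypothetical, and only the driver is covered, so executors may still hit the same error.

```shell
#!/bin/sh
# Assumption: --packages cached Guava 16.0.1 at this path; adjust it if your
# Ivy directory differs. Override by exporting GUAVA_JAR before running.
GUAVA_JAR="${GUAVA_JAR:-$HOME/.ivy2/jars/com.google.guava_guava-16.0.1.jar}"

# Hypothetical helper: prints the command instead of running it, so it can be
# reviewed (and the jar path checked) before starting the shell.
spark_shell_command() {
  guava_jar="$1"
  printf '%s %s %s\n' \
    "spark-shell" \
    "--packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3" \
    "--conf spark.driver.extraClassPath=$guava_jar"
}

spark_shell_command "$GUAVA_JAR"
```

If the printed command looks right and the jar exists at that path, it can be pasted into the terminal. Whether the newer Guava then breaks any Hadoop code that expects Guava 11 would need to be tested on the cluster.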