Guava version while using spark-shell

I'm trying to use the spark-cassandra-connector via spark-shell on Dataproc, but I am unable to connect to my cluster. It appears there is a version mismatch: the classpath includes a much older Guava version pulled in from somewhere else, even when I specify the proper version at startup. I suspect this is caused by all the Hadoop dependencies put on the classpath by default.

Is there any way to have spark-shell use only the proper version of Guava, without getting rid of all the Hadoop-related jars that Dataproc includes?

Relevant Data:

Starting spark-shell, showing it resolving the proper version of Guava:

$ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3

:: loading settings :: url = jar:file:/usr/lib/spark/lib/spark-assembly-1.5.2-hadoop2.7.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.datastax.spark#spark-cassandra-connector_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
        confs: [default]
        found com.datastax.spark#spark-cassandra-connector_2.10;1.5.0-M3 in central
        found org.apache.cassandra#cassandra-clientutil;2.2.2 in central
        found com.datastax.cassandra#cassandra-driver-core;3.0.0-alpha4 in central
        found io.netty#netty-handler;4.0.27.Final in central
        found io.netty#netty-buffer;4.0.27.Final in central
        found io.netty#netty-common;4.0.27.Final in central
        found io.netty#netty-transport;4.0.27.Final in central
        found io.netty#netty-codec;4.0.27.Final in central
        found com.codahale.metrics#metrics-core;3.0.2 in central
        found org.slf4j#slf4j-api;1.7.5 in central
        found org.apache.commons#commons-lang3;3.3.2 in central
        found com.google.guava#guava;16.0.1 in central
        found org.joda#joda-convert;1.2 in central
        found joda-time#joda-time;2.3 in central
        found com.twitter#jsr166e;1.1.0 in central
        found org.scala-lang#scala-reflect;2.10.5 in central
:: resolution report :: resolve 502ms :: artifacts dl 10ms
        :: modules in use:
        com.codahale.metrics#metrics-core;3.0.2 from central in [default]
        com.datastax.cassandra#cassandra-driver-core;3.0.0-alpha4 from central in [default]
        com.datastax.spark#spark-cassandra-connector_2.10;1.5.0-M3 from central in [default]
        com.google.guava#guava;16.0.1 from central in [default]
        com.twitter#jsr166e;1.1.0 from central in [default]
        io.netty#netty-buffer;4.0.27.Final from central in [default]
        io.netty#netty-codec;4.0.27.Final from central in [default]
        io.netty#netty-common;4.0.27.Final from central in [default]
        io.netty#netty-handler;4.0.27.Final from central in [default]
        io.netty#netty-transport;4.0.27.Final from central in [default]
        joda-time#joda-time;2.3 from central in [default]
        org.apache.cassandra#cassandra-clientutil;2.2.2 from central in [default]
        org.apache.commons#commons-lang3;3.3.2 from central in [default]
        org.joda#joda-convert;1.2 from central in [default]
        org.scala-lang#scala-reflect;2.10.5 from central in [default]
        org.slf4j#slf4j-api;1.7.5 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   16  |   0   |   0   |   0   ||   16  |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
        confs: [default]
        0 artifacts copied, 16 already retrieved (0kB/12ms)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.2
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.8.0_66-internal)
Type in expressions to have them evaluated.
Type :help for more information.
15/12/10 17:38:46 WARN org.apache.spark.metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
Spark context available as sc.

ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
15/12/10 17:38:54 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/12/10 17:38:54 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
SQL context available as sqlContext.
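
For context, the failure is triggered by a minimal read against the cluster, roughly like the following (the keyspace and table names are placeholders for my actual ones; the connection host is set to the node shown in the trace, 10.240.0.7):

import com.datastax.spark.connector._

// Placeholder keyspace/table names; sc is the SparkContext from the shell.
val rdd = sc.cassandraTable("my_keyspace", "my_table")
rdd.count()  // fails with the exception below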

Stack Trace when doing initial connection:

java.io.IOException: Failed to open native connection to Cassandra at {10.240.0.7}:9042
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:162)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
    at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
    at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:249)
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:51)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:146)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:143)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1125)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
    at $iwC$$iwC$$iwC.<init>(<console>:53)
    at $iwC$$iwC.<init>(<console>:55)
    at $iwC.<init>(<console>:57)
    at <init>(<console>:59)
    at .<init>(<console>:63)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$pasteCommand(SparkILoop.scala:825)
    at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345)
    at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345)
    at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
    at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
    at scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:809)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurrent.Futures.withFallback(Lcom/google/common/util/concurrent/ListenableFuture;Lcom/google/common/util/concurrent/FutureFallback;Ljava/util/concurrent/Executor;)Lcom/google/common/util/concurrent/ListenableFuture;
        at com.datastax.driver.core.Connection.initAsync(Connection.java:178)
        at com.datastax.driver.core.Connection$Factory.open(Connection.java:742)
        at com.datastax.driver.core.ControlConnection.tryConnect(ControlConnection.java:240)
        at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:187)
        at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
        at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1393)
        at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:402)
        at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:155)
        ... 70 more
asked Dec 10 '15 by SikhNerd


1 Answer

Unfortunately, Hadoop's dependency on Guava 11 (which lacks the Futures.withFallback method named in the stack trace) is a longstanding issue, and indeed Hadoop 2.7.1 still depends on Guava 11.
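
One quick way to confirm which Guava copy actually wins is to ask the loaded class where it came from. A diagnostic sketch you can run in the same spark-shell session:

// Print the jar the conflicting class was actually loaded from. If this
// shows a Hadoop lib directory rather than the ivy cache entry for
// guava-16.0.1, the older Guava is shadowing the version you requested.
println(classOf[com.google.common.util.concurrent.Futures]
  .getProtectionDomain.getCodeSource.getLocation)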

Spark core itself depends on Guava 14, as its build files show, but this is worked around by shading Guava inside the Spark assembly:

$ jar tf /usr/lib/spark/lib/spark-assembly.jar | grep concurrent.Futures
org/spark-project/guava/util/concurrent/Futures$1.class
org/spark-project/guava/util/concurrent/Futures$2.class
org/spark-project/guava/util/concurrent/Futures$3.class
org/spark-project/guava/util/concurrent/Futures$4.class
org/spark-project/guava/util/concurrent/Futures$5.class
org/spark-project/guava/util/concurrent/Futures$6.class
org/spark-project/guava/util/concurrent/Futures$ChainingListenableFuture$1.class
org/spark-project/guava/util/concurrent/Futures$ChainingListenableFuture.class
org/spark-project/guava/util/concurrent/Futures$CombinedFuture$1.class
org/spark-project/guava/util/concurrent/Futures$CombinedFuture$2.class
org/spark-project/guava/util/concurrent/Futures$CombinedFuture.class
org/spark-project/guava/util/concurrent/Futures$FallbackFuture$1$1.class
org/spark-project/guava/util/concurrent/Futures$FallbackFuture$1.class
org/spark-project/guava/util/concurrent/Futures$FallbackFuture.class
org/spark-project/guava/util/concurrent/Futures$FutureCombiner.class
org/spark-project/guava/util/concurrent/Futures$ImmediateCancelledFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateFailedCheckedFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateFailedFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateSuccessfulCheckedFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateSuccessfulFuture.class
org/spark-project/guava/util/concurrent/Futures$MappingCheckedFuture.class
org/spark-project/guava/util/concurrent/Futures.class

$ javap -cp /usr/lib/spark/lib/spark-assembly.jar org.spark-project.guava.util.concurrent.Futures
Compiled from "Futures.java"
public final class org.spark-project.guava.util.concurrent.Futures {
  public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> makeChecked(org.spark-project.guava.util.concurrent.ListenableFuture<V>, com.google.common.base.Function<java.lang.Exception, X>);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateFuture(V);
  public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> immediateCheckedFuture(V);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateFailedFuture(java.lang.Throwable);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateCancelledFuture();
  public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> immediateFailedCheckedFuture(X);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> withFallback(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>, org.spark-project.guava.util.concurrent.FutureFallback<? extends V>);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> withFallback(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>, org.spark-project.guava.util.concurrent.FutureFallback<? extends V>, java.util.concurrent.Executor);
  public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, org.spark-project.guava.util.concurrent.AsyncFunction<? super I, ? extends O>);
  public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, org.spark-project.guava.util.concurrent.AsyncFunction<? super I, ? extends O>, java.util.concurrent.Executor);
  public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, com.google.common.base.Function<? super I, ? extends O>);
  public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, com.google.common.base.Function<? super I, ? extends O>, java.util.concurrent.Executor);
  public static <I, O> java.util.concurrent.Future<O> lazyTransform(java.util.concurrent.Future<I>, com.google.common.base.Function<? super I, ? extends O>);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> dereference(org.spark-project.guava.util.concurrent.ListenableFuture<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> allAsList(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>...);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> allAsList(java.lang.Iterable<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> successfulAsList(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>...);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> successfulAsList(java.lang.Iterable<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>);
  public static <V> void addCallback(org.spark-project.guava.util.concurrent.ListenableFuture<V>, org.spark-project.guava.util.concurrent.FutureCallback<? super V>);
  public static <V> void addCallback(org.spark-project.guava.util.concurrent.ListenableFuture<V>, org.spark-project.guava.util.concurrent.FutureCallback<? super V>, java.util.concurrent.Executor);
  public static <V, X extends java.lang.Exception> V get(java.util.concurrent.Future<V>, java.lang.Class<X>) throws X;
  public static <V, X extends java.lang.Exception> V get(java.util.concurrent.Future<V>, long, java.util.concurrent.TimeUnit, java.lang.Class<X>) throws X;
  public static <V> V getUnchecked(java.util.concurrent.Future<V>);
  static {};
}

You can follow the instructions at https://arjon.es/2015/making-hadoop-2.6-spark-cassandra-driver-play-nice-together/ to do the same shading yourself when compiling your own application. For spark-shell, you may be able to get away with prepending the newer Guava via spark.driver.extraClassPath, though collisions may then continue to surface at other points; both approaches are sketched below.
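
If you build your own assembly, a relocation rule keeps your Guava from colliding with Hadoop's. A minimal build.sbt fragment, assuming sbt-assembly 0.14+ (the shaded package prefix "myshaded.guava" is arbitrary):

// Relocate Guava classes inside the fat jar so Hadoop's unshaded
// Guava 11 on the cluster classpath cannot shadow them.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "myshaded.guava.@1").inAll
)

For the spark-shell route, the idea is to prepend the newer Guava jar so the driver class loader finds it before Hadoop's copy; the jar path below is a placeholder for wherever guava-16.0.1.jar lives on your machine:

$ spark-shell \
    --conf spark.driver.extraClassPath=/path/to/guava-16.0.1.jar \
    --conf spark.executor.extraClassPath=/path/to/guava-16.0.1.jar \
    --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3

Since extraClassPath entries are prepended to the JVM classpath, Guava 16's Futures should win, but anything in Hadoop that relies on APIs removed after Guava 11 could then break instead.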

answered by Dennis Huo