Connecting/Integrating Cassandra with Spark (pyspark)

I'm desperately trying to connect Cassandra to pyspark, but I cannot get it to work. I'm quite new to Spark and Cassandra, so I might be missing something rather simple.

I'm a bit confused by all the different explanations online; however, from what I understand, the easiest way would be to use "Spark packages" (http://spark-packages.org/package/TargetHolding/pyspark-cassandra)?

So, with the following command:

./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.11:1.5.0-M2 ../Main/Code/myPysparkFile.py

Am I right in my understanding that I do not need to download any packages myself if I use Spark packages as described above?

In myPysparkFile.py I tried the following two versions, neither of which is working for me:

Version 1, which I got from slide 14 of http://www.slideshare.net/JonHaddad/intro-to-py-spark-and-cassandra:

"SparkCassandraTest.py"
from pyspark import SparkContext, SparkConf
from pyspark_cassandra import CassandraSparkContext,Row

conf = SparkConf()
conf.setMaster("local[4]")
conf.setAppName("Spark Cassandra")
conf.set("spark.cassandra.connection.host","http://127.0.0.1")

sc = CassandraSparkContext(conf=conf)

rdd = sc.cassandraTable("test", "words")

As an error I get:

ImportError: No module named pyspark_cassandra

Version 2 (which is inspired by: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md):

"SparkCassandraTest.py"
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

conf = SparkConf()
conf.setMaster("local[4]")
conf.setAppName("Spark Cassandra")
conf.set("spark.cassandra.connection.host","http://127.0.0.1")

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

sqlContext.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table="kv", keyspace="test")\
    .load().show()

which gives me the following error:

    py4j.protocol.Py4JJavaError: An error occurred while calling o28.load.
: java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
    at org.apache.spark.sql.cassandra.DefaultSource$.<init>(DefaultSource.scala:138)
    at org.apache.spark.sql.cassandra.DefaultSource$.<clinit>(DefaultSource.scala)
    at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:56)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

I really don't know what I'm doing wrong and would appreciate any help. Also, what is the difference between using Version 1 and Version 2? Are there any advantages or disadvantages to either?

Also, any further references on how to best integrate and use Spark with Cassandra would be highly appreciated.

Btw, Cassandra is running on my PC with the basic configuration on port 7000.

Thanks.

asked Oct 29 '15 by Kito

1 Answer

pyspark_cassandra is a different package from the spark-cassandra-connector (SCC). It bundles a version of the SCC, but the two are not interchangeable: installing the SCC does not install pyspark_cassandra. The pyspark_cassandra package is required if you want to use sc.cassandraTable() from pyspark.
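For reference, launching with pyspark_cassandra means passing its spark-packages coordinate to --packages instead. A sketch of what that would look like (the version number here is an assumption; check spark-packages.org for the current release):

./bin/spark-submit --packages TargetHolding/pyspark-cassandra:0.1.5 ../Main/Code/myPysparkFile.py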

Installing the SCC does give you the ability to use DataFrames in pyspark, which is the most efficient way to work with C* from pyspark. This is what your V2 example does. The fact that it fails makes it seem like you did not launch V2 using the --packages option.
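Assuming the connector jar is on the classpath via --packages, a minimal sketch of the DataFrame route looks like your V2, with one correction: spark.cassandra.connection.host expects a bare hostname or IP, not an http:// URL.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

conf = SparkConf()
conf.setMaster("local[4]")
conf.setAppName("Spark Cassandra")
# Bare IP, no "http://" prefix: the connector speaks Cassandra's
# native protocol (port 9042 by default), not HTTP.
conf.set("spark.cassandra.connection.host", "127.0.0.1")

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Read the test.kv table as a DataFrame and print it.
sqlContext.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="kv", keyspace="test") \
    .load().show()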

The reason it may be failing is that you are specifying the Scala 2.11 version of the library here:

./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.11:1.5.0-M2 ../Main/Code/myPysparkFile.py

while you are most likely not running a Scala 2.11 build of Spark (the default Spark download is built against Scala 2.10).
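So, assuming a stock Spark download built against Scala 2.10, a matching submit line would look something like this (note the _2.10 suffix):

./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M2 ../Main/Code/myPysparkFile.py

The connector artifact's Scala suffix (_2.10 vs _2.11) has to match the Scala version your Spark build was compiled against.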

answered Nov 10 '22 by RussS