I'm using Scala 2.10.5, Cassandra 3.0, and Spark 1.6. I want to insert data into Cassandra, so I tried out the basic example:
scala> import com.datastax.spark.connector._
scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
This works and inserts the data into Cassandra. Now I have a CSV file that I want to insert into a Cassandra table, matching its schema:
val person = sc.textFile("hdfs://localhost:9000/user/hduser/person")
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val schema = StructType(Array(StructField("firstName", StringType, true), StructField("lastName", StringType, true), StructField("age", IntegerType, true)))
val rowRDD = person.map(_.split(",")).map(p => org.apache.spark.sql.Row(p(0),p(1),p(2).toInt))
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema)
personSchemaRDD.saveToCassandra
When I call saveToCassandra I get an error saying that saveToCassandra is not a member of personSchemaRDD, so I thought of trying a different way:
df.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "words_copy", "keyspace" -> "test")).save()
But I get a "cannot connect to Cassandra on ip:port" error. Can anyone tell me the best way to do this? I need to periodically save data to Cassandra from files.
There are two methods we can use to load data into Spark from Cassandra and transform it: the DataFrame load() method, and the RDD API that the connector adds to the SparkContext (sc.cassandraTable). For example, see the sketch below.
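A minimal sketch of both read paths, assuming the test.words table created above and a shell launched with the connector on the classpath:

import com.datastax.spark.connector._

// RDD API: rows come back as CassandraRow objects.
val wordsRDD = sc.cassandraTable("test", "words")

// DataFrame API (Spark 1.6): load through the connector's data source.
val wordsDF = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "words", "keyspace" -> "test"))
  .load()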
To connect Spark to a Cassandra cluster, the Spark Cassandra Connector needs to be added to the Spark project. DataStax provides the connector on GitHub; building it outputs compiled jar files to the directory named "target", one for Scala and one for Java.
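Alternatively, instead of building the jars yourself, the connector can be pulled in as a package when launching the shell. For Spark 1.6 with Scala 2.10 the matching artifact is spark-cassandra-connector_2.10:1.6.0 (the exact version here is an assumption; pick the release matching your Spark version):

spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.6.0 \
  --conf spark.cassandra.connection.host=127.0.0.1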
How does it work? The fundamental idea is quite simple: Spark and Cassandra clusters are deployed to the same set of machines. Cassandra stores the data; Spark worker nodes are co-located with Cassandra and do the data processing. Spark is a batch-processing system, designed to deal with large amounts of data.
sqlContext.applySchema(...) returns a DataFrame, and a DataFrame does not have a saveToCassandra method. You can use the .write method with it instead:
val personDF = sqlContext.applySchema(rowRDD, schema)
personDF.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "words_copy", "keyspace" -> "test")).save()
If you want to use the saveToCassandra method, the best way is to have a schema-aware RDD, using a case class:
case class Person(firstName: String, lastName: String, age: Int)
val rowRDD = person.map(_.split(",")).map(p => Person(p(0), p(1), p(2).toInt))
rowRDD.saveToCassandra(keyspace, table)
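Note that the connector's default column mapper translates camelCase field names to underscored column names, so a field named firstName is expected to match a column named first_name; if your table uses different column names, either rename the case class fields or pass an explicit SomeColumns(...) selection.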
The DataFrame write method should work as well; check that you have configured your SparkContext correctly, in particular that spark.cassandra.connection.host points at a reachable Cassandra node.
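A minimal sketch of a correctly configured context (the 127.0.0.1 host is an assumption; point it at one of your Cassandra nodes):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Tell the connector where Cassandra lives before creating the context.
val conf = new SparkConf()
  .setAppName("CsvToCassandra")
  .set("spark.cassandra.connection.host", "127.0.0.1") // assumption: local node
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)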