 

Inserting Data Into a Cassandra Table Using a Spark DataFrame

I'm using Scala 2.10.5, Cassandra 3.0, and Spark 1.6. I want to insert data into Cassandra, so I tried the basic example:

scala> import com.datastax.spark.connector._
scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

This works and inserts the data into Cassandra. Next I have a CSV file that I want to insert into a Cassandra table by matching its schema:

val person = sc.textFile("hdfs://localhost:9000/user/hduser/person")
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val schema = StructType(Array(
  StructField("firstName", StringType, true),
  StructField("lastName", StringType, true),
  StructField("age", IntegerType, true)))
val rowRDD = person.map(_.split(",")).map(p => Row(p(0), p(1), p(2).toInt))
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema)
personSchemaRDD.saveToCassandra

When I call saveToCassandra I get an error saying saveToCassandra is not a member of personSchemaRDD, so I thought of trying a different way:

df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "words_copy", "keyspace" -> "test"))
  .save()

But now I get "cannot connect to Cassandra on ip:port". Can anyone tell me the best way to do this? I need to periodically save data to Cassandra from files.

asked Dec 20 '16 by Anji

People also ask

How do you load data from Spark to Cassandra?

There are two methods we can use to load data from Cassandra into Spark and run transformations on it: the load() method, and using a catalog.
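For illustration, a minimal sketch of the load() path in the asker's Spark 1.6 setup (the keyspace "test" and table "words" come from the question; this assumes the Spark Cassandra Connector is on the classpath):

val wordsDF = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "words", "keyspace" -> "test"))
  .load()
// wordsDF is a regular DataFrame, so transformations apply as usual
wordsDF.filter(wordsDF("count") > 30).show()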

Can Spark be used with Cassandra?

To connect Spark to a Cassandra cluster, the Cassandra Connector needs to be added to the Spark project. DataStax provides its own Cassandra Connector on GitHub, and we will use that. Building it outputs compiled jar files to a directory named "target": one jar for Scala and one for Java.
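Alternatively, with sbt the connector can be pulled in as a dependency instead of building the jars by hand. A hedged sketch; the version is an assumption and must match your Spark and Scala versions (1.6.x for Spark 1.6):

// build.sbt
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0"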

How does Spark work with Cassandra?

How does it work? The fundamental idea is quite simple: Spark and Cassandra clusters are deployed to the same set of machines. Cassandra stores the data; Spark worker nodes are co-located with Cassandra and do the data processing. Spark is a batch-processing system, designed to deal with large amounts of data.
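This co-location is what the connector's RDD API exposes. A small sketch of reading a table as an RDD, reusing the keyspace and table from the question's example:

import com.datastax.spark.connector._

// The connector splits the table into Spark partitions aligned with
// Cassandra token ranges, so each worker reads mostly local data.
val wordsRDD = sc.cassandraTable("test", "words")
wordsRDD.map(row => (row.getString("word"), row.getInt("count"))).collect()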

What is Spark in Cassandra?

Spark is the world's foremost distributed analytics platform, delivering in-memory analytics with a speed and ease of use unheard of in Hadoop. Cassandra is the lightning-fast distributed database powering IT giants such as Outbrain and Netflix.


1 Answer

sqlContext.applySchema(...) returns a DataFrame and a DataFrame does not have the saveToCassandra method.

You could use the .write method with it:

val personDF = sqlContext.applySchema(rowRDD, schema)
personDF.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "words_copy", "keyspace" -> "test"))
  .save()
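As a side note, applySchema has been deprecated since Spark 1.3 in favor of createDataFrame, which takes the same arguments:

val personDF = sqlContext.createDataFrame(rowRDD, schema)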

If we want to use the saveToCassandra method, the best way is to have a schema-aware RDD, using a case class:

import com.datastax.spark.connector._

case class Person(firstName: String, lastName: String, age: Int)
val rowRDD = person.map(_.split(",")).map(p => Person(p(0), p(1), p(2).toInt))
rowRDD.saveToCassandra(keyspace, table)

The DataFrame write method should also work. Check that you have configured your context correctly; the "cannot connect" error usually means spark.cassandra.connection.host does not point at a reachable Cassandra node.
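For reference, a minimal configuration sketch, assuming Cassandra is reachable at 127.0.0.1 (a placeholder; replace with your node's address):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// spark.cassandra.connection.host must point at a live Cassandra node;
// 127.0.0.1 here is a placeholder.
val conf = new SparkConf()
  .setAppName("csv-to-cassandra")
  .set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)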

answered Sep 20 '22 by maasg