Picture the following scenario: a Spark application (Java implementation) uses a Cassandra database to load the data, convert it to an RDD, and process it. The application also streams new data from the database, which is processed by a custom receiver. The output of the streaming process is stored back in the database. The implementation uses Spring Data Cassandra for the integration with the database.
CassandraConfig:
@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {

    @Autowired
    private Environment env;

    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));
        return cluster;
    }

    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }

    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }

    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);
        return session;
    }

    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }
}
DataProcessor.main method:
// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);
while (pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
    // Parallelize page and add to the existing
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}
// data processing
...
A large amount of data is expected for the initial load. For this reason the data is paginated, loaded page by page, and distributed into rddBuffer.
There are also a few other options available. I would like to know what the best practice is for integrating Spark with Cassandra. What would be the best option to follow in my implementation?
Apache Spark 1.0.0, Apache Cassandra 2.0.8
To connect Spark to a Cassandra cluster, a Cassandra connector needs to be added to the Spark project. DataStax provides its own Cassandra connector on GitHub, and that is what we will use here. Building the connector outputs the compiled jar files to the directory named "target": there will be two jar files, one for Scala and one for Java.
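As an alternative to building the jars yourself, the connector can also be pulled in as a regular dependency. A minimal sbt sketch, assuming the 1.0.x connector artifacts that match Spark 1.0 and Scala 2.10 (adjust the versions to your setup):

// build.sbt -- version numbers are illustrative; pick the connector release matching your Spark/Scala versions
libraryDependencies ++= Seq(
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.0.0",
  "com.datastax.spark" %% "spark-cassandra-connector-java" % "1.0.0"
)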
How does it work? The fundamental idea is quite simple: Spark and Cassandra clusters are deployed to the same set of machines. Cassandra stores the data; Spark worker nodes are co-located with Cassandra and do the data processing. Spark is a batch-processing system, designed to deal with large amounts of data.
There are two methods we can use to load data from Cassandra into Spark and run transformations on it. The first method is the load() method, and the second is using a catalog; see the sketch below.
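A minimal sketch of both, with made-up keyspace and table names, and assuming a much newer Spark/connector combination than the Spark 1.0.0 mentioned above (the DataFrame load() API and Cassandra catalogs are not available there):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.cassandra.connection.host", "127.0.0.1")
  .getOrCreate()

// Method 1: the load() method of the Cassandra data source
val events = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_keyspace", "table" -> "event"))
  .load()

// Method 2: register a Cassandra catalog and read tables through it
spark.conf.set("spark.sql.catalog.cass", "com.datastax.spark.connector.datasource.CassandraCatalog")
val eventsFromCatalog = spark.read.table("cass.my_keyspace.event")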
Regardless of where you run your workloads, there are two approaches you can use to integrate Spark and Cassandra: have a separate cluster for each tool, or run them in the same cluster, which is the main focus of this article.
The easiest way to work with Cassandra and Spark is to use the official open source Cassandra driver for Spark developed by DataStax: https://github.com/datastax/spark-cassandra-connector
This driver has been built on top of the Cassandra Java Driver and provides a direct bridge between Cassandra and Spark. Unlike Calliope, it does not use the Hadoop interface, and it offers a number of unique features of its own.
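A minimal sketch of what working with the connector looks like (keyspace, table, and column names are made up for illustration):

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

// Point the connector at the Cassandra cluster
val conf = new SparkConf()
  .setAppName("test-spark")
  .set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)

// The whole table becomes a distributed RDD; no manual paging is required
val events = sc.cassandraTable("my_keyspace", "event")

// Results can be written back just as easily
events.map(row => (row.getString("event_type"), 1L))
  .reduceByKey(_ + _)
  .saveToCassandra("my_keyspace", "event_counts", SomeColumns("event_type", "count"))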
The approach in the code above is a classical centralized algorithm that would work only if executed in one node. Both Cassandra and Spark are distributed systems and therefore it's necessary to model the process in such a way that it can be distributed among a number of nodes.
There are a few approaches possible. If you know the keys of the rows to fetch, you could do something simple like this (using the DataStax Java Driver):
val data = sparkContext.parallelize(keys).map { key =>
  // Connect to C* inside the closure, so the connection is created on the worker executing the task
  val cluster = Cluster.builder.addContactPoint(host).build()
  val session = cluster.connect(keyspace)
  val statement = session.prepare("...cql...")
  val boundStatement = new BoundStatement(statement)
  session.execute(boundStatement.bind(...data...))
}
This will effectively distribute the fetching of keys across the Spark cluster. Note how the connection to C* is done within the closure, as this ensures that the connection is established when the task is executed on each separate, distributed worker.
Given that your example uses a wildcard (i.e. the keys are not known), using the Hadoop interface of Cassandra is a good option. The Spark-Cassandra example linked in the question illustrates the use of this Hadoop interface on Cassandra.
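As a rough sketch of that Hadoop-interface approach on Cassandra 2.0 (host, port, keyspace, and table names are placeholders; CqlPagingInputFormat and the ConfigHelper classes ship with Cassandra):

import java.nio.ByteBuffer
import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.cql3.{CqlConfigHelper, CqlPagingInputFormat}
import org.apache.hadoop.mapreduce.Job

val job = new Job()
ConfigHelper.setInputInitialAddress(job.getConfiguration, "127.0.0.1")
ConfigHelper.setInputRpcPort(job.getConfiguration, "9160")
ConfigHelper.setInputColumnFamily(job.getConfiguration, "my_keyspace", "event")
ConfigHelper.setInputPartitioner(job.getConfiguration, "Murmur3Partitioner")
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration, "1000")

// Each Cassandra token range becomes a Hadoop split, so the read is distributed across the workers
val casRdd = sc.newAPIHadoopRDD(
  job.getConfiguration,
  classOf[CqlPagingInputFormat],
  classOf[java.util.Map[String, ByteBuffer]],
  classOf[java.util.Map[String, ByteBuffer]])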
Calliope is a library that encapsulates the complexity of using the Hadoop interface by providing a simple API to access that functionality. It's available only in Scala, as it uses specific Scala features (like implicits, and macros in the upcoming release). With Calliope, you basically declare how to convert your RDD[type] into a row key and row value, and Calliope takes care of configuring the Hadoop interfaces to do the job. We have found that Calliope (and the underlying Hadoop interfaces) is 2-4x faster than using a driver to interact with Cassandra.
Conclusion: I'd walk away from the Spring Data configuration for accessing Cassandra, as it will limit you to a single node. Consider simple parallelized access if possible, or explore using Calliope in Scala.