Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark with Cassandra input/output

Picture the following senario: A Spark application (Java implementation) is using Cassandra database to load, convert to RDD and process the data. Also the application is steaming new data from the database which are also processed by a custom receiver. The output of the streaming process is stored in the database. The implementation is using Spring Data Cassandra from the integration with the database.

CassandraConfig:

@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {

    @Autowired
    private Environment env;

    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));

        return cluster;
    }

    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }

    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }

    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);

        return session;
    }

    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }

}

DataProcessor.main method:

// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);

while(pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
    // Parallelize page and add to the existing
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}

// data processing
...

It is expected to have a big amount of data for the initial loading. For this reason the data are paginated, loaded and distributed in rddBuffer.

There are also the following options available:

  1. The Spark-Cassandra example (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala), although there is minimum amount of documentation for this example.
  2. The Calliope project (http://tuplejump.github.io/calliope/)

I would like to know what is the best practice for the integration of Spark with Cassandra. What would be the best option to follow in my implementation?

Apache Spark 1.0.0, Apache Cassandra 2.0.8

like image 842
Pantelis Papapoulias Avatar asked Jun 27 '14 11:06

Pantelis Papapoulias


People also ask

Can Spark be used with Cassandra?

To connect Spark to a Cassandra cluster, the Cassandra Connector will need to be added to the Spark project. DataStax provides their own Cassandra Connector on GitHub and we will use that. This should output compiled jar files to the directory named “target”. There will be two jar files, one for Scala and one for Java.

How does Spark work with Cassandra?

How does it work? The fundamental idea is quite simple: Spark and Cassandra clusters are deployed to the same set of machines. Cassandra stores the data; Spark worker nodes are co-located with Cassandra and do the data processing. Spark is a batch-processing system, designed to deal with large amounts of data.

How do you load data from Spark to Cassandra?

There are 2 methods we can use to load data into Spark from Cassandra and do transformations on. The first method would be the load() method, and the second method would be using a catalog.

Can Cassandra and Spark run on the same cluster?

Regardless where you run your workloads, you have two approaches that you can use to integrate Spark and Cassandra. You can have a cluster for each tool or runt them in the same cluster which is the main focus of this article.


2 Answers

The easiest way to work with Cassandra and Spark is to use the official open source Cassandra driver for Spark developed by DataStax: https://github.com/datastax/spark-cassandra-connector

This driver has been built on top of Cassandra Java Driver and provides a direct bridge between Cassandra and Spark. Unlike Calliope, it does not use the Hadoop interface. Additionally it offers the following unique features:

  • support for all Cassandra data types, including collections, out of the box
  • lightweight mapping of Cassandra rows to custom classes or tuples without the need to use any implicits or other advanced features in Scala
  • saving any RDDs to Cassandra
  • full support for Cassandra Virtual Nodes
  • ability to filter / select on the server side, e.g. leveraging Cassandra clustering columns or secondary indexes
like image 107
Piotr Kołaczkowski Avatar answered Oct 17 '22 08:10

Piotr Kołaczkowski


The approach in the code above is a classical centralized algorithm that would work only if executed in one node. Both Cassandra and Spark are distributed systems and therefore it's necessary to model the process in such a way that it can be distributed among a number of nodes.

There are few approaches possible: If you know the keys of the rows to fetch, you could do something simple like this: (using the DataStax Java Driver)

val data = sparkContext.parallelize(keys).map{key => 
   val cluster = val cluster = Cluster.builder.addContactPoint(host).build()
   val session  = cluster.connect(keyspace)
   val statement = session.prepare("...cql...);")
   val boundStatement = new BoundStatement(sttmt)
   session.execute(session.execute(boundStatement.bind(...data...)
}

This will effectively distribute the fetching of keys across the Spark Cluster. Note how the connection to C* is done within the closure as this ensures that the connection is established when the task is executed on each separate, distributed worker.

Given that your example uses a wildcard (i.e. the keys are not known), using the Hadoop interface of Cassandra is a good option. The Spark-Cassandra example linked in the question illustrates the use of this Hadoop interface on Cassandra.

Calliope is a library that encapsulates the complexity of using the Hadoop interface, by providing a simple API to access that functionality. It's available only in Scala, as it uses specific Scala features (like implicits and macros in the upcoming release) With Calliope, you basically declare how to convert your RDD[type] into a row key and row value, and Calliope takes care of configuring the hadoop interfaces to to the job. We have found that Calliope (and the underlying hadoop interfaces) are 2-4x faster than using a driver to interact with Cassandra.

Conclusion: I'd walk away from the Spring-Data configuration to access Cassandra, as this will limit you to a single node. Consider a simple parallelized access if possible or explore using Calliope in Scala.

like image 33
maasg Avatar answered Oct 17 '22 07:10

maasg