I have had issues with spark-cassandra-connector (1.0.4, 1.1.0) when writing batches of 9 million rows to a 12-node Cassandra (2.1.2) cluster. I was writing with consistency ALL and reading with consistency ONE, but the number of rows read was different from 9 million every time (8,865,753, 8,753,213, etc.).
I checked the code of the connector and found no issues. Then I decided to write my own application, independent of Spark and the connector, to investigate the problem (the only dependency is the DataStax Java driver, version 2.1.3).
The full code, the startup scripts and the configuration files can now be found on GitHub.
In pseudo-code, I wrote two different versions of the application, the sync one:
try (Session session = cluster.connect()) {
    String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
    PreparedStatement pstm = session.prepare(cql);
    for (String partitionKey : keySource) {
        // keySource is an Iterable<String> of partition keys
        BoundStatement bound = pstm.bind(partitionKey /* , << plus the other parameters >> */);
        bound.setConsistencyLevel(ConsistencyLevel.ALL);
        session.execute(bound);
    }
}
And the async one:
try (Session session = cluster.connect()) {
    List<ResultSetFuture> futures = new LinkedList<ResultSetFuture>();
    String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
    PreparedStatement pstm = session.prepare(cql);
    for (String partitionKey : keySource) {
        // keySource is an Iterable<String> of partition keys
        while (futures.size() >= 10 /* Max 10 concurrent writes */) {
            // Wait for the first issued write to terminate
            ResultSetFuture future = futures.get(0);
            future.get();
            futures.remove(0);
        }
        BoundStatement bound = pstm.bind(partitionKey /* , << plus the other parameters >> */);
        bound.setConsistencyLevel(ConsistencyLevel.ALL);
        futures.add(session.executeAsync(bound));
    }
    while (futures.size() > 0) {
        // Wait for the other write requests to terminate
        ResultSetFuture future = futures.get(0);
        future.get();
        futures.remove(0);
    }
}
The latter is similar to the approach used by the connector in the no-batch configuration.
The two versions of the application behave the same in all circumstances, except when the load is high.
For instance, when running the sync version with 5 threads on 9 machines (45 threads) writing 9 million rows to the cluster, I find all the rows in the subsequent read (done with spark-cassandra-connector).
If I run the async version with 1 thread per machine (9 threads), the execution is much faster, but I cannot find all the rows in the subsequent read (the same problem that arose with spark-cassandra-connector).
No exceptions were thrown by the code during these executions.
What could be the cause of the issue?
I am adding some further results (thanks for the comments):
Issues seemed to start arising with async writes and a number of concurrent writers > 45 and <= 90, so I ran other tests to make sure the finding was right:
The last finding shows that the high number of concurrent writers (90) is not an issue in itself, as the first tests had suggested. The problem is the high number of async writes using the same session.
With 5 concurrent async writes on the same session the issue is not present; if I increase the number of concurrent writes to 10, some operations get lost without any notification.
It seems that async writes are broken in Cassandra 2.1.2 (or in the Cassandra Java driver) if you issue more than 5 writes concurrently on the same session.
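For reference, below is a minimal sketch (mine, not part of the test code on GitHub) of how the in-flight async writes could be capped with a Semaphore instead of polling a List<ResultSetFuture>. The class name ThrottledWriter, the constant MAX_IN_FLIGHT and the callback-based error reporting are illustrative assumptions; the point is only that throttling plus an explicit onFailure hook makes any failed write visible, whatever the underlying cause turns out to be.

import java.util.concurrent.Semaphore;

import com.datastax.driver.core.*;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;

public final class ThrottledWriter {

    private static final int MAX_IN_FLIGHT = 5; // the level at which no rows were lost

    public static void write(Session session, String cql, Iterable<String> keySource)
            throws InterruptedException {
        final Semaphore inFlight = new Semaphore(MAX_IN_FLIGHT);
        PreparedStatement pstm = session.prepare(cql);

        for (String partitionKey : keySource) {
            inFlight.acquire(); // block until one of the outstanding writes completes
            BoundStatement bound = pstm.bind(partitionKey /* , << plus the other parameters >> */);
            bound.setConsistencyLevel(ConsistencyLevel.ALL);
            Futures.addCallback(session.executeAsync(bound), new FutureCallback<ResultSet>() {
                @Override public void onSuccess(ResultSet rs) { inFlight.release(); }
                @Override public void onFailure(Throwable t) {
                    inFlight.release();
                    t.printStackTrace(); // a failed write should at least surface here
                }
            });
        }
        // Drain: wait until every outstanding write has released its permit
        inFlight.acquire(MAX_IN_FLIGHT);
    }
}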
Nicola and I communicated over email this weekend, and I thought I'd provide an update here with my current theory. I took a look at the GitHub project Nicola shared and experimented with an 8-node cluster on EC2.
I was able to reproduce the issue with 2.1.2, but observed that after a period of time I could re-execute the Spark job and all 9 million rows were returned.
What I seemed to notice was that while nodes were under compaction I did not get all 9 million rows. On a whim I took a look at the change log for 2.1 and found CASSANDRA-8429 ("Some keys unreadable during compaction"), which may explain this problem.
Seeing that the issue has been fixed and is targeted for 2.1.3, I reran the test against the cassandra-2.1 branch, ran the count job while compaction activity was happening, and got 9 million rows back.
I'd like to experiment with this some more, since my testing with the cassandra-2.1 branch was rather limited and the compaction activity may have been purely coincidental, but I'm hoping this may explain these issues.
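For anyone who wants to narrow this down without re-running the full Spark count, here is a minimal sketch (mine, assuming the same keySource as in the question) that probes each written partition key with a single-key read at consistency ONE. The keyspace/table name my_ks.my_table and the key column pk are placeholders for the table elided as << ... >> above. Running it during and again after compaction should show whether the "missing" keys become readable once compaction settles, which is what CASSANDRA-8429 would predict.

import com.datastax.driver.core.*;

public final class ReadBackCheck {

    // Returns the number of partition keys that are not readable at this moment
    public static long countMissing(Session session, Iterable<String> keySource) {
        PreparedStatement read =
                session.prepare("SELECT pk FROM my_ks.my_table WHERE pk = ?"); // placeholder table/column
        long missing = 0;
        for (String partitionKey : keySource) {
            Row row = session.execute(
                    read.bind(partitionKey).setConsistencyLevel(ConsistencyLevel.ONE)).one();
            if (row == null) {
                missing++;
            }
        }
        return missing;
    }
}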