
Cassandra: List 10 most recently modified records

Tags:

cassandra

cql

I'm having trouble modelling my data so that I can efficiently query Cassandra for the 10 (any number, really) most recently modified records. Each record has a last_modified_date column that is set by the application when inserting/updating the record.

I've excluded the data columns from this example code.

Main data table (contains only one row per record):

CREATE TABLE record (
    record_id int,
    last_modified_by text,
    last_modified_date timestamp,
    PRIMARY KEY (record_id)
);

Solution 1 (Fail)

I tried creating a separate table that uses a clustering order on the modification date.

Table (one row for each record; only inserting the last modified date):

CREATE TABLE record_by_last_modified_index (
    record_id int,
    last_modified_by text,
    last_modified_date timestamp,
    PRIMARY KEY (record_id, last_modified_date)
) WITH CLUSTERING ORDER BY (last_modified_date DESC);

Query:

SELECT * FROM record_by_last_modified_index LIMIT 10;

This solution does not work, since the clustering order only applies to rows that share the same partition key. Because each row here has a different partition key (record_id), the LIMIT 10 query just returns rows from the first 10 partitions in token order rather than the 10 most recently modified records.
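To make that concrete, here is a quick illustration against the table above (record_id 1 is just a placeholder value): the clustering order is only visible when the query is restricted to a single partition, and each partition holds only the row(s) for a single record.

-- Clustering order applies within one partition only, so this can return
-- at most the rows for a single record, newest first:
SELECT * FROM record_by_last_modified_index WHERE record_id = 1 LIMIT 10;

-- Without a WHERE clause, LIMIT 10 just takes the first 10 partitions
-- in token order, which is effectively arbitrary:
SELECT * FROM record_by_last_modified_index LIMIT 10;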

Solution 2 (Inefficient)

Another solution I have tried is to simply query Cassandra for all record_id and last_modified_date values, sort them and pick the first 10 records in my application. This is clearly inefficient and won't scale well.
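For reference, the query behind this approach is just a full scan of the main table; the sorting and the top-10 selection then happen in the application:

-- Fetch every record's id and timestamp; the application sorts all of
-- them in memory and keeps the 10 newest:
SELECT record_id, last_modified_date FROM record;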

Solution 3

One last solution I considered is using the same partition key for all records and relying on the clustering order to keep them sorted correctly. The problem with that solution is that the data would not be evenly partitioned across the nodes, since every record would have the same partition key. That seems like a non-starter to me.
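For clarity, here is a sketch of what that would look like (the table and column names are made up; a constant bucket value of 0 stands in for the shared partition key):

-- Every row uses the same partition key value, so the whole table sits
-- in one partition and is globally sorted by last_modified_date:
CREATE TABLE record_by_last_modified (
    bucket int,
    last_modified_date timestamp,
    record_id int,
    PRIMARY KEY ((bucket), last_modified_date, record_id)
) WITH CLUSTERING ORDER BY (last_modified_date DESC);

SELECT record_id, last_modified_date
FROM record_by_last_modified
WHERE bucket = 0
LIMIT 10;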

Asked Aug 14 '15 by Shaun



3 Answers

I think what you're trying to do is more of a relational database model and is somewhat of an anti-pattern in Cassandra.

Cassandra only sorts things based on clustering columns, but the sort order isn't expected to change. This is because when memtables are written to disk as SSTables (Sorted String Tables), the SSTables are immutable and can't be re-sorted efficiently. This is why you aren't allowed to update the value of a clustering column.

If you want to re-sort the clustered rows, the only way I know is to delete the old row and insert a new one in a batch. To make that even more inefficient, you would probably need to first do a read to figure out what the last_modified_date was for the record_id so that you could delete it.
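As a rough sketch against the question's record_by_last_modified_index table (the record id, user and old timestamp are made-up values), that re-sort would look something like this:

-- 1. Read the current clustering value so the old row can be deleted:
SELECT last_modified_date FROM record_by_last_modified_index WHERE record_id = 1;

-- 2. Delete the old clustered row and write the new one in one batch:
BEGIN BATCH
    DELETE FROM record_by_last_modified_index
        WHERE record_id = 1 AND last_modified_date = '2015-08-14 19:41:10+0000';
    INSERT INTO record_by_last_modified_index (record_id, last_modified_by, last_modified_date)
        VALUES (1, 'shaun', dateof(now()));
APPLY BATCH;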

So I'd look for a different approach, such as just writing the updates as new clustered rows and leaving the old ones there (possibly cleaning them up over time with a TTL). Your newest updates would then always be on top when you did a LIMIT query.
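A minimal sketch of that, reusing the question's record_by_last_modified_index table (the 30-day TTL is just an example value):

-- Each modification is appended as a new clustered row; old rows expire
-- on their own after 30 days instead of being deleted explicitly:
INSERT INTO record_by_last_modified_index (record_id, last_modified_by, last_modified_date)
VALUES (1, 'shaun', dateof(now()))
USING TTL 2592000;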

In terms of partitioning, you will need to break your data into a few categories to spread it over your nodes. That means you won't get global sorting of your table, but only within each category; that's a consequence of the distributed model. If you really need global sorting, then perhaps look at pairing Cassandra with something like Spark. Sorting is super expensive in time and resources, so think carefully about whether you really need it.

Update:

Thinking about this some more, you should be able to do this in Cassandra 3.0 using materialized views. The view would take care of the messy delete and insert for you, to re-order the clustered rows. So here's what it looks like in the 3.0 alpha release:

First create the base table:

CREATE TABLE record_ids (
    record_type int,
    last_modified_date timestamp,
    record_id int,
    PRIMARY KEY(record_type, record_id));

Then create a view of that table, using last_modified_date as a clustering column:

CREATE MATERIALIZED VIEW last_modified AS
    SELECT record_type FROM record_ids
    WHERE record_type IS NOT NULL AND last_modified_date IS NOT NULL AND record_id IS NOT NULL
    PRIMARY KEY (record_type, last_modified_date, record_id)
    WITH CLUSTERING ORDER BY (last_modified_date DESC);

Now insert some records:

insert into record_ids (record_type, last_modified_date, record_id) VALUES ( 1, dateof(now()), 100);
insert into record_ids (record_type, last_modified_date, record_id) VALUES ( 1, dateof(now()), 200);
insert into record_ids (record_type, last_modified_date, record_id) VALUES ( 1, dateof(now()), 300);

SELECT * FROM record_ids;

 record_type | record_id | last_modified_date
-------------+-----------+--------------------------
           1 |       100 | 2015-08-14 19:41:10+0000
           1 |       200 | 2015-08-14 19:41:25+0000
           1 |       300 | 2015-08-14 19:41:41+0000

SELECT * FROM last_modified;

 record_type | last_modified_date       | record_id
-------------+--------------------------+-----------
           1 | 2015-08-14 19:41:41+0000 |       300
           1 | 2015-08-14 19:41:25+0000 |       200
           1 | 2015-08-14 19:41:10+0000 |       100

Now we update a record in the base table, and should see it move to the top of the list in the view:

UPDATE record_ids SET last_modified_date = dateof(now()) 
WHERE record_type=1 AND record_id=200;

So in the base table, we see the timestamp for record_id=200 was updated:

SELECT * FROM record_ids;

 record_type | record_id | last_modified_date
-------------+-----------+--------------------------
           1 |       100 | 2015-08-14 19:41:10+0000
           1 |       200 | 2015-08-14 19:43:13+0000
           1 |       300 | 2015-08-14 19:41:41+0000

And in the view, we see:

 SELECT * FROM last_modified;

 record_type | last_modified_date       | record_id
-------------+--------------------------+-----------
           1 | 2015-08-14 19:43:13+0000 |       200
           1 | 2015-08-14 19:41:41+0000 |       300
           1 | 2015-08-14 19:41:10+0000 |       100

So you can see that record_id=200 moved up in the view, and if you do a LIMIT N query on the view, you'll get the N most recently modified rows.
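For completeness, the query for the most recent records against the view would then be along these lines:

-- Newest first within the record_type partition, thanks to the view's
-- clustering order on last_modified_date DESC:
SELECT record_id, last_modified_date FROM last_modified WHERE record_type = 1 LIMIT 10;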

Answered by Jim Meyer


The only way to make a CQL query over an entire table or view sorted by a field is to make the partition key constant. Exactly one machine (times the replication factor) will then hold the entire table, e.g. with an int partition key that is always zero and the field that needs sorting as the clustering key. You should observe read/write/storage performance similar to a single-node database with an index on the sorted field, even if you have more nodes in your cluster. This doesn't entirely defeat the purpose of Cassandra, because it leaves room to scale later.

If performance is insufficient, you can scale by increasing the partition variety. E.g. randomly choosing a partition from 0, 1, 2, 3 on insert can up to quadruple read/write/storage capacity when 4 nodes are used. Then, to find the "10 most recent" items, you'll have to query all 4 partitions yourself and merge-sort the results (see the sketch after the example solution below).

In theory, Cassandra itself could provide this facility: a partition key chosen dynamically as something like a modulo of the node count on INSERT, and a merge-sort across partitions on SELECT (with ALLOW FILTERING).

Cassandra's Design Goals Disallow Global Sort

To allow write, read and storage capacity to scale linearly with node count, Cassandra requires that:

  • Every insert lands on a single node.
  • Every select lands on a single node.
  • Clients distribute the workload evenly across all nodes.

If I understand correctly, the consequence is that a full-table query sorted on a single field will always require reading from the entire cluster and merge-sorting the results.

Note that materialized views are equivalent to tables; they don't have any magical property that makes them better at global sorting. See http://www.datastax.com/dev/blog/we-shall-have-order, where Aaron Ploetz agrees that Cassandra and CQL cannot sort on one field without a partition key and still scale.

Example Solution

CREATE KEYSPACE IF NOT EXISTS
    tmpsort
WITH REPLICATION =
    {'class':'SimpleStrategy', 'replication_factor' : 1};

USE tmpsort;

CREATE TABLE record_ids (
    partition int,
    last_modified_date timestamp,
    record_id int,
    PRIMARY KEY((partition), last_modified_date, record_id))
    WITH CLUSTERING ORDER BY (last_modified_date DESC);

INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 1, DATEOF(NOW()), 100);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 2, DATEOF(NOW()), 101);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 3, DATEOF(NOW()), 102);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 1, DATEOF(NOW()), 103);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 2, DATEOF(NOW()), 104);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 3, DATEOF(NOW()), 105);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 3, DATEOF(NOW()), 106);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 3, DATEOF(NOW()), 107);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 2, DATEOF(NOW()), 108);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 3, DATEOF(NOW()), 109);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 1, DATEOF(NOW()), 110);
INSERT INTO record_ids (partition, last_modified_date, record_id) VALUES ( 1, DATEOF(NOW()), 111);

SELECT * FROM record_ids;

-- Note the results are only sorted in their partition
-- To try again:
-- DROP KEYSPACE tmpsort;

Note that without a WHERE clause you get results in token(partition-key) order. See https://dba.stackexchange.com/questions/157537/querying-cassandra-without-a-partition-key
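As a sketch of the merge-sort approach mentioned earlier (assuming the application fans out one query per partition value and merges the sorted result sets, keeping the overall top 10):

-- Each query returns its partition's rows newest-first; the client then
-- merge-sorts the per-partition results:
SELECT * FROM record_ids WHERE partition = 1 LIMIT 10;
SELECT * FROM record_ids WHERE partition = 2 LIMIT 10;
SELECT * FROM record_ids WHERE partition = 3 LIMIT 10;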

Other database distribution models

If I understood correctly, CockroachDB would similarly bottleneck read/write performance for monotonically increasing data on one node at any given time, but its storage capacity would scale linearly. Other range queries, like "oldest 10" or "between date X and date Y", would also spread load over more nodes than in Cassandra. This is because CockroachDB is one giant sorted key-value store in which any range of sorted data that reaches a certain size is redistributed.

Answered by ubershmekel


I think there is another problem with the accepted solution: if you have multiple replicas, the inserts are not guaranteed to end up in order.

From the DataStax docs:

now() - In the coordinator node, generates a new unique timeuuid in milliseconds when the statement is executed. The timestamp portion of the timeuuid conforms to the UTC (Universal Time) standard. This method is useful for inserting values. The value returned by now() is guaranteed to be unique.

When you have multiple replicas, you also have multiple possible coordinator nodes, since any node can be chosen as coordinator. This means your inserts are not necessarily in order, because of small clock variations between the nodes. So an insert that actually happened later, from your frame of reference, might be sorted before a previously inserted record, simply because now() generated the date on a coordinator node whose clock is slightly behind.
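To illustrate the effect with the accepted answer's record_ids table (record ids 400 and 401 are made-up values):

-- Two inserts issued back to back by the application. Each now() is
-- evaluated on whichever node coordinates that write, so if the second
-- coordinator's clock is slightly behind, record 401 sorts before
-- record 400 in the view even though it was written later:
INSERT INTO record_ids (record_type, last_modified_date, record_id) VALUES (1, dateof(now()), 400);
INSERT INTO record_ids (record_type, last_modified_date, record_id) VALUES (1, dateof(now()), 401);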

You're trying to get a consistent view (a single reference of truth) of your data. Unfortunately, in a distributed environment there is no single source of truth.

Answered by Robin van den Berg