Why is my Cassandra database so slow at reading data? I want to read 100,000 rows in less than 10 seconds

I have a Cassandra table 'articles' with 400,000 rows and the following primary key:

primary key (source,created_at desc)

When I query our data using:

select * from articles where source = 'abc' and created_at <= '2016-01-01 00:00:00'

it takes 8 minutes to read 110,000 rows.

This is extremely slow, and I don't know where the problem lies.

I would like to read 100,000 rows in less than 10 seconds. Is that even possible?

Here are some more details:

I have 3 nodes, replication factor = 2, strategy = SimpleStrategy, 4 CPUs, 32 GB RAM.
I am using the Python cassandra-driver 3.0.0.

I am not sure whether the bottleneck is in Python or in Cassandra, since the client is written in Python.
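
Here is a rough sketch of how I could check whether the time goes into the Cassandra fetch or into the Python processing (the per-row work below is just a stand-in for my real code):

# Rough timing sketch: measure the driver fetch and the per-row Python work separately.
import time
from datetime import datetime, timedelta

from cassandra.cluster import Cluster

cluster = Cluster(['xxx', 'xxx', 'xxx'])
session = cluster.connect('crawler')

stmt = session.prepare("SELECT * FROM articles WHERE source = ? AND created_at >= ?")

start = time.time()
rows = list(session.execute(stmt, ['abc', datetime.utcnow() - timedelta(days=2)]))
fetch_seconds = time.time() - start
print("fetched %d rows in %.1fs" % (len(rows), fetch_seconds))

start = time.time()
processed = [(row.id, row.title) for row in rows]  # stand-in for the real per-row work
print("processed %d rows in %.1fs" % (len(processed), time.time() - start))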

Here is my CQL schema:

CREATE TABLE crawler.articles (
    source text,
    created_at timestamp,
    id text,
    category text,
    channel text,
    last_crawled timestamp,
    text text,
    thumbnail text,
    title text,
    url text,
    PRIMARY KEY (source, created_at, id)
) WITH CLUSTERING ORDER BY (created_at DESC, id ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"ALL"}'
AND comment = ''
AND compaction = {'sstable_size_in_mb': '160', 'enabled': 'true', 'unchecked_tombstone_compaction': 'false', 'tombstone_compaction_interval': '86400', 'tombstone_threshold': '0.2', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 604800
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';

CREATE INDEX articles_id_idx ON crawler.articles (id);
CREATE INDEX articles_url_idx ON crawler.articles (url);

Edit:

I want to query new articles from the last couple of days, so my query is:

SELECT * FROM articles WHERE source = 'any source' 
AND created_at >= '2016-01-08 00:00:00'

A sample insert would be:

INSERT INTO articles (source,created_at,id,category,channel,last_crawled,text,thumbnail,title,url) 
VALUES ('money',1452417991000,'1290141063','news_video_top','',1452418260000,'','http://inews.gtimg.com/newsapp_ls/0/143487758_150120/0','article title','http://view.inews.qq.com/a/VID2016011002195801');

Client code:

import sys
import logging
from cassandra import ConsistencyLevel

timespan = int(sys.argv[1])
source = str(sys.argv[2])

logging.basicConfig(filename='statistics-%d.log' % (timespan), format='%(asctime)-15s %(filename)s %(name)-8s %(message)s', level=logging.INFO)

class Whitelist(logging.Filter):
    def __init__(self, *whitelist):
        self.whitelist = [logging.Filter(name) for name in whitelist]

    def filter(self, record):
        return any(f.filter(record) for f in self.whitelist)

for handler in logging.root.handlers:
    handler.addFilter(Whitelist('statistics'))

log = logging.getLogger('statistics')

try:
    from datetime import datetime, timedelta

    if __name__ == '__main__':
        pass

    from cassandra.cluster import Cluster

    log.info('[%d] connecting cassandra...' % (timespan))
    cluster = Cluster(['xxx', 'xxx', 'xxx'])
    session = cluster.connect('crawler')

    cluster_statis = Cluster(['xxx', 'xxx', 'xxx'])  # separate Cluster handle for the statistics keyspace
    session_statis = cluster_statis.connect('statistics')

    created_at = datetime.utcnow() + timedelta(hours=-timespan)

    print "[%s] FINDING ..." % (datetime.utcnow().isoformat())
    statuses = {}

    stmt = session.prepare("select * from articles where source = ? and created_at >= ? ")
    category_stmt = session.prepare('SELECT category FROM channels WHERE source = ? and id = ?')

    rows = session.execute(stmt, [source, created_at])

    for row in rows:
        try:
            if row.channel and source != 'toutiao':
                category = session.execute(category_stmt, ['zhihu' if row.source=='zhihuzero' else row.source, row.channel])
                statuses[row.id] = {'source':row.source, 'timespan': str(timespan), 'id': row.id, 'title':row.title, 'thumbnail':row.thumbnail, 'url':row.url, 'text':row.text, 'created_at':row.created_at, 'category': category[0].category, 'author':'', 'genre':row.category }
            else:
                statuses[row.id] = {'source':row.source, 'timespan': str(timespan), 'id': row.id, 'title':row.title, 'thumbnail':row.thumbnail, 'url':row.url, 'text':row.text, 'created_at':row.created_at, 'category': row.category, 'author':'', 'genre':'' }

        except Exception, e:
            continue

    print "%s weibos ..." % (len(statuses))
    print "[%s] CACULATING ..." % (datetime.utcnow().isoformat())
    stmt = session.prepare('SELECT article, MAX(comments) AS comments,MAX(likes) AS likes,MAX(reads) AS reads,MAX(shares) AS shares FROM axes WHERE article = ? AND at >= ?')

    for statuses_id, status in statuses.iteritems():
        rows = session.execute(stmt, [statuses_id, datetime.utcnow() + timedelta(hours=-timespan)])
        for row in rows:
            if source == 'toutiao':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    status['speed'] = row.comments
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'weibohao':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.shares
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'tencent':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.comments
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'zhihu':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.likes
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'buluo':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.reads
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'zhihuzero':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.likes
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0

    statuses = sorted(statuses.iteritems(), key=lambda (k, v): (v['speed'], k), reverse=True)[:1000]

    print "[%s] TRUNCATING ..." % (datetime.utcnow().isoformat())
    session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source, str(timespan))) #, consistency_level=ConsistencyLevel.QUORUM

    print "[%s] UPDATING ..." % (datetime.utcnow().isoformat())
    for i, status in statuses:
        if status['speed'] > 0:
            session_statis.execute('insert into statistics.statistics(source,timespan,id,title,thumbnail,url,text,created_at,category,genre,author,reads,likes,comments,shares,speed) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)', (status['source'], status['timespan'], status['id'], status['title'], status['thumbnail'], status['url'], status['text'], status['created_at'], status['category'], status['genre'], status['author'], status['reads'], status['likes'], status['comments'], status['shares'], status['speed']))
        else:
            print status['id'], status['url']

    print "[%s] DONE ..." % (datetime.utcnow().isoformat())
    log.info('[%d] done' % (timespan))

except Exception, e:
    print 'except ===:', e

Thanks for your replies!

asked by peter

1 Answer

Your use case is a little unusual. Cassandra is intended more for transactional operations on a small number of rows than for the kind of bulk processing you might do in Hadoop.

The way you are doing your query, you are accessing one partition on a single node and transferring the 100K rows to your client. That's a lot of data to move across the network and I'm not sure why you would want to do that. You're doing everything sequentially, so you're getting no parallelism or benefit from having three nodes.
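
As a rough illustration (a sketch, not something from your post), the Python driver can at least overlap round trips if you split the time window into slices and run them concurrently with execute_concurrent_with_args. Since source is your partition key, every slice still hits the same replicas, so this overlaps network latency rather than spreading the work across all three nodes:

# Hypothetical sketch: fetch the last 48 hours of one source in hourly slices,
# issuing the slice queries concurrently instead of one long sequential read.
from datetime import datetime, timedelta

from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args

cluster = Cluster(['xxx', 'xxx', 'xxx'])
session = cluster.connect('crawler')

stmt = session.prepare(
    "SELECT * FROM articles WHERE source = ? AND created_at >= ? AND created_at < ?")

now = datetime.utcnow()
# One (source, slice_start, slice_end) parameter tuple per hour.
params = [('abc', now - timedelta(hours=h + 1), now - timedelta(hours=h))
          for h in range(48)]

rows = []
for success, result in execute_concurrent_with_args(session, stmt, params, concurrency=16):
    if success:
        rows.extend(result)

print("%d rows fetched" % len(rows))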

Usually if you want to do bulk processing on a lot of rows in Cassandra, you'd use Spark to do distributed processing on each node rather than sequentially fetch a lot of data to a client.
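
For example, a PySpark job using the DataStax spark-cassandra-connector might look like the sketch below (the connector package, connection host, and versions are assumptions you would adapt); each executor then scans the token ranges local to it instead of funneling everything through one client:

# Rough sketch assuming Spark with the spark-cassandra-connector on the classpath
# (e.g. via --packages); host, keyspace, and filter values mirror the question.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("articles-bulk-read")
         .config("spark.cassandra.connection.host", "xxx")
         .getOrCreate())

articles = (spark.read
            .format("org.apache.spark.sql.cassandra")
            .options(keyspace="crawler", table="articles")
            .load())

# The scan is distributed: each executor reads the token ranges stored on its node.
recent = articles.filter("source = 'abc' AND created_at >= '2016-01-08 00:00:00'")
print("%d recent articles" % recent.count())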

Also, the two indexes you are creating don't look like they will work very well. Cassandra secondary indexes are intended for low-cardinality fields, but you appear to be indexing high-cardinality fields (id and url). Cassandra indexes are very different from indexes in relational databases.
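
A common alternative, sketched below as an illustration rather than a drop-in fix, is to denormalize: instead of a secondary index on a high-cardinality column like url, maintain a lookup table keyed by that column and write to it alongside each article insert (the articles_by_url table name is mine):

# Illustrative alternative to the url secondary index: a lookup table keyed by url,
# so a lookup by url is a single-partition read instead of a cluster-wide index scan.
from datetime import datetime

from cassandra.cluster import Cluster

cluster = Cluster(['xxx', 'xxx', 'xxx'])
session = cluster.connect('crawler')

session.execute("""
    CREATE TABLE IF NOT EXISTS articles_by_url (
        url text PRIMARY KEY,
        source text,
        created_at timestamp,
        id text
    )
""")

insert_lookup = session.prepare(
    "INSERT INTO articles_by_url (url, source, created_at, id) VALUES (?, ?, ?, ?)")

# Write the lookup row alongside every article insert (values taken from the
# sample insert in the question; 1452417991000 ms is 2016-01-10 09:26:31 UTC).
session.execute(insert_lookup,
                ('http://view.inews.qq.com/a/VID2016011002195801', 'money',
                 datetime(2016, 1, 10, 9, 26, 31), '1290141063'))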

I'd have to see your client code to know if you are doing something inefficient there. Usually fetching a lot of rows triggers paging, so I'm not sure how you're handling that.
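
For reference, here is how paging is typically tuned with the Python driver (an illustrative sketch, not a description of your code): the driver pages automatically, and fetch_size controls how many rows come back per round trip:

# Illustrative paging setup: the driver pages result sets automatically;
# fetch_size sets the rows per round trip (the default is 5000 in driver 3.x).
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

cluster = Cluster(['xxx', 'xxx', 'xxx'])
session = cluster.connect('crawler')

query = SimpleStatement(
    "SELECT * FROM articles WHERE source = %s AND created_at >= %s",
    fetch_size=5000)

count = 0
for row in session.execute(query, ('abc', '2016-01-08 00:00:00')):
    count += 1  # iterating fetches the next page transparently when needed

print("%d rows read" % count)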

answered by Jim Meyer