I have a Cassandra table 'articles' with 400,000 rows and primary key (source, created_at desc).
When I query our data using:
select * from articles where source = 'abc' and created_at <= '2016-01-01 00:00:00'
it takes 8 minutes to read 110,000 rows.
This is extremely slow, and I don't know where the problem lies.
I would like to read 100,000 rows in less than 10 seconds. Is this possible?
Here are some more details:
I have 3 nodes, replication factor = 2, strategy = SimpleStrategy, 4 CPUs, 32 GB RAM.
I am using the Python cassandra-driver 3.0.0.
I am not sure whether the slowness comes from the Python client or from Cassandra itself.
Here is my CQL schema:
CREATE TABLE crawler.articles (
source text,
created_at timestamp,
id text,
category text,
channel text,
last_crawled timestamp,
text text,
thumbnail text,
title text,
url text,
PRIMARY KEY (source, created_at, id)
) WITH CLUSTERING ORDER BY (created_at DESC, id ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"ALL"}'
AND comment = ''
AND compaction = {'sstable_size_in_mb': '160', 'enabled': 'true', 'unchecked_tombstone_compaction': 'false', 'tombstone_compaction_interval': '86400', 'tombstone_threshold': '0.2', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 604800
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';
CREATE INDEX articles_id_idx ON crawler.articles (id);
CREATE INDEX articles_url_idx ON crawler.articles (url);
Edit:
I want to query new articles from within the last couple of days, so my query is:
SELECT * FROM articles WHERE source = 'any source'
AND created_at >= '2016-01-08 00:00:00'
A sample insert would be:
INSERT INTO articles (source,created_at,id,category,channel,last_crawled,text,thumbnail,title,url)
VALUES ('money',1452417991000,'1290141063','news_video_top','',1452418260000,'','http://inews.gtimg.com/newsapp_ls/0/143487758_150120/0','article title','http://view.inews.qq.com/a/VID2016011002195801');
Client code:
import sys
import logging
from cassandra import ConsistencyLevel
timespan = int(sys.argv[1])
source = str(sys.argv[2])
logging.basicConfig(filename='statistics-%d.log' % (timespan), format='%(asctime)-15s %(filename)s %(name)-8s %(message)s', level=logging.INFO)
class Whitelist(logging.Filter):
    def __init__(self, *whitelist):
        self.whitelist = [logging.Filter(name) for name in whitelist]

    def filter(self, record):
        return any(f.filter(record) for f in self.whitelist)

for handler in logging.root.handlers:
    handler.addFilter(Whitelist('statistics'))
log = logging.getLogger('statistics')
try:
    from datetime import datetime, timedelta
    if __name__ == '__main__':
        pass
    from cassandra.cluster import Cluster

    log.info('[%d] connecting cassandra...' % (timespan))
    cluster = Cluster(['xxx', 'xxx', 'xxx'])
    session = cluster.connect('crawler')
    cluster = Cluster(['xxx', 'xxx', 'xxx'])
    session_statis = cluster.connect('statistics')

    created_at = datetime.utcnow() + timedelta(hours=-timespan)
    print "[%s] FINDING ..." % (datetime.utcnow().isoformat())
    statuses = {}
    stmt = session.prepare("select * from articles where source = ? and created_at >= ? ")
    category_stmt = session.prepare('SELECT category FROM channels WHERE source = ? and id = ?')
    rows = session.execute(stmt, [source, created_at])
    for row in rows:
        try:
            if row.channel and source != 'toutiao':
                category = session.execute(category_stmt, ['zhihu' if row.source=='zhihuzero' else row.source, row.channel])
                statuses[row.id] = {'source':row.source, 'timespan': str(timespan), 'id': row.id, 'title':row.title, 'thumbnail':row.thumbnail, 'url':row.url, 'text':row.text, 'created_at':row.created_at, 'category': category[0].category, 'author':'', 'genre':row.category }
            else:
                statuses[row.id] = {'source':row.source, 'timespan': str(timespan), 'id': row.id, 'title':row.title, 'thumbnail':row.thumbnail, 'url':row.url, 'text':row.text, 'created_at':row.created_at, 'category': row.category, 'author':'', 'genre':'' }
        except Exception, e:
            continue
    print "%s weibos ..." % (len(statuses))

    print "[%s] CALCULATING ..." % (datetime.utcnow().isoformat())
    stmt = session.prepare('SELECT article, MAX(comments) AS comments,MAX(likes) AS likes,MAX(reads) AS reads,MAX(shares) AS shares FROM axes WHERE article = ? AND at >= ?')
    for statuses_id, status in statuses.iteritems():
        rows = session.execute(stmt, [statuses_id, datetime.utcnow() + timedelta(hours=-timespan)])
        for row in rows:
            if source == 'toutiao':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    status['speed'] = row.comments
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'weibohao':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.shares
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'tencent':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.comments
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'zhihu':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.likes
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'buluo':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.reads
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'zhihuzero':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.likes
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0

    statuses = sorted(statuses.iteritems(), key=lambda (k, v): (v['speed'], k), reverse=True)[:1000]
    print "[%s] TRUNCATING ..." % (datetime.utcnow().isoformat())
    session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source, str(timespan))) #, consistency_level=ConsistencyLevel.QUORUM
    print "[%s] UPDATING ..." % (datetime.utcnow().isoformat())
    for i, status in statuses:
        if status['speed'] > 0:
            session_statis.execute('insert into statistics.statistics(source,timespan,id,title,thumbnail,url,text,created_at,category,genre,author,reads,likes,comments,shares,speed) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)', (status['source'], status['timespan'], status['id'], status['title'], status['thumbnail'], status['url'], status['text'], status['created_at'], status['category'], status['genre'], status['author'], status['reads'], status['likes'], status['comments'], status['shares'], status['speed']))
        else:
            print status['id'], status['url']
    print "[%s] DONE ..." % (datetime.utcnow().isoformat())
    log.info('[%d] done' % (timespan))
except Exception, e:
    print 'except ===:', e
Thanks for your replies!
Your use case is a little unusual. Cassandra is intended more for transactional operations on a small number of rows than for the kind of bulk processing you might do in Hadoop.
The way you are doing your query, you are accessing one partition on a single node and transferring the 100K rows to your client. That's a lot of data to move across the network and I'm not sure why you would want to do that. You're doing everything sequentially, so you're getting no parallelism or benefit from having three nodes.
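If you do stay with the client-side approach, the per-article queries in your second loop could at least be issued in parallel with the driver's concurrency helper. A rough sketch, reusing the session, the prepared axes statement and the statuses dict from your script (names taken from your code):

from cassandra.concurrent import execute_concurrent_with_args

# Issue the per-article MAX() queries concurrently instead of one at a time.
# 'stmt' here is the prepared axes query; 'session', 'statuses' and
# 'created_at' are the names from the script above.
article_ids = list(statuses)
params = [(article_id, created_at) for article_id in article_ids]
results = execute_concurrent_with_args(session, stmt, params, concurrency=50)

for article_id, (success, rows) in zip(article_ids, results):
    if not success:
        continue    # on failure, 'rows' holds the exception instead of a result set
    for row in rows:
        pass        # same per-row handling as in the original loop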
Usually if you want to do bulk processing on a lot of rows in Cassandra, you'd use Spark to do distributed processing on each node rather than sequentially fetching a lot of data to a client.
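For example, with the DataStax spark-cassandra-connector you could load the table as a DataFrame and do the filtering and counting on the cluster itself. A rough sketch only; the connector package version and the contact host are assumptions you'd adjust for your setup:

# Submitted with something like:
#   spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0 job.py
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().set("spark.cassandra.connection.host", "xxx")
sc = SparkContext(appName="articles-scan", conf=conf)
sqlContext = SQLContext(sc)

# Map the crawler.articles table to a DataFrame; the scan runs on the nodes.
articles = (sqlContext.read
            .format("org.apache.spark.sql.cassandra")
            .options(keyspace="crawler", table="articles")
            .load())

recent = articles.filter("source = 'money' AND created_at >= '2016-01-08 00:00:00'")
print(recent.count())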
Also, the two indexes you are creating don't look like they will work very well. Cassandra secondary indexes are intended for fields with relatively low cardinality, but id and url are essentially unique per row, which is about the worst case for them. Cassandra indexes are very different from indexes in relational databases.
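If you need to look up an article by url, a denormalized query table that you write alongside articles will usually behave better than a secondary index on a high-cardinality column. A sketch, assuming a hypothetical articles_by_url table and reusing your existing session:

from datetime import datetime

# Hypothetical lookup table keyed by url; write to it whenever you insert an article.
session.execute("""
    CREATE TABLE IF NOT EXISTS crawler.articles_by_url (
        url text PRIMARY KEY,
        source text,
        created_at timestamp,
        id text
    )
""")

insert_by_url = session.prepare(
    "INSERT INTO crawler.articles_by_url (url, source, created_at, id) VALUES (?, ?, ?, ?)")

# Values taken from the sample insert in the question.
session.execute(insert_by_url,
                ('http://view.inews.qq.com/a/VID2016011002195801', 'money',
                 datetime(2016, 1, 10, 9, 26, 31), '1290141063'))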
I'd have to see your client code to know if you are doing something inefficient there. Usually fetching a lot of rows would trigger paging, so I'm not sure how you're handling that.
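For reference, with the 3.x Python driver a plain execute() pages transparently as you iterate the result set; fetch_size only bounds how many rows are pulled per round trip. Something like:

from datetime import datetime
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

cluster = Cluster(['xxx', 'xxx', 'xxx'])
session = cluster.connect('crawler')

# Rows are streamed in pages of 5000 as the loop advances, not all at once.
query = SimpleStatement(
    "SELECT * FROM articles WHERE source = %s AND created_at >= %s",
    fetch_size=5000)

count = 0
for row in session.execute(query, ('money', datetime(2016, 1, 8))):
    count += 1
print(count)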