Background:
I have a Python module set up to grab JSON objects from a streaming API and store them (bulk insert of 25 at a time) in MongoDB using pymongo. For comparison, I also have a bash command that curls from the same streaming API and pipes it to mongoimport. The two approaches store data in separate collections.
Periodically, I check the count() of each collection to see how the two approaches compare. So far, the Python module lags about 1000 JSON objects behind the curl | mongoimport approach.
Problem:
How can I optimize my Python module so that it stays roughly in sync with curl | mongoimport?
I cannot use tweetstream, because I am not using the Twitter API but a third-party streaming service.
Could someone please help me out here?
Python module:
import json
import cStringIO

import pycurl
import pymongo

# DB_HOST, DB_NAME, STREAM_URL and AUTH are module-level settings (not shown).

class StreamReader:
    def __init__(self):
        try:
            self.buff = ""
            self.tweet = ""
            self.chunk_count = 0
            self.tweet_list = []
            self.string_buffer = cStringIO.StringIO()
            self.mongo = pymongo.Connection(DB_HOST)
            self.db = self.mongo[DB_NAME]
            self.raw_tweets = self.db["raw_tweets_gnip"]
            self.conn = pycurl.Curl()
            self.conn.setopt(pycurl.ENCODING, 'gzip')
            self.conn.setopt(pycurl.URL, STREAM_URL)
            self.conn.setopt(pycurl.USERPWD, AUTH)
            # libcurl calls handle_data with each chunk it receives
            self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
            self.conn.perform()
        except Exception as ex:
            print "error occurred : %s" % str(ex)

    def handle_data(self, data):
        try:
            self.string_buffer = cStringIO.StringIO(data)
            for line in self.string_buffer:
                try:
                    self.tweet = json.loads(line)
                except Exception as json_ex:
                    print "JSON Exception occurred: %s" % str(json_ex)
                    continue
                if self.tweet:
                    try:
                        self.tweet_list.append(self.tweet)
                        self.chunk_count += 1
                        # flush the accumulated batch to MongoDB
                        if self.chunk_count % 1000 == 0:
                            self.raw_tweets.insert(self.tweet_list)
                            self.chunk_count = 0
                            self.tweet_list = []
                    except Exception as insert_ex:
                        print "Error inserting tweet: %s" % str(insert_ex)
                        continue
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
            print repr(self.buff)

    def __del__(self):
        self.string_buffer.close()
Thanks for reading.
Originally there was a bug in your code.
if self.chunk_count % 50 == 0:
    self.raw_tweets.insert(self.tweet_list)
    self.chunk_count = 0
You reset chunk_count but not tweet_list. So the second time through, you try to insert 100 items: the 50 new ones plus the 50 that were already sent to the DB the previous time. You've fixed this, but you still see a difference in performance.
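One way to rule out this class of bug is to drop the separate counter and let the list's own length trigger the flush. The following is a minimal sketch in Python 3, not the answerer's code. BatchBuffer and flush_fn are hypothetical names. flush_fn stands in for the bulk insert, for example collection.insert_many in pymongo 3+ or collection.insert(list) in the older API used in the question.

```python
from typing import Callable, List


class BatchBuffer:
    """Collect documents and hand them to flush_fn in fixed-size batches.

    flush_fn is a hypothetical hook standing in for a bulk insert;
    it is not a pymongo API.
    """

    def __init__(self, flush_fn: Callable[[List[dict]], None], batch_size: int = 50):
        self.flush_fn = flush_fn
        self.batch_size = batch_size
        self.items: List[dict] = []

    def add(self, item: dict) -> None:
        self.items.append(item)
        # len(self.items) replaces a separate chunk_count, so the counter
        # and the list can never drift out of sync.
        if len(self.items) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        if self.items:
            self.flush_fn(self.items)
            # Rebind to a fresh list instead of clearing in place, in case
            # flush_fn kept a reference to the batch it was given.
            self.items = []


if __name__ == "__main__":
    sent = []  # stands in for the collection
    buf = BatchBuffer(sent.append, batch_size=50)
    for i in range(120):
        buf.add({"n": i})
    buf.flush()  # write the final partial batch
    print([len(batch) for batch in sent])
```

The count and the list are a single piece of state, so there is nothing left to forget to reset. Remember to call flush() once when the stream ends. Otherwise the last partial batch is never written.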
The batch size turns out to be a red herring. I tried loading a large JSON file both via Python and via mongoimport, and Python was always faster (even in safe mode; see below).
Taking a closer look at your code, I realized the problem is that the streaming API actually hands you data in chunks. You are expected to take those chunks and put them straight into the database, which is what mongoimport does. The extra work your Python code does is probably what separates my results from yours: splitting up the stream, adding each object to a list, and then periodically sending batches to Mongo.
Try this snippet for your handle_data():
import json
from StringIO import StringIO  # Python 2

def handle_data(self, data):
    try:
        string_buffer = StringIO(data)
        tweets = json.load(string_buffer)
    except Exception as ex:
        print "Exception occurred: %s" % str(ex)
        return  # nothing to insert if the chunk did not parse
    try:
        self.raw_tweets.insert(tweets)
    except Exception as ex:
        print "Exception occurred: %s" % str(ex)
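The snippet above assumes each chunk passed to handle_data is exactly one complete JSON document. libcurl makes no such promise: a write-callback chunk can end in the middle of an object or contain several objects.

Below is a minimal sketch of a carry-over buffer. It rests on these assumptions:
- The stream separates objects with newlines. The question's own code already iterates line by line.
- It targets Python 3, where pycurl passes bytes to the callback.
- LineAssembler and on_object are hypothetical names.

```python
import json
from typing import Callable


class LineAssembler:
    """Reassemble newline-delimited JSON from arbitrarily split chunks.

    Assumes objects are separated by '\n' (optionally preceded by '\r').
    The trailing partial line of each chunk is held until the next chunk.
    """

    def __init__(self, on_object: Callable[[object], None]):
        self.on_object = on_object
        self.pending = b""

    def feed(self, chunk: bytes) -> None:
        self.pending += chunk
        # Everything before the last newline is complete; the remainder may
        # be the first half of an object that the next chunk finishes.
        *complete, self.pending = self.pending.split(b"\n")
        for line in complete:
            line = line.strip()  # drops '\r' and blank keep-alive lines
            if line:
                self.on_object(json.loads(line))


if __name__ == "__main__":
    got = []
    assembler = LineAssembler(got.append)
    assembler.feed(b'{"id": 1}\r\n{"id"')  # chunk ends mid-object
    assembler.feed(b': 2}\r\n')
    print(got)
```

With pycurl this could be wired up with something like conn.setopt(pycurl.WRITEFUNCTION, assembler.feed), where on_object adds each parsed document to whatever batching or insert step follows.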
One thing to note is that your Python inserts are not running in "safe mode". You should change that by adding the argument safe=True to your insert statement. You will then get an exception on any insert that fails, and your try/except will print the error, exposing the problem.
It doesn't cost much in performance either. I'm currently running a test, and after about five minutes the two collections hold 14120 and 14113 documents.
I got rid of the StringIO library. Since the WRITEFUNCTION callback handle_data is, in this case, invoked for every line, I just load the JSON directly. Sometimes, however, data contains two JSON objects. I'm sorry, I can't post the curl command I use because it contains our credentials. But, as I said, this is a general issue that applies to any streaming API.
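def handle_data(self, buf):
    try:
        # Common case: the callback received exactly one JSON object.
        self.tweet = json.loads(buf)
        self.tweet_list.append(self.tweet)
    except ValueError:
        # buf held more than one object, separated by '\r\n'.
        self.data_list = buf.split('\r\n')
        for data in self.data_list:
            if data.strip():  # skip the empty piece after a trailing '\r\n'
                self.tweet_list.append(json.loads(data))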
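Splitting on '\r\n' works only if that exact delimiter always separates the objects. A trailing delimiter also leaves an empty string, which json.loads rejects.

An alternative is json.JSONDecoder.raw_decode. It parses one JSON value starting at a given index and returns that value together with the index where it ended. That lets you walk any number of objects in one buffer without knowing the separator. The sketch below is written in Python 3, and iter_json_objects is a hypothetical helper name.

It still assumes each callback's data ends on an object boundary. An object split across two callbacks would need a carry-over buffer as well.

```python
import json
from typing import Iterator


def iter_json_objects(text: str) -> Iterator[object]:
    """Yield every JSON value in text, whatever whitespace separates them."""
    decoder = json.JSONDecoder()
    idx = 0
    end = len(text)
    while True:
        # raw_decode does not skip leading whitespace, so step over any
        # separators ('\r\n', spaces, blank lines) before each value.
        while idx < end and text[idx].isspace():
            idx += 1
        if idx >= end:
            return
        # raw_decode returns the parsed value and the index just past it.
        obj, idx = decoder.raw_decode(text, idx)
        yield obj


if __name__ == "__main__":
    buf = '{"id": 1}\r\n{"id": 2}\r\n'
    for obj in iter_json_objects(buf):
        print(obj)
```

Malformed input still raises json.JSONDecodeError (a subclass of ValueError), so the existing try/except around the call keeps working.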