I'm trying to obtain Twitter data from the archive.org Twitter stream archive and load it into a database. My plan is to first load all tweets for a specific month, and then select and stage only the tweets I'm interested in (e.g. by locale or hashtag).
The script below does what I'm looking for, but it is incredibly slow: after running for roughly half an hour it has only read about 6 of the ~50,000 inner .bz2 files in one TAR file.
Some stats of an example TAR file:
What should I be looking for when optimizing this process for speed?
The script is currently using ~3% of my CPU and ~6% of my RAM.
Any help is greatly appreciated.
import tarfile
import bz2  # Needed for bz2.decompress below
import dataset  # Using dataset as I'm still iteratively developing the table structure(s)
import json
import datetime


def scrape_tar_contents(filename):
    """Iterates over an input TAR filename, retrieving each .bz2 container:
    extracts & retrieves JSON contents; stores JSON contents in a PostgreSQL database."""
    tar = tarfile.open(filename, 'r')
    inner_files = [name for name in tar.getnames() if name.endswith('.bz2')]
    num_bz2_files = len(inner_files)
    bz2_count = 1
    print('Starting work on file... ' + filename[-20:])
    for bz2_filename in inner_files:  # Loop over all .bz2 files in the TAR archive
        print('Starting work on inner file... ' + bz2_filename[-20:] +
              ': ' + str(bz2_count) + '/' + str(num_bz2_files))
        t_extract = tar.extractfile(bz2_filename)
        data = t_extract.read()
        txt = bz2.decompress(data).decode('utf-8')  # Decode so we can split into text lines
        tweet_errors = 0
        current_line = 1
        lines = txt.split('\n')
        num_lines = len(lines)
        for line in lines:  # Loop over the lines in the resulting text file.
            if current_line % 100 == 0:
                print('Working on line ' + str(current_line) + '/' + str(num_lines))
            try:
                tweet = json.loads(line)
            except ValueError as e:
                error_log = {'Date_time': datetime.datetime.now(),
                             'File_TAR': filename,
                             'File_BZ2': bz2_filename,
                             'Line_number': current_line,
                             'Line': line,
                             'Error': str(e)}
                tweet_errors += 1
                db['error_log'].upsert(error_log, ['File_TAR', 'File_BZ2', 'Line_number'])
                print('Error occurred, now at ' + str(tweet_errors))
                current_line += 1
                continue  # No tweet to store for this line; move on to the next one.
            try:
                tweet_id = tweet['id']
                tweet_text = tweet['text']
                tweet_locale = tweet['lang']
                created_at = tweet['created_at']
                tweet_json = tweet
                data = {'tweet_id': tweet_id,
                        'tweet_text': tweet_text,
                        'tweet_locale': tweet_locale,
                        'created_at_str': created_at,
                        'date_loaded': datetime.datetime.now(),
                        'tweet_json': tweet_json}
                db['tweets'].upsert(data, ['tweet_id'])
            except KeyError as e:
                error_log = {'Date_time': datetime.datetime.now(),
                             'File_TAR': filename,
                             'File_BZ2': bz2_filename,
                             'Line_number': current_line,
                             'Line': line,
                             'Error': str(e)}
                tweet_errors += 1
                db['error_log'].upsert(error_log, ['File_TAR', 'File_BZ2', 'Line_number'])
                print('Error occurred, now at ' + str(tweet_errors))
            current_line += 1
        bz2_count += 1


if __name__ == "__main__":
    with open("postgresConnecString.txt", 'r') as f:
        db_connectionstring = f.readline()
    db = dataset.connect(db_connectionstring)
    filename = r'H:/Twitter datastream/Sourcefiles/archiveteam-twitter-stream-2013-01.tar'
    scrape_tar_contents(filename)
A tar file does not contain an index of where files are located. Moreover, a tar file can contain more than one copy of the same file. Therefore, when you extract one file, the entire tar file must be read. Even after the file is found, the rest of the tar file must still be read to check whether a later copy exists.
That makes extraction of one file as expensive as extracting all the files.
Therefore, never use tar.extractfile(...) on a large tar file (unless you only need one file or don't have the space to extract everything). If you have the space (and given the size of modern hard drives, you almost certainly do), extract everything either with tar.extractall or with a system call to tar xf ..., and then process the extracted files.
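As a minimal sketch of that workflow (assuming an extract_dir target directory and a hypothetical process_line helper that stands in for the json.loads-plus-upsert logic already in the question):

import os
import bz2
import json
import tarfile

# Both paths are assumptions based on the question; adjust them to your setup.
tar_path = r'H:/Twitter datastream/Sourcefiles/archiveteam-twitter-stream-2013-01.tar'
extract_dir = r'H:/Twitter datastream/Sourcefiles/extracted'

def process_line(line):
    # Hypothetical stand-in for the per-tweet handling in the question
    # (json.loads plus the db['tweets'].upsert call).
    return json.loads(line.decode('utf-8'))

# Step 1: read the TAR file once and extract everything
# (equivalent to a system call along the lines of: tar xf <tar_path> -C <extract_dir>).
with tarfile.open(tar_path, 'r') as tar:
    tar.extractall(path=extract_dir)

# Step 2: walk the extracted tree and stream each .bz2 file line by line.
for root, dirs, files in os.walk(extract_dir):
    for name in files:
        if name.endswith('.bz2'):
            with bz2.BZ2File(os.path.join(root, name), 'r') as f:
                for line in f:           # one JSON document per line
                    if line.strip():     # skip blank lines
                        process_line(line)

This way the slow sequential pass over the TAR file happens exactly once; afterwards each .bz2 file is opened directly from disk, and the inner loop is dominated by decompression and database work rather than by repeated reads of the archive.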