How to effectively read large (30GB+) TAR file with BZ2 JSON twitter files into PostgreSQL

Tags: python, json

I'm trying to obtain Twitter data from the archive.org archive and load it into a database. My plan is to first load all tweets for a specific month, and then select and stage only the tweets I'm interested in (e.g. by locale or hashtag).

I am able to run the script below to do what I'm looking for, but it is incredibly slow: after running for approximately half an hour it has only read about 6 of the 50,000 inner .bz2 files in one TAR file.

Some stats of an example TAR file:

  • Total size: ~30-40 GB
  • Number of inner .bz2 files (arranged in folders): 50,000
  • Size of one .bz2 file: ~600 KB
  • Size of one extracted JSON file: ~5 MB, ~3,600 tweets

What should I be looking for when optimizing this process for speed?

  • Should I extract the files to disk instead of buffering them in Python?
  • Should I look at multithreading part of the process? If so, which part would benefit most?
  • Alternatively, is the speed I'm currently obtaining relatively normal for such a script?

The script is currently using ~3% of my CPU and ~6% of my RAM.

Any help is greatly appreciated.

import tarfile
import bz2  # Needed for bz2.decompress below
import dataset  # Using dataset as I'm still iteratively developing the table structure(s)
import json
import datetime

def scrape_tar_contents(filename):
    """Iterates over an input TAR filename, retrieving each .bz2 container:
       extracts & retrieves JSON contents; stores JSON contents in a postgreSQL database"""
    tar = tarfile.open(filename, 'r')
    inner_files = [filename for filename in tar.getnames() if filename.endswith('.bz2')]

    num_bz2_files = len(inner_files)
    print('Starting work on file... ' + filename[-20:])
    for bz2_count, bz2_filename in enumerate(inner_files, 1):  # Loop over all files in the TAR archive
        print('Starting work on inner file... ' + bz2_filename[-20:] + ': ' + str(bz2_count) + '/' + str(num_bz2_files))
        t_extract = tar.extractfile(bz2_filename)
        data = t_extract.read()
        txt = bz2.decompress(data)

        tweet_errors = 0
        lines = txt.split('\n')
        num_lines = len(lines)
        for current_line, line in enumerate(lines, 1):  # Loop over the lines in the resulting text file.
            if current_line % 100 == 0:
                print('Working on line ' + str(current_line) + '/' + str(num_lines))
            if not line.strip():
                continue  # Skip empty lines (e.g. the trailing newline).
            try:
                tweet = json.loads(line)
            except ValueError as e:
                error_log = {'Date_time': datetime.datetime.now(),
                             'File_TAR': filename,
                             'File_BZ2': bz2_filename,
                             'Line_number': current_line,
                             'Line': line,
                             'Error': str(e)}
                tweet_errors += 1
                db['error_log'].upsert(error_log, ['File_TAR', 'File_BZ2', 'Line_number'])
                print('Error occurred, now at ' + str(tweet_errors))
                continue  # tweet is undefined when parsing fails, so skip this line.
            try:
                data = {'tweet_id': tweet['id'],
                        'tweet_text': tweet['text'],
                        'tweet_locale': tweet['lang'],
                        'created_at_str': tweet['created_at'],
                        'date_loaded': datetime.datetime.now(),
                        'tweet_json': tweet}
                db['tweets'].upsert(data, ['tweet_id'])
            except KeyError as e:
                error_log = {'Date_time': datetime.datetime.now(),
                             'File_TAR': filename,
                             'File_BZ2': bz2_filename,
                             'Line_number': current_line,
                             'Line': line,
                             'Error': str(e)}
                tweet_errors += 1
                db['error_log'].upsert(error_log, ['File_TAR', 'File_BZ2', 'Line_number'])
                print('Error occurred, now at ' + str(tweet_errors))
                continue

if __name__ == "__main__":
    with open("postgresConnecString.txt", 'r') as f:
        db_connectionstring = f.readline().strip()  # Strip the trailing newline from the connection string
    db = dataset.connect(db_connectionstring)

    filename = r'H:/Twitter datastream/Sourcefiles/archiveteam-twitter-stream-2013-01.tar'
    scrape_tar_contents(filename)
asked Jan 08 '15 by MattV
1 Answer

A tar file does not contain an index of where its members are located. Moreover, a tar file can contain more than one copy of the same file. Therefore, extracting a single file by name requires reading through the archive, and even after the file is found, the rest of the archive must still be read to check whether a later copy exists.

That makes extraction of one file as expensive as extracting all the files.

Therefore, never use tar.extractfile(...) on a large tar file (unless you only need one file or don't have the space to extract everything).

If you have the space (and given the size of modern hard drives, you almost certainly do), extract everything either with tar.extractall or with a system call to tar xf ..., and then process the extracted files.
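
As an illustration (not part of the original answer), here is a minimal sketch of that approach: extract the archive once, then walk the extracted tree and stream-decompress each .bz2 file line by line. TAR_PATH and DEST_DIR are placeholder paths, and the database upsert from the question is left as a comment.

import bz2
import json
import os
import tarfile

TAR_PATH = 'archiveteam-twitter-stream-2013-01.tar'  # placeholder path
DEST_DIR = 'extracted'                                # placeholder path

# 1. Extract the whole archive once (the equivalent of `tar xf`),
#    instead of calling tar.extractfile() once per inner file.
#    Note: only use extractall on archives you trust.
with tarfile.open(TAR_PATH, 'r') as tar:
    tar.extractall(DEST_DIR)

# 2. Walk the extracted tree and process each .bz2 file as a stream of JSON lines.
for root, dirs, files in os.walk(DEST_DIR):
    for name in files:
        if not name.endswith('.bz2'):
            continue
        with bz2.BZ2File(os.path.join(root, name)) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    tweet = json.loads(line.decode('utf-8'))
                except ValueError:
                    continue  # Skip malformed lines; log them if needed.
                # ... upsert `tweet` into the 'tweets' table here, as in the question ...

Stream-decompressing with bz2.BZ2File also avoids holding each fully decompressed file in memory at once, unlike the bz2.decompress(data) call in the original script.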

answered Oct 16 '22 by unutbu