
Python 3 join data from large files that are sorted

I have multiple large files (> 5M rows of data) that are sorted on a unique timestamp. All the files contain virtually all the same timestamps except for a handful of randomly missing rows (< 1000). I'd like to efficiently join the data from all the files into a single dataset with one row per timestamp, preferably using a generator.

Except for the missing rows, I could just use zip:

def get_data(list_of_iterables):
    for data in zip(*list_of_iterables):
        yield data
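To see why zip alone is not enough, here is a small hypothetical pair of inputs (rows as `(timestamp, payload)` tuples) where one file is missing a timestamp; zip silently pairs rows from different timestamps after the gap:

```python
# Hypothetical sample data: file B is missing timestamp 2,
# so plain zip misaligns every row after the gap.
a = iter([(1, 'a1'), (2, 'a2'), (3, 'a3')])
b = iter([(1, 'b1'), (3, 'b3')])

rows = list(zip(a, b))
# The second row pairs timestamp 2 from A with timestamp 3 from B -- wrong.
```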

However, since there are some missing rows, I need to join the data on timestamp instead of simply zipping. I can simply ignore any rows that don't have matching timestamps in every file.

Is there a pythonic way to implement this functionality in a few lines?

My approach would be to advance each iterable in turn until its timestamp is no longer less than the maximum timestamp for the group of iterables. Whenever all the timestamps match, yield a row and advance all the iterables. But the logic seems messy when I try to implement this approach.

Edit: Performance.

The implementation needs to start returning rows without reading all the data into memory first. Reading everything takes a while, and often only the first handful of rows needs to be examined.

RandomBits asked Jun 30 '15

2 Answers

I ended up writing the following code to solve my problem, which turned out to be lighter than I expected:

def advance_values(iters):
    # Pull the next row from every iterator. The try/except is needed on
    # Python 3.7+ (PEP 479): a bare StopIteration inside a generator becomes
    # a RuntimeError, so exhaustion must end this generator explicitly.
    # The caller then sees a short list and knows the merge is finished.
    for it in iters:
        try:
            yield next(it)
        except StopIteration:
            return

def align_values(iters, values, key):
    # Advance each iterator until its timestamp (the first two fields)
    # is at least `key`; stop early if any iterator runs out.
    for it, value in zip(iters, values):
        try:
            while (value[0], value[1]) < key:
                value = next(it)
        except StopIteration:
            return
        yield value

def merge_join(*iters):
    values = list(advance_values(iters))
    while len(values) == len(iters):
        tms = [(v[0], v[1]) for v in values]
        max_tm = max(tms)
        if all(tm == max_tm for tm in tms):
            # Every file has this timestamp: emit the joined row.
            yield values
            values = list(advance_values(iters))
        else:
            # Skip rows whose timestamp lags the current maximum.
            values = list(align_values(iters, values, max_tm))
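A quick self-contained sanity check on hypothetical rows whose timestamp is the first two fields (the helper definitions are restated so the snippet runs on its own; the try/except guards matter on Python 3.7+, where a bare StopIteration inside a generator becomes a RuntimeError under PEP 479):

```python
# Restated from above so this snippet is self-contained.
def advance_values(iters):
    for it in iters:
        try:
            yield next(it)
        except StopIteration:  # an input is exhausted: end the merge
            return

def align_values(iters, values, key):
    for it, value in zip(iters, values):
        try:
            while (value[0], value[1]) < key:
                value = next(it)
        except StopIteration:
            return
        yield value

def merge_join(*iters):
    values = list(advance_values(iters))
    while len(values) == len(iters):
        tms = [(v[0], v[1]) for v in values]
        max_tm = max(tms)
        if all(tm == max_tm for tm in tms):
            yield values
            values = list(advance_values(iters))
        else:
            values = list(align_values(iters, values, max_tm))

# Hypothetical inputs: file B is missing timestamp (2, 0).
a = iter([(1, 0, 'a1'), (2, 0, 'a2'), (3, 0, 'a3')])
b = iter([(1, 0, 'b1'), (3, 0, 'b3')])

rows = list(merge_join(a, b))
# Only timestamps present in every input survive: (1, 0) and (3, 0).
```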
RandomBits answered Sep 27 '22

If each iterable in list_of_iterables is sorted by timestamp, you can use heapq.merge() to merge them while accounting for possible gaps in the data, and itertools.groupby() to group records that share the same timestamp:

from heapq import merge
from itertools import groupby
from operator import attrgetter

for timestamp, group in groupby(merge(*list_of_iterables), 
                                key=attrgetter('timestamp')):
    print(timestamp, list(group)) # same timestamp

The implementation yields groups without reading all the data into memory first.
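As a concrete sketch, here is a runnable version with a hypothetical namedtuple `Record` (any object with a `timestamp` attribute works, as the attrgetter key assumes). It also filters out groups that are not present in every file, per the question's requirement; `merge()` accepts a `key` argument on Python 3.5+:

```python
from collections import namedtuple
from heapq import merge
from itertools import groupby
from operator import attrgetter

# Hypothetical record type standing in for a parsed file row.
Record = namedtuple('Record', ['timestamp', 'value'])

a = [Record(1, 'a1'), Record(2, 'a2'), Record(3, 'a3')]
b = [Record(1, 'b1'), Record(3, 'b3')]  # timestamp 2 is missing

key = attrgetter('timestamp')
complete = []
for timestamp, group in groupby(merge(a, b, key=key), key=key):
    group = list(group)
    if len(group) == 2:  # keep only timestamps present in both inputs
        complete.append((timestamp, group))
```

Because both merge() and groupby() are lazy, this also starts producing groups before the inputs are fully consumed.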

jfs answered Sep 27 '22