I have multiple large files (> 5M rows of data) that are sorted on a unique timestamp. All the files contain virtually all the same timestamps except for a handful of randomly missing rows (< 1000). I'd like to efficiently join the data from all the files into a single dataset with one row per timestamp, preferably using a generator.
Except for the missing rows, I could just use zip:
def get_data(list_of_iterables):
    for data in zip(*list_of_iterables):
        yield data
However, since some rows are missing, I need to join the data on timestamp rather than simply zip it. Any row whose timestamp is not present in every file can be ignored.
Is there a pythonic way to implement this functionality in a few lines?
My approach would be to advance each iterable in turn until its timestamp is no longer less than the maximum timestamp for the group of iterables. Whenever all the timestamps match, yield a row and advance all the iterables. But the logic gets messy when I try to implement this approach.
Edit: Performance.
The implementation needs to start returning rows without first reading all the data into memory. Reading everything takes a while, and often only the first handful of rows needs to be examined.
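For example, once the join is a generator, the first few joined rows can be pulled with itertools.islice and the rest of the files is never read (a sketch reusing the get_data generator above; list_of_iterables is assumed to hold one iterator per file):

from itertools import islice

# Examine only the first 5 joined rows; the remainder of the files stays unread.
first_rows = list(islice(get_data(list_of_iterables), 5))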
I ended up writing the following code to solve my problem, which turned out to be simpler than I expected:
def advance_values(iters):
    # Pull the next row from every iterator; stop early if any is exhausted.
    # (Catching StopIteration is required on Python 3.7+, where PEP 479
    # turns a StopIteration escaping a generator into a RuntimeError.)
    for it in iters:
        try:
            yield next(it)
        except StopIteration:
            return

def align_values(iters, values, key):
    # Advance each iterator until its row's timestamp reaches the key.
    for it, value in zip(iters, values):
        try:
            while (value[0], value[1]) < key:
                value = next(it)
        except StopIteration:
            return
        yield value

def merge_join(*iters):
    values = list(advance_values(iters))
    while True:
        if len(values) != len(iters):
            return  # at least one input is exhausted: no more complete rows
        tms = [(v[0], v[1]) for v in values]
        max_tm = max(tms)
        if all((v[0], v[1]) == max_tm for v in values):
            # All iterators agree on the timestamp: emit the joined row.
            yield values
            values = list(advance_values(iters))
        else:
            # Skip rows whose timestamp lags behind the current maximum.
            values = list(align_values(iters, values, max_tm))
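For instance (a hypothetical sketch, not part of the original solution): assuming each file is a CSV whose first two columns form the timestamp key used by the (v[0], v[1]) tuples above, and that the key compares correctly as strings, merge_join can be fed one reader per file:

import csv

def rows(path):
    # Yield one row (a list of column strings) per line of a sorted CSV file.
    with open(path, newline='') as f:
        yield from csv.reader(f)

# Hypothetical filenames; each file must already be sorted on its timestamp key.
readers = [rows(name) for name in ('a.csv', 'b.csv', 'c.csv')]
for joined in merge_join(*readers):
    print(joined)  # one list of matching rows per shared timestamp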
If each iterable in list_of_iterables is sorted by timestamp, then you could use heapq.merge() to merge them, taking into account possible gaps in the data, and itertools.groupby() to group records with the same timestamp:
from heapq import merge
from itertools import groupby
from operator import attrgetter

for timestamp, group in groupby(merge(*list_of_iterables),
                                key=attrgetter('timestamp')):
    print(timestamp, list(group))  # all records sharing this timestamp
The implementation yields groups without reading all the data into memory first.
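A minimal self-contained sketch of this approach, with hypothetical namedtuple records and an extra length check to keep only the timestamps that appear in every input (matching the question's requirement), might look like:

from collections import namedtuple
from heapq import merge
from itertools import groupby
from operator import attrgetter

Record = namedtuple('Record', ['timestamp', 'value'])

# Three sorted inputs, with timestamp 2 missing from the second one.
a = [Record(1, 'a1'), Record(2, 'a2'), Record(3, 'a3')]
b = [Record(1, 'b1'), Record(3, 'b3')]
c = [Record(1, 'c1'), Record(2, 'c2'), Record(3, 'c3')]

inputs = [a, b, c]
for timestamp, group in groupby(merge(*inputs), key=attrgetter('timestamp')):
    rows = list(group)
    if len(rows) == len(inputs):  # keep timestamps present in every file
        print(timestamp, rows)

Because merge() and groupby() are both lazy, joined groups are produced as soon as the underlying iterables supply them.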