So, I have two very large lists of numbers l1
and l2
. I'd like to multiply each element of l1
with each element of l2
without explictly creating a new list of products. Thus, I'd like a generator. This part is easy. I can just do something like
for a in l1:
for b in l2:
yield a * b
However, I also need these products to be ordered by their magnitude. I am wondering if there is some clever trick to order the yield statements so that this can also be done with a generator. In Python 3, if possible. Thanks.
I'll call the lists xs
and ys
, and assume they're sorted. As you noted in a comment, the smallest product is necessarily xs[0] * ys[0]
- but only if you're also assuming that all numbers are non-negative, so I'll assume that too.
After the first product, it gets messier - else you already would have solved it ;-) The next two to consider are xs[0] * ys[1]
and xs[1] * ys[0]
. Easy enough, but then the next to consider depends on which of those won. If xs[0] * ys[1]
won, you only need to replace it with xs[0] * ys[2]
, but if xs[1] * ys[0]
won then both xs[1] * ys[1]
and xs[2] * ys[0]
come into play. And so on.
The following keeps track of the growing set of possibilities with a heap. The heap never holds more than len(xs)
items, so the code first arranges to make xs
the shorter list:
def upprod(xs, ys):
# xs and ys must be sorted, and non-negative
from heapq import heappush, heappop
# make xs the shorter
if len(ys) < len(xs):
xs, ys = ys, xs
if not xs:
return
lenxs = len(xs)
lenys = len(ys)
# the heap holds 4-tuples:
# (product, xs index, ys index, xs[xs index])
h = [(xs[0] * ys[0], 0, 0, xs[0])]
while h:
prod, xi, yi, x = heappop(h)
yield prod
# same x with next y
yi += 1
if yi < lenys:
heappush(h, (x * ys[yi], xi, yi, x))
# if this is the first time we used x, start
# the next x going
if yi == 1:
xi += 1
if xi < lenxs:
x = xs[xi]
heappush(h, (x * ys[0], xi, 0, x))
I'd be pleasantly surprised if an essentially more efficient solution existed. If someone thinks they have one, please try it first using this randomized tester:
from itertools import product
from random import randrange
MAXLEN = 10
UB = 1000
ntest = 0
while True:
ntest += 1
lenxs = randrange(MAXLEN + 1)
lenys = randrange(MAXLEN + 1)
xs = sorted(randrange(UB) for i in range(lenxs))
ys = sorted(randrange(UB) for i in range(lenys))
brute = sorted(a*b for a, b in product(xs, ys))
got = list(upprod(xs, ys))
if brute != got:
print("OUCH!")
print(xs)
print(ys)
print(brute)
print(got)
assert False
if ntest % 10_000 == 0:
print(f"finished test {ntest:,}")
The above doesn't fully exploit the partial ordering we can deduce from indices alone: if
i1 <= i2 and j1 <= j2
then we know
xs[i1] * ys[j1] <= xs[i2] * ys[j2]
because sorting implies xs[i1] <= xs[i2]
and ys[j1] <= ys[j2]
.
So, for example, if the index pairs (0, 1)
and (1, 0)
are on the heap, and the second one wins, (2, 0)
needs to be added to the heap but (1, 1)
doesn't: from the indices alone, we know that the product from the (0, 1)
remaining in the heap is no larger than the product from (1, 1)
. It's only when (0, 1)
is also removed that (1, 1)
needs to be added.
In general, each pair of the form (i, 0)
has a single immediate predecessor (i-1, 0)
, and (0, j)
the single (0, j-1)
, and all other (i, j)
have two immediate predecessors: (i-1, j)
and (i, j-1)
. There's no need to put a pair on the heap until all its predecessors have been taken off the heap.
Which leads to this code, which is seemingly "more elegant" because more symmetric:
def upprod(xs, ys):
# xs and ys must be sorted, and non-negative
from heapq import heappush, heappop
# make xs the shorter
if len(ys) < len(xs):
xs, ys = ys, xs
if not xs:
return
lenxs = len(xs)
lenys = len(ys)
# the heap holds 3-tuples:
# (product, xs index, ys index)
h = [(xs[0] * ys[0], 0, 0)]
# interior points for which only one immediate predecessor has
# been processed; there's no need to put them in the heap
# until their second predecessor has been processed too
pending = set()
def add(xi, yi):
if xi < lenxs and yi < lenys:
if xi and yi: # if either is 0, only one predecessor
p = xi, yi
if p in pending:
pending.remove(p)
else:
pending.add(p)
return
heappush(h, (xs[xi] * ys[yi], xi, yi))
while h:
prod, xi, yi = heappop(h)
yield prod
# same x with next y; and same y with next x
add(xi, yi + 1)
add(xi + 1, yi)
assert not pending
Compared to the first code, it keeps the heap somewhat smaller in many cases. But heap operations take time logarithmic in the number of heap entries, and the heap can still grow to len(xs)
entries, so that's not much of a win. It's probably lost to the overhead of the two new function calls (while inlining those is too ugly to bear).
My solution is to create a list of generators, one generator for each row in the product matrix, and then to use heapq.merge
to sort the outputs of those generators. Each generator has a size of 44 bytes on a 32 bit machine, so the whole list of generators only consumes a modest amount of RAM.
heapq.merge
(when no sorting key function is supplied) works by creating a 3-tuple of each of the iterables you pass it. That tuple contains the next value from the iterable, an index number for the iterable, and a reference to the iterable's __next__
method. It places those tuples onto a heap to perform a mergesort of the iterables' values. You can see the details in its Python source code.
Thus my approach is not quite as frugal as Tim Peters' solutions, but it's not too shabby, IMHO. ;)
def sorted_prod_merge(xs, ys):
''' mergesort generators of the rows. '''
if len(ys) < len(xs):
xs, ys = ys, xs
def gen(x):
for y in ys:
yield x * y
yield from merge(*[gen(x) for x in xs])
Here's some timeit
code which shows the running time of sorted_prod_merge
, Tim Peters' solutions, plus a few other versions of mine. I've used Tim's variable names to keep the code uniform. It's interesting to note that Tim's 1st version is roughly twice as fast as his more advanced solution. My sorted_prod_row
runs quite quickly, but it's a terrible RAM hog.
The timeit
code uses a technique given in the itertools
recipes to exhaust an iterator: we feed it to a zero-length deque. The time_test
code sorts the 3 results from each Timer
run. That's because the minimum result is the important one, the other values just give an indication of the variance in the system at the time the test ran. See the note in the docs for Timer.repeat
for details.
from heapq import heappush, heappop, merge
from random import seed, randrange
from timeit import Timer
from collections import deque
seed(163)
# Brute force method, as a generator
def sorted_prod_brute(xs, ys):
yield from sorted(x * y for x in xs for y in ys)
# By Tim Peters
def upprod1(xs, ys):
# xs and ys must be sorted, and non-negative
from heapq import heappush, heappop
# make xs the shorter
if len(ys) < len(xs):
xs, ys = ys, xs
if not xs:
return
lenxs = len(xs)
lenys = len(ys)
# the heap holds 4-tuples:
# (product, xs index, ys index, xs[xs index])
h = [(xs[0] * ys[0], 0, 0, xs[0])]
while h:
prod, xi, yi, x = heappop(h)
yield prod
# same x with next y
yi += 1
if yi < lenys:
heappush(h, (x * ys[yi], xi, yi, x))
# if this is the first time we used x, start
# the next x going
if yi == 1:
xi += 1
if xi < lenxs:
x = xs[xi]
heappush(h, (x * ys[0], xi, 0, x))
# By Tim Peters
def upprod2(xs, ys):
# xs and ys must be sorted, and non-negative
from heapq import heappush, heappop
# make xs the shorter
if len(ys) < len(xs):
xs, ys = ys, xs
if not xs:
return
lenxs = len(xs)
lenys = len(ys)
# the heap holds 3-tuples:
# (product, xs index, ys index)
h = [(xs[0] * ys[0], 0, 0)]
# interior points for which only one immediate predecessor has
# been processed; there's no need to put them in the heap
# until their second predecessor has been processed too
pending = set()
def add(xi, yi):
if xi < lenxs and yi < lenys:
doit = True
if xi and yi: # if either is 0, only one predecessor
p = xi, yi
if p in pending:
pending.remove(p)
else:
pending.add(p)
doit = False
if doit:
heappush(h, (xs[xi] * ys[yi], xi, yi))
while h:
prod, xi, yi = heappop(h)
yield prod
# same x with next y; and same y with next x
add(xi, yi + 1)
add(xi + 1, yi)
assert not pending
def sorted_prod_merge(xs, ys):
''' mergesort generators of the rows. '''
if len(ys) < len(xs):
xs, ys = ys, xs
def gen(x):
for y in ys:
yield x * y
yield from merge(*[gen(x) for x in xs])
def sorted_prod_row(xs, ys):
''' Heapsort, row by row.
Fast, but not space-efficient: the maximum
heap size grows to almost len(ys) * len(xs)
'''
if len(ys) < len(xs):
xs, ys = ys, xs
if not xs:
return
x, xs = xs[0], xs[1:]
heap = []
#big = 0
for y in ys:
lo = x * y
while heap and heap[0] <= lo:
yield heappop(heap)
yield lo
for u in xs:
heappush(heap, u * y)
#big = max(big, len(heap))
#print(big)
while heap:
yield heappop(heap)
def sorted_prod_diag(xs, ys):
''' Heapsort, going along the diagonals
50% slower than sorted_prod_row, but more
space-efficient: the maximum heap size
grows to around 0.5 * len(ys) * len(xs)
'''
if not (xs and ys):
return
lenxs, lenys = len(xs), len(ys)
heap = []
#big = 0
for n in range(lenxs + lenys - 1):
row = sorted(xs[n - i] * ys[i]
for i in range(max(0, n + 1 - lenxs), min(lenys, n + 1)))
lo = row[0]
while heap and heap[0] <= lo:
yield heappop(heap)
yield lo
for u in row[1:]:
heappush(heap, u)
#big = max(big, len(heap))
#print(big)
#assert not heap
def sorted_prod_block(xs, ys):
''' yield the top left corner, then merge sort
the top row, the left column and the remaining
block. So we end up with max(len(xs), len(ys))
recursively nested calls to merge(). It's ok
for small lists, but too slow otherwise.
'''
if not (xs and ys):
return
x, *xs = xs
y, *ys = ys
yield x * y
row = (y * u for u in xs)
col = (x * v for v in ys)
yield from merge(row, col, sorted_prod_block(xs, ys))
def sorted_prod_blockI(xs, ys):
''' Similar to sorted_prod_block except we use indexing
to avoid creating sliced copies of the lists
'''
lenxs, lenys = len(xs), len(ys)
def sorted_block(xi, yi):
if xi == lenxs or yi == lenys:
return
x, y = xs[xi], ys[yi]
yield x * y
xi, yi = xi + 1, yi + 1
row = (xs[i] * y for i in range(xi, lenxs))
col = (ys[i] * x for i in range(yi, lenys))
yield from merge(row, col, sorted_block(xi, yi))
yield from sorted_block(0, 0)
functions = (
sorted_prod_brute,
upprod1,
upprod2,
sorted_prod_merge,
#sorted_prod_row,
sorted_prod_diag,
#sorted_prod_block,
#sorted_prod_blockI,
)
UB = 1000
def verify(numtests, maxlen=10):
print('Verifying. maxlen =', maxlen)
for k in range(numtests):
lenxs = randrange(maxlen + 1)
lenys = randrange(maxlen + 1)
print(k, ':', lenxs, '*', lenys, '=', lenxs * lenys)
xs = sorted(randrange(UB) for i in range(lenxs))
ys = sorted(randrange(UB) for i in range(lenys))
good = list(sorted_prod_brute(xs, ys))
for func in functions[1:]:
result = list(func(xs, ys))
if result != good:
print(func.__name__, 'failed!')
print()
def time_test(loops=20):
timings = []
for func in functions:
# Consume the generator output by feeding it to a zero-length deque
t = Timer(lambda: deque(func(xs, ys), maxlen=0))
result = sorted(t.repeat(3, loops))
timings.append((result, func.__name__))
timings.sort()
for result, name in timings:
print('{:18} : {:.6f}, {:.6f}, {:.6f}'.format(name, *result))
print()
verify(10, 10)
verify(20, 100)
print('\nTimings')
loops = 8192
minlen = 5
for k in range(6):
lenxs = randrange(minlen, 2 * minlen)
lenys = randrange(minlen, 2 * minlen)
print(k, ':', loops, 'loops.', lenxs, '*', lenys, '=', lenxs * lenys)
xs = sorted(randrange(UB) for i in range(lenxs))
ys = sorted(randrange(UB) for i in range(lenys))
time_test(loops)
minlen *= 2
loops //= 4
Here's the output on my ancient 2GHz 32 bit single core machine, running Python 3.6.0 on an old Debian derivative distro of Linux. YMMV.
Verifying. maxlen = 10
0 : 8 * 9 = 72
1 : 9 * 0 = 0
2 : 1 * 7 = 7
3 : 8 * 10 = 80
4 : 10 * 5 = 50
5 : 10 * 0 = 0
6 : 5 * 2 = 10
7 : 5 * 10 = 50
8 : 3 * 0 = 0
9 : 0 * 6 = 0
Verifying. maxlen = 100
0 : 64 * 0 = 0
1 : 77 * 96 = 7392
2 : 24 * 13 = 312
3 : 53 * 39 = 2067
4 : 74 * 39 = 2886
5 : 92 * 97 = 8924
6 : 31 * 48 = 1488
7 : 39 * 17 = 663
8 : 42 * 25 = 1050
9 : 94 * 25 = 2350
10 : 82 * 83 = 6806
11 : 2 * 97 = 194
12 : 90 * 30 = 2700
13 : 93 * 24 = 2232
14 : 91 * 37 = 3367
15 : 24 * 86 = 2064
16 : 70 * 15 = 1050
17 : 2 * 4 = 8
18 : 72 * 58 = 4176
19 : 25 * 84 = 2100
Timings
0 : 8192 loops. 7 * 8 = 56
sorted_prod_brute : 0.659312, 0.665853, 0.710947
upprod1 : 1.695471, 1.705061, 1.739299
sorted_prod_merge : 1.990161, 1.991129, 2.001242
sorted_prod_diag : 3.013945, 3.018927, 3.053115
upprod2 : 3.582396, 3.586332, 3.622949
1 : 2048 loops. 18 * 16 = 288
sorted_prod_brute : 0.826128, 0.840111, 0.863559
upprod1 : 2.240931, 2.241636, 2.244615
sorted_prod_merge : 2.301838, 2.304075, 2.306918
sorted_prod_diag : 3.030672, 3.053302, 3.135322
upprod2 : 4.860378, 4.949804, 4.953891
2 : 512 loops. 39 * 32 = 1248
sorted_prod_brute : 0.907932, 0.918692, 0.942830
sorted_prod_merge : 2.559567, 2.561709, 2.604387
upprod1 : 2.700482, 2.701147, 2.757695
sorted_prod_diag : 2.961776, 2.965271, 2.995747
upprod2 : 5.563303, 5.654425, 5.656695
3 : 128 loops. 68 * 70 = 4760
sorted_prod_brute : 0.823448, 0.827748, 0.835049
sorted_prod_merge : 2.591373, 2.592134, 2.685534
upprod1 : 2.760466, 2.763615, 2.795082
sorted_prod_diag : 2.789673, 2.828662, 2.848498
upprod2 : 5.483504, 5.488450, 5.517847
4 : 32 loops. 122 * 156 = 19032
sorted_prod_brute : 0.873736, 0.880958, 0.892846
sorted_prod_merge : 2.701089, 2.742456, 2.818822
upprod1 : 2.875358, 2.881793, 2.922569
sorted_prod_diag : 2.953450, 2.988184, 3.012430
upprod2 : 5.780552, 5.812967, 5.826775
5 : 8 loops. 173 * 309 = 53457
sorted_prod_brute : 0.711012, 0.711816, 0.721627
sorted_prod_merge : 1.997386, 1.999774, 2.033489
upprod1 : 2.137337, 2.172369, 3.335119
sorted_prod_diag : 2.324447, 2.329552, 2.331095
upprod2 : 4.278704, 4.289019, 4.324436
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With