Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Multiplying large matrices with dask

I am working on a project which basically boils down to solving the matrix equation

A.dot(x) = d

where A is a matrix with dimensions roughly 10 000 000 by 2000 (I would like to increase this in both directions eventually).

A obviously does not fit in memory, so this has to be parallelized. I do that by solving A.T.dot(A).dot(x) = A.T.dot(d) instead. A.T will have dimensions 2000 by 2000. It can be calculated by dividing A and d into chunks A_i and d_i, along the rows, calculate A_i.T.dot(A_i) and A_i.T.dot(d_i), and sum these. Perfect for parallellizing. I have been able to implement this with the multiprocessing module, but it is 1) hard to scale any further (increasing A in both dimensions), due to memory use, and 2) not pretty (and therefore not easy to maintain).

Dask seems to be a very promising library for solving both these problems, and I have made some attempts. My A matrix is complicated to calculate: It is based on approximately 15 different arrays (with size equal to the number of rows in A), and some are used in an iterative algorithm to evaluate associated Legendre function. When the chunks are small (10000 rows), it takes a very long time to build the task graph, and it takes a lot of memory (the memory increase coincides with the call to the iterative algorithm). When the chunks are larger (50000 rows), the memory consumption prior to calculations is a lot smaller, but it is rapidly depleted when calculating A.T.dot(A). I have tried with cache.Chest, but it significantly slows down the calculations.

The task graph must be very large and complicated - calling A._visualize() crashes. With simpler A matrices, it works to do this directly (see response by @MRocklin). Is there a way for me to simplify it?

Any advice on how to get around this would be highly appreciated.

As a toy example, I tried

A = da.ones((2e3, 1e7), chunks = (2e3, 1e3)) 
(A.T.dot(A)).compute()

This also failed, using up all the memory with only one core being active. With chunks = (2e3, 1e5), all cores start almost immediately, but MemoryError appears within 1 second (I have 15 GB on my current computer). chunks = (2e3, 1e4) was more promising, but it ended up consuming all memory as well.

Edit: I struckthrough the toy example test, because the dimensions were wrong, and corrected the dimensions in the rest. As @MRocklin says, it does work with the right dimensions. I added a question which I now think is more relevant to my problem.

Edit2: This is a much simplified example of what I was trying to do. The problem is, I believe, the recursions involved in defining the columns in A.

import dask.array as da

N = 1e6
M = 500

x = da.random.random((N, 1), chunks = 5*M)

# my actual 
A_dict = {0:x}
for i in range(1, M):
    A_dict[i] = 2*A_dict[i-1]
A = da.hstack(tuple(A_dict.values()))
A = A.rechunk((M*5, M))
ATA = A.T.dot(A)

This seems to lead to a very complicated task graph, which takes up a lot of memory before the calculations even start.

I have now solved this by placing the recursion in a function, with numpy arrays, and more or less do A = x.map_blocks(...).

As a second note, once I have the A matrix task graph, calculating A.T.dot(A) directly does seem to give some memory issues (memory usage is not very stable). I therefore explicitly calculate it in chunks, and sum the results. Even with these workarounds, dask makes a big difference in speed and readability.

like image 863
sulkeh Avatar asked Nov 09 '22 20:11

sulkeh


1 Answers

Your output is very very large.

>>> A.T.dot(A).shape
(10000000, 10000000)

Perhaps you intended to compute this with the transposes in the other direction?

>>> A.dot(A.T).shape
(2000, 2000)

This still takes a while (it's a large computation) but it does complete.

like image 132
MRocklin Avatar answered Nov 14 '22 21:11

MRocklin