I am working on a project which basically boils down to solving the matrix equation

A.dot(x) = d

where A is a matrix with dimensions roughly 10 000 000 by 2000 (I would like to increase this in both directions eventually). A obviously does not fit in memory, so this has to be parallelized. I do that by solving the normal equations A.T.dot(A).dot(x) = A.T.dot(d) instead. A.T.dot(A) has dimensions 2000 by 2000, so it fits in memory. It can be calculated by dividing A and d into chunks A_i and d_i along the rows, calculating A_i.T.dot(A_i) and A_i.T.dot(d_i) for each chunk, and summing the results. Perfect for parallelizing. I have been able to implement this with the multiprocessing module, but it is 1) hard to scale any further (increasing A in both dimensions) due to memory use, and 2) not pretty (and therefore not easy to maintain).
Dask seems to be a very promising library for solving both of these problems, and I have made some attempts. My A matrix is complicated to calculate: it is based on approximately 15 different arrays (each with size equal to the number of rows in A), and some of them are used in an iterative algorithm to evaluate the associated Legendre functions. When the chunks are small (10 000 rows), it takes a very long time to build the task graph, and a lot of memory (the memory increase coincides with the call to the iterative algorithm). When the chunks are larger (50 000 rows), the memory consumption prior to the calculations is much smaller, but it is rapidly depleted when calculating A.T.dot(A). I have tried cache.Chest, but it significantly slows down the calculations.

The task graph must be very large and complicated - calling A._visualize() crashes. With simpler A matrices it works to do this directly (see the response by @MRocklin). Is there a way for me to simplify it?

Any advice on how to get around this would be highly appreciated.
As a toy example, I tried

A = da.ones((2e3, 1e7), chunks = (2e3, 1e3))
(A.T.dot(A)).compute()

This also failed, using up all the memory with only one core being active. With chunks = (2e3, 1e5), all cores start almost immediately, but MemoryError appears within 1 second (I have 15 GB on my current computer). chunks = (2e3, 1e4) was more promising, but it also ended up consuming all the memory.
Edit: I struck through the toy example test because the dimensions were wrong, and corrected the dimensions in the rest of the question. As @MRocklin says, it does work with the right dimensions. I have added a question which I now think is more relevant to my problem.
Edit2: This is a much simplified example of what I was trying to do. The problem is, I believe, the recursions involved in defining the columns of A.
import dask.array as da

N = int(1e6)
M = 500
x = da.random.random((N, 1), chunks = 5*M)
# my actual
A_dict = {0: x}
for i in range(1, M):
    A_dict[i] = 2*A_dict[i-1]
A = da.hstack(tuple(A_dict.values()))
A = A.rechunk((M*5, M))
ATA = A.T.dot(A)
This seems to lead to a very complicated task graph, which takes up a lot of memory before the calculations even start.
I have now solved this by placing the recursion in a function that works on numpy arrays, and more or less doing A = x.map_blocks(...).
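A minimal sketch of that map_blocks approach, where the doubling rule stands in for my real iterative column definitions (build_columns is an illustrative name, not my actual function):

```python
import numpy as np
import dask.array as da

N = int(1e6)
M = 500

def build_columns(x_block):
    """Run the whole column recursion on one plain numpy block.

    The doubling rule is a stand-in for the real iterative
    (Legendre-style) recursion; each block of x yields one block of A.
    """
    cols = [x_block]
    for _ in range(1, M):
        cols.append(2 * cols[-1])
    return np.hstack(cols)

x = da.random.random((N, 1), chunks=(5 * M, 1))
# One task per block instead of M tasks per block: the recursion runs
# inside a single numpy function call, so the graph stays small.
A = x.map_blocks(build_columns, chunks=(5 * M, M))
```

Because the recursion happens inside one numpy call per block, the task graph has one node per block of A rather than one node per column per block.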
As a second note, once I have the A matrix task graph, calculating A.T.dot(A) directly does seem to give some memory issues (memory usage is not very stable). I therefore explicitly calculate it in chunks and sum the results. Even with these workarounds, dask makes a big difference in speed and readability.
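The explicit chunked computation of A.T.dot(A) can be sketched like this, summing the per-row-block products instead of one big dot product (toy sizes; the block indexing assumes a single chunk along the columns):

```python
import numpy as np
import dask.array as da

# Toy sizes; the real A would come from the map_blocks construction.
N, M = int(1e5), 50
A = da.random.random((N, M), chunks=(5 * M, M))

# Sum A_i.T.dot(A_i) over the row-blocks of A.
partials = [A.blocks[i].T.dot(A.blocks[i]) for i in range(A.numblocks[0])]
ATA = da.stack(partials).sum(axis=0).compute()
```

Each partial product is only M-by-M, so the peak memory is bounded by the number of partials held at once rather than by the size of A.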
Your output is very very large.
>>> A.T.dot(A).shape
(10000000, 10000000)
Perhaps you intended to compute this with the transposes in the other direction?
>>> A.dot(A.T).shape
(2000, 2000)
This still takes a while (it's a large computation) but it does complete.
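A small runnable sketch of this direction, at much smaller sizes so it finishes quickly (the shapes here are illustrative, not the sizes from the question):

```python
import dask.array as da

# 2000-by-100000 instead of 2000-by-10000000, so this runs in seconds.
A = da.ones((2000, 100000), chunks=(2000, 1000))
result = A.dot(A.T).compute()  # contracts over the long (100000) axis
print(result.shape)  # (2000, 2000)
```

With A all ones, every entry of the result is the length of the contracted axis, which is an easy sanity check.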