I'm looking for fast ways to store and retrieve numpy arrays using pyarrow. I'm pretty satisfied with retrieval: it takes less than 1 second to extract columns from my .arrow file, which contains 1,000,000,000 integers of dtype np.uint16.
import pyarrow as pa
import numpy as np
def write(arr, name):
    arrays = [pa.array(col) for col in arr]
    names = [str(i) for i in range(len(arrays))]
    batch = pa.RecordBatch.from_arrays(arrays, names=names)
    with pa.OSFile(name, 'wb') as sink:
        with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
            writer.write_batch(batch)

def read(name):
    source = pa.memory_map(name, 'r')
    table = pa.ipc.RecordBatchStreamReader(source).read_all()
    for i in range(table.num_columns):
        yield table.column(str(i)).to_numpy()
arr = np.random.randint(65535, size=(250, 4000000), dtype=np.uint16)
%%timeit -r 1 -n 1
write(arr, 'test.arrow')
>>> 25.6 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -r 1 -n 1
for n in read('test.arrow'): n
>>> 901 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
Can the efficiency of writing to the .arrow format be improved? In addition, I tested np.save:
%%timeit -r 1 -n 1
np.save('test.npy', arr)
>>> 18.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
It looks a little bit faster. Can Apache Arrow be optimised further for writing to the .arrow format?
It may be the case that the performance issue is mainly due to IO/disk speed. In this case, there isn't much you can improve.
I ran a few tests on my device. The numbers I get are different from yours, but the bottom line is the same: writing is slower than reading.
The resulting file is 1.9 GB (2000023184 bytes):
$ ls test.arrow -l
-rw-rw-r-- 1 0x26res 0x26res 2000023184 Nov 15 10:01 test.arrow
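As a quick sanity check on that size, the raw payload accounts for nearly all of it. This sketch assumes the file was produced from the question's 250 x 4,000,000 array of np.uint16, which the answer does not state explicitly:

```python
# Assumption: the file holds the question's array,
# shape (250, 4_000_000), 2 bytes per np.uint16 item.
rows, cols, itemsize = 250, 4_000_000, 2
payload_bytes = rows * cols * itemsize

file_bytes = 2_000_023_184  # size reported by ls above

# Whatever is left over is schema, metadata and padding.
extra_bytes = file_bytes - payload_bytes

print(payload_bytes)  # 2000000000
print(extra_bytes)    # 23184
```

So the format itself adds only a few kilobytes on top of the 2 GB of data.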
In the code below I generate 1.9 GB of random bytes, and save them, then compare to the time it took to save with arrow:
import secrets
data = b"\x00" + secrets.token_bytes(2000023184) + b"\x00"
def write_bytes(data, name):
    with open(name, 'wb') as fp:
        fp.write(data)
%%timeit -r 1 -n 1
write_bytes(data, 'test.bytes')
>>> 2.29 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -r 1 -n 1
write(arr, 'test.arrow')
>>> 2.52 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
On my device, it takes 2.52 seconds to write the data using arrow. Writing the same amount of random bytes takes 2.29 seconds. That means the overhead of arrow is about 10% of the write time, so there isn't much that can be done to speed it up.
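The 10% figure follows directly from the two timings above. A small sketch of the arithmetic (the helper name `relative_overhead` is mine, not part of any library):

```python
def relative_overhead(measured_s: float, baseline_s: float) -> float:
    """Extra time of measured_s over baseline_s, as a fraction of the baseline."""
    return (measured_s - baseline_s) / baseline_s


arrow_s = 2.52  # write(arr, 'test.arrow')
bytes_s = 2.29  # write_bytes(data, 'test.bytes')

print(f"{relative_overhead(arrow_s, bytes_s):.1%}")  # 10.0%
```

Both timings come from a single run each, so a gap this small may be partly run-to-run noise.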
Indeed, it appears to be some kind of limitation of my RAM/IO/disk. Very silent ones... Writing slows down 3 to 8 times once arr exceeds 200M items, which is why I'm seeing the time go from 2.5 seconds to 20. I would be glad to know if this could be resolved in pyarrow.
def pyarrow_write_arrow_Batch(arr, name):
    arrays = [pa.array(col) for col in arr]
    names = [str(i) for i in range(len(arrays))]
    batch = pa.RecordBatch.from_arrays(arrays, names=names)
    with pa.OSFile(name, 'wb') as sink:
        with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
            writer.write_batch(batch)
%matplotlib notebook
import benchit
benchit.setparams(environ='notebook')
benchit.setparams(rep=5)
arr = np.random.randint(65535, size=(int(1e9),), dtype=np.uint16)
size = [4, 8, 12, 20, 32, 48, 64, 100, 160, 256, 400, 600, 1000]
def pwa_Batch_10000(arr, name): return pyarrow_write_arrow_Batch(arr.reshape(-1, 10000), name)
def pwa_Batch_100000(arr, name): return pyarrow_write_arrow_Batch(arr.reshape(-1, 100000), name)
def pwa_Batch_1000000(arr, name): return pyarrow_write_arrow_Batch(arr.reshape(-1, 1000000), name)
def pwa_Batch_4000000(arr, name): return pyarrow_write_arrow_Batch(arr.reshape(-1, 4000000), name)
fns = [pwa_Batch_10000, pwa_Batch_100000, pwa_Batch_1000000, pwa_Batch_4000000]
in_ = {s: (arr[:s*int(1e6)], 'test.arrow') for s in size}
t = benchit.timings(fns, in_, multivar=True, input_name='Millions of items')
t.plot(logx=True, figsize=(8,4), fontsize=10)
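One way to check whether the slowdown comes from the OS or the disk rather than from pyarrow is to time plain chunked writes and watch whether the per-chunk time jumps as the file grows. The sketch below uses only the standard library; `timed_chunk_write` and its parameters are hypothetical names for illustration, and the sizes in the demo are deliberately tiny so it runs quickly.

```python
import os
import tempfile
import time


def timed_chunk_write(path, total_bytes, chunk_bytes, sync=False):
    """Write total_bytes of zeros in chunks; return the seconds taken per chunk.

    With sync=True each chunk is forced to disk with os.fsync, which takes
    the OS write cache out of the measurement.
    """
    chunk = bytes(chunk_bytes)  # a buffer of zero bytes
    timings = []
    written = 0
    with open(path, "wb") as fp:
        while written < total_bytes:
            n = min(chunk_bytes, total_bytes - written)
            start = time.perf_counter()
            fp.write(chunk[:n])
            if sync:
                fp.flush()
                os.fsync(fp.fileno())
            timings.append(time.perf_counter() - start)
            written += n
    return timings


# Tiny demo: 4 MiB in 1 MiB chunks. For a real test, use sizes near 2 GB.
with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "test.bytes")
    per_chunk = timed_chunk_write(target, 4 * 2**20, 2**20)
    size_on_disk = os.path.getsize(target)

print(len(per_chunk), size_on_disk)  # 4 4194304
```

If the per-chunk times rise sharply once the total passes a certain size, the limit is probably in the OS write cache or the disk, which would be consistent with the drop described above. That is a hypothesis to confirm by measurement, not a diagnosis.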