I have a large data.table with more than 1 billion observations, and I need to perform some string operations, which are slow.
My code is as simple as this:
DT[, var := some_function(var2)]
If I'm not mistaken, data.table uses multithreading when it is called with by, and I'm trying to parallelize this operation by exploiting that. To do so, I can make an interim grouper variable, such as
DT[, grouper := .I %/% 100]
and do
DT[, var := some_function(var2), by = grouper]
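As a concrete sketch of the chunking idea above (toy data, and a hypothetical toupper-based stand-in for some_function, since the real string operation isn't shown):

```r
library(data.table)

# Hypothetical stand-in for the real (slow) string operation
some_function <- function(x) toupper(x)

DT <- data.table(var2 = c("a", "b", "c", "d", "e", "f"))

# Interim grouper: one group per chunk of rows (chunk size 2 here,
# 100 in the question; pick it to balance chunk count vs. overhead)
DT[, grouper := .I %/% 2]

# Apply the function chunk by chunk via by
DT[, var := some_function(var2), by = grouper]
```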
I tried some benchmarking with a small sample of the data, but surprisingly I did not see a performance improvement. So my questions are:

1. Does data.table use multithreading when it's used with by?
2. If not, is there a way to get data.table to use multithreading here?

FYI, I see that multithreading is enabled with half of my cores when I load data.table, so I guess there's no OpenMP issue here.
I got answers from the data.table developers on the data.table GitHub. Here's a summary:

- Finding the groups of the by variable is itself always parallelized, but more importantly,
- if the function in j is generic (a user-defined function), there is no parallelization;
- operations in j are parallelized only if the function is GForce-optimized, i.e. the expression in j contains only the functions min, max, mean, median, var, sd, sum, prod, first, last, head, and tail.

So it is advised to do the parallel operation manually if the function in j is generic, but this may not always guarantee a speed gain. Reference
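You can check whether a given j expression gets the GForce treatment by running the query with verbose = TRUE (a minimal sketch; the exact wording of the verbose output varies between data.table versions):

```r
library(data.table)

DT <- data.table(g = rep(1:2, each = 3), x = 1:6)

# GForce-eligible: j contains only an optimized function (sum),
# so the grouped computation can be parallelized internally.
# The verbose output reports the GForce optimization applied to j.
res1 <- DT[, .(s = sum(x)), by = g, verbose = TRUE]

# Not GForce-eligible: a user-defined function in j is evaluated
# once per group in plain R, without that optimization.
udf <- function(v) sum(v)
res2 <- DT[, .(s = udf(x)), by = g, verbose = TRUE]

# Both give the same result; only the execution path differs.
```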
==Solution==
In my case, I encountered a "vector memory exhausted" error when I plainly used
DT[, var := some_function(var2)]
even though my server had 1 TB of RAM, while the data took 200 GB of memory.
I used split(DT, by = 'grouper') to split my data.table into chunks, and used doFuture with foreach and %dopar% to do the job. It was pretty fast.
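A minimal sketch of that split/foreach pattern (worker count and chunk size are placeholders, and some_function again stands in for the real string operation):

```r
library(data.table)
library(doFuture)   # attaches foreach and future as well

registerDoFuture()
plan(multisession, workers = 2)  # e.g. half the available cores

# Hypothetical stand-in for the real string operation
some_function <- function(x) toupper(x)

DT <- data.table(var2 = letters[1:6])
DT[, grouper := .I %/% 2]

# Split into a list of chunks, one per grouper value
chunks <- split(DT, by = "grouper")

# Process the chunks in parallel, then reassemble
result <- foreach(chunk = chunks, .combine = rbind) %dopar% {
  chunk[, var := some_function(var2)]
  chunk
}

plan(sequential)  # release the workers
```

Each worker only ever holds one chunk, which is also what avoids blowing up memory the way the single whole-table assignment did.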