I want to use multiprocessing on a large dataset to find the distance between two GPS points. I constructed a test set, but I have been unable to get multiprocessing to work on this set.
    import pandas as pd
    from geopy.distance import vincenty
    from itertools import combinations
    import multiprocessing as mp

    df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
                       'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                       'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                       'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})

    def calc_dist(x):
        return pd.DataFrame(
            [[grp,
              df.loc[c[0]].ser_no,
              df.loc[c[1]].ser_no,
              vincenty(df.loc[c[0], x], df.loc[c[1], x])]
             for grp, lst in df.groupby('co_nm').groups.items()
             for c in combinations(lst, 2)],
            columns=['co_nm', 'machineA', 'machineB', 'distance'])

    if __name__ == '__main__':
        pool = mp.Pool(processes=(mp.cpu_count() - 1))
        pool.map(calc_dist, ['lat', 'lon'])
        pool.close()
        pool.join()
I am using Python 2.7.11 and IPython 4.1.2 with Anaconda 2.5.0 64-bit on Windows 7 Professional, and I get this error:
    runfile('C:/.../Desktop/multiprocessing test.py', wdir='C:/.../Desktop')
    Traceback (most recent call last):
      File "", line 1, in <module>
        runfile('C:/.../Desktop/multiprocessing test.py', wdir='C:/.../Desktop')
      File "C:...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", line 699, in runfile
        execfile(filename, namespace)
      File "C:...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", line 74, in execfile
        exec(compile(scripttext, filename, 'exec'), glob, loc)
      File "C:/..../multiprocessing test.py", line 33, in <module>
        pool.map(calc_dist, ['lat','lon'])
      File "C:...\AppData\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py", line 251, in map
        return self.map_async(func, iterable, chunksize).get()
      File "C:...\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py", line 567, in get
        raise self._value
    TypeError: Failed to create Point instance from 1.
    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value
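This line from your code:

    pool.map(calc_dist, ['lat','lon'])

makes two calls in the worker processes: one runs calc_dist('lat') and the other runs calc_dist('lon'). Compare the first example in the multiprocessing docs. (Basically, pool.map(f, [1,2,3]) calls f three times, once per element of the list that follows: f(1), f(2), and f(3).) If I'm not mistaken, your function calc_dist only works when it gets both column names at once, as in calc_dist(['lat', 'lon']). Called with 'lat' alone, it hands vincenty a single number instead of a (lat, lon) pair, which is the TypeError in your traceback. And a single call with both names doesn't allow for parallel processing.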
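To see the one-call-per-element behaviour in isolation, here is a minimal sketch. The function name describe is made up for illustration and is not part of the question's code; it only reports the single argument it was handed.

```python
import multiprocessing as mp


def describe(arg):
    # Hypothetical stand-in for calc_dist: report the one argument received.
    return "called with %r" % (arg,)


if __name__ == "__main__":
    pool = mp.Pool(processes=2)
    try:
        # One call per list element: describe('lat') and describe('lon').
        calls = pool.map(describe, ["lat", "lon"])
    finally:
        pool.close()
        pool.join()
    print(calls)
```

Each call sees exactly one column name, never both, which is why a worker written this way cannot build a (lat, lon) pair.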
I believe you want to split the work between processes, probably sending each tuple (grp, lst) to a separate process. The following code does exactly that.
First, let's prepare for splitting:
    grp_lst_args = list(df.groupby('co_nm').groups.items())
    print(grp_lst_args)
    [('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]
We'll send each of these tuples (here, there are three of them) as an argument to a function in a separate process. We need to rewrite the function; let's call it calc_dist2. For convenience, its argument is a tuple, as in calc_dist2(('aa',[0,1,2])):
    def calc_dist2(arg):
        grp, lst = arg
        return pd.DataFrame(
            [[grp,
              df.loc[c[0]].ser_no,
              df.loc[c[1]].ser_no,
              vincenty(df.loc[c[0], ['lat', 'lon']],
                       df.loc[c[1], ['lat', 'lon']])]
             for c in combinations(lst, 2)],
            columns=['co_nm', 'machineA', 'machineB', 'distance'])
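If Python 3 is an option, Pool.starmap unpacks each tuple into separate arguments, so the worker can take grp and lst directly instead of a single tuple. The sketch below assumes Python 3.3 or later and uses a hypothetical worker, pair_up, that only lists the row-label pairs per group rather than computing distances.

```python
import multiprocessing as mp
from itertools import combinations


def pair_up(grp, lst):
    # Hypothetical worker: return every pair of row labels within one group.
    return [(grp, a, b) for a, b in combinations(lst, 2)]


if __name__ == "__main__":
    # Same shape as grp_lst_args: (group name, list of row labels).
    grp_lst_args = [("aa", [0, 1, 2]), ("bb", [3, 4, 5, 6]), ("cc", [7, 8, 9])]
    with mp.Pool(processes=2) as pool:
        # starmap calls pair_up('aa', [0, 1, 2]), pair_up('bb', [3, 4, 5, 6]), ...
        results = pool.starmap(pair_up, grp_lst_args)
    for group_pairs in results:
        print(group_pairs)
```

On Python 2.7 there is no starmap, so the single-tuple argument used by calc_dist2 remains the simpler choice.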
And now comes the multiprocessing:
    if __name__ == '__main__':  # guard as in the question; needed on Windows
        pool = mp.Pool(processes=(mp.cpu_count() - 1))
        results = pool.map(calc_dist2, grp_lst_args)
        pool.close()
        pool.join()
        results_df = pd.concat(results)
results is a list of the results (here, data frames) of the calls calc_dist2((grp,lst)) for (grp,lst) in grp_lst_args. The elements of results are then concatenated into one data frame.
    print(results_df)
      co_nm  machineA  machineB          distance
    0    aa         1         2  156.876149391 km
    1    aa         1         3  313.705445447 km
    2    aa         2         3  156.829329105 km
    0    cc         8         9  156.060165391 km
    1    cc         8         0  311.910998169 km
    2    cc         9         0  155.851498134 km
    0    bb         4         5  156.665641837 km
    1    bb         4         6  313.214333025 km
    2    bb         4         7  469.622535339 km
    3    bb         5         6  156.548897414 km
    4    bb         5         7  312.957597466 km
    5    bb         6         7   156.40899677 km
BTW, in Python 3 we could use a with construction:
    with mp.Pool() as pool:
        results = pool.map(calc_dist2, grp_lst_args)
Update
I tested this code only on Linux. On Linux, the read-only data frame df can be accessed by child processes and is not copied to their memory space, but I'm not sure exactly how it works on Windows. You may consider splitting df into chunks (grouped by co_nm) and sending these chunks as arguments to some other version of calc_dist.
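One way to sketch that idea is to hand each worker the rows of its own group, so it never touches a module-level df. On Windows, child processes are started fresh and re-import the main module rather than inheriting the parent's memory, so passing the data explicitly avoids relying on globals. To stay dependency-free, the sketch makes two substitutions that are not in the original code: the test set is held as plain tuples instead of a DataFrame, and a haversine great-circle distance (a hypothetical helper, haversine_km, assuming a spherical Earth of radius 6371 km) stands in for geopy's vincenty, so the numbers will differ slightly from the vincenty output. The names split_by_company and calc_dist_chunk are likewise made up for illustration.

```python
import math
import multiprocessing as mp
from itertools import combinations

# Test set from the question as (ser_no, co_nm, lat, lon) tuples.
ROWS = [
    (1, "aa", 1, 21), (2, "aa", 2, 22), (3, "aa", 3, 23),
    (4, "bb", 4, 24), (5, "bb", 5, 25), (6, "bb", 6, 26), (7, "bb", 7, 27),
    (8, "cc", 8, 28), (9, "cc", 9, 29), (0, "cc", 10, 30),
]


def haversine_km(p, q):
    # Assumption: spherical Earth, radius 6371 km; stands in for vincenty.
    lat1, lon1, lat2, lon2 = map(math.radians, (p[0], p[1], q[0], q[1]))
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * 6371.0 * math.asin(math.sqrt(a))


def split_by_company(rows):
    # Build one self-contained chunk per co_nm: (co_nm, [rows of that company]).
    chunks = {}
    for row in rows:
        chunks.setdefault(row[1], []).append(row)
    return sorted(chunks.items())


def calc_dist_chunk(chunk):
    # The worker uses only its argument, never a global data frame.
    grp, rows = chunk
    return [(grp, a[0], b[0], haversine_km(a[2:], b[2:]))
            for a, b in combinations(rows, 2)]


if __name__ == "__main__":
    pool = mp.Pool(processes=2)
    try:
        results = pool.map(calc_dist_chunk, split_by_company(ROWS))
    finally:
        pool.close()
        pool.join()
    for group_result in results:
        for record in group_result:
            print(record)
```

With pandas, the chunks could presumably be built by iterating over df.groupby('co_nm'), which yields (name, sub-frame) pairs, and the worker would then take one such pair as its argument.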