Without parallel programming I can merge the left and right dataframes on the key
column using the code below, but it is too slow because both dataframes are very large. Is there any way I can do the merge in parallel efficiently?
I have 64 cores, so practically I can use 63 of them to merge these two dataframes.
import pandas as pd

left = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3'],
                     'A': ['A0', 'A1', 'A2', 'A3'],
                     'B': ['B0', 'B1', 'B2', 'B3']})
right = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3'],
                      'C': ['C0', 'C1', 'C2', 'C3'],
                      'D': ['D0', 'D1', 'D2', 'D3']})
result = pd.merge(left, right, on='key')
The dataframes and the merged result look like this:
left:
A B key
0 A0 B0 K0
1 A1 B1 K1
2 A2 B2 K2
3 A3 B3 K3
right:
C D key
0 C0 D0 K0
1 C1 D1 K1
2 C2 D2 K2
3 C3 D3 K3
result:
A B key C D
0 A0 B0 K0 C0 D0
1 A1 B1 K1 C1 D1
2 A2 B2 K2 C2 D2
3 A3 B3 K3 C3 D3
I want to do this merge in parallel so that it runs faster.
I believe you can use dask and its merge function.
The docs say:
What definitely works?

Cleverly parallelizable operations (also fast):

Join on index: dd.merge(df1, df2, left_index=True, right_index=True)

Operations requiring a shuffle (slow-ish, unless on index):

Set index: df.set_index(df.x)
Join not on the index: dd.merge(df1, df2, on='name')
You can also check the docs on how to create Dask DataFrames.
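For data that does not fit in memory you would typically build the Dask DataFrames straight from files rather than from pandas objects. A rough sketch, assuming the data sits in CSV files (the file patterns below are hypothetical):

import dask.dataframe as dd

# hypothetical file patterns; dd.read_csv builds a partitioned, lazy DataFrame
left = dd.read_csv('left_*.csv')
right = dd.read_csv('right_*.csv')

# the merge stays lazy until .compute() is called
result = dd.merge(left, right, on='key').compute()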
Example
import pandas as pd
left = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3'],
                     'A': ['A0', 'A1', 'A2', 'A3'],
                     'B': ['B0', 'B1', 'B2', 'B3']})
right = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3'],
                      'C': ['C0', 'C1', 'C2', 'C3'],
                      'D': ['D0', 'D1', 'D2', 'D3']})
result = pd.merge(left, right, on='key')
print(result)
A B key C D
0 A0 B0 K0 C0 D0
1 A1 B1 K1 C1 D1
2 A2 B2 K2 C2 D2
3 A3 B3 K3 C3 D3
import dask.dataframe as dd

# construct dask objects from the pandas objects
left1 = dd.from_pandas(left, npartitions=3)
right1 = dd.from_pandas(right, npartitions=3)

# merge on key
print(dd.merge(left1, right1, on='key').compute())
A B key C D
0 A3 B3 K3 C3 D3
1 A1 B1 K1 C1 D1
0 A2 B2 K2 C2 D2
1 A0 B0 K0 C0 D0
# first set the index on both dask frames, then merge on the index
print(dd.merge(left1.set_index('key'),
               right1.set_index('key'),
               left_index=True,
               right_index=True).compute())
A B C D
key
K0 A0 B0 C0 D0
K1 A1 B1 C1 D1
K2 A2 B2 C2 D2
K3 A3 B3 C3 D3
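Since the question mentions 64 cores, note that you can also steer how many workers the computation uses. A rough sketch, assuming the dask frames left1 and right1 from above; scheduler='processes' and num_workers are standard compute() options, and 63 is simply the count from the question (whether processes or the default threaded scheduler is faster depends on your data):

# run the merge on the multiprocessing scheduler, capped at 63 worker processes
result = dd.merge(left1, right1, on='key').compute(scheduler='processes',
                                                   num_workers=63)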
You can also improve the speed of your merge (by a factor of about 3 on the given example) by making the key column the index of your dataframes and using join instead.
left2 = left.set_index('key')
right2 = right.set_index('key')
In [46]: %timeit result2 = left2.join(right2)
1000 loops, best of 3: 361 µs per loop
In [47]: %timeit result = pd.merge(left, right, on='key')
1000 loops, best of 3: 1.01 ms per loop
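The same index-based join carries over to Dask. A minimal sketch, assuming left2 and right2 from the snippet above (dd.from_pandas preserves the existing 'key' index, so the join can align partitions instead of shuffling):

import dask.dataframe as dd

# from_pandas keeps the 'key' index; with a known, sorted index
# the join does not require a full shuffle
left2d = dd.from_pandas(left2, npartitions=3)
right2d = dd.from_pandas(right2, npartitions=3)
result2 = left2d.join(right2d).compute()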