I have 5,000,000 rows in my dataframe. In my code, I am using iterrows(), which is taking too much time. To get the required output, I have to iterate through all the rows, so I wanted to know whether I can parallelize the code in pandas.
A Dask DataFrame consists of multiple pandas DataFrames, and each pandas DataFrame is called a partition. This mechanism allows you to work with larger-than-memory data because your computations are distributed across these partitions and can be executed in parallel.
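As a minimal sketch of what that can look like (assuming dask is installed, and using an Age column and an add_one helper purely for illustration; neither comes from the question):

import pandas as pd
import dask.dataframe as dd

def add_one(partition: pd.DataFrame) -> pd.DataFrame:
    # runs independently on each pandas partition
    partition["Age"] = partition["Age"] + 1
    return partition

df = pd.DataFrame({"Age": range(1_000_000)})
ddf = dd.from_pandas(df, npartitions=8)          # split into 8 pandas DataFrames
result = ddf.map_partitions(add_one).compute()   # apply in parallel, then gather back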
There are several common ways to parallelize Python code. You can launch several application instances or a script to perform jobs in parallel. This approach is great when you don't need to exchange data between parallel jobs.
One way to achieve parallelism in Python is by using the multiprocessing module. The multiprocessing module allows you to create multiple processes, each of them with its own Python interpreter. For this reason, Python multiprocessing accomplishes process-based parallelism.
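A bare-bones sketch of process-based parallelism with a Pool (the square function is just a placeholder for your per-item work):

import multiprocessing as mp

def square(x):
    return x * x

if __name__ == '__main__':
    with mp.Pool(4) as pool:                  # 4 worker processes
        print(pool.map(square, range(10)))    # work is divided across the workers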
Both multithreading and multiprocessing allow Python code to run concurrently. Only multiprocessing will allow your code to be truly parallel. However, if your code is IO-heavy (like HTTP requests), then multithreading will still probably speed up your code.
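For IO-bound work such as fetching URLs, a thread pool usually helps even under the GIL; here is a small sketch using concurrent.futures and urllib (the URLs are placeholders):

from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen

def fetch(url):
    with urlopen(url) as response:            # threads overlap while waiting on IO
        return len(response.read())

urls = ["https://example.com"] * 10           # placeholder URLs
with ThreadPoolExecutor(max_workers=8) as executor:
    sizes = list(executor.map(fetch, urls))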
Here's a webpage I found that might help: http://gouthamanbalaraman.com/blog/distributed-processing-pandas.html
And here's the code for multiprocessing found on that page:
import pandas as pd
import multiprocessing as mp

LARGE_FILE = "D:\\my_large_file.txt"
CHUNKSIZE = 100000  # process 100,000 rows at a time

def process_frame(df):
    # process data frame
    return len(df)

if __name__ == '__main__':
    reader = pd.read_table(LARGE_FILE, chunksize=CHUNKSIZE)
    pool = mp.Pool(4)  # use 4 processes

    funclist = []
    for df in reader:
        # process each chunk asynchronously in the pool
        f = pool.apply_async(process_frame, [df])
        funclist.append(f)

    result = 0
    for f in funclist:
        result += f.get(timeout=10)  # timeout in 10 seconds

    pool.close()
    pool.join()

    print("There are %d rows of data" % result)
This code shows how you might break up a large dataframe into smaller dataframes of N_ROWS rows each (except possibly for the last one) and then pass each piece to a process pool (of whatever size you want, although there is no point in using more workers than the number of processors you have). Each worker process returns its modified dataframe to the main process, which then reassembles the final result by concatenating the workers' return values:
import pandas as pd
import multiprocessing as mp

def process_frame(df):
    # process data frame
    # create new index starting at 0:
    df.reset_index(inplace=True, drop=True)
    # increment everybody's age:
    for i in range(len(df.index)):
        df.at[i, 'Age'] += 1
    return df

def divide_and_conquer(df):
    N_ROWS = 2  # number of rows in each dataframe
    with mp.Pool(3) as pool:  # use 3 processes
        # break up dataframe into smaller dataframes of N_ROWS rows each
        cnt = len(df.index)
        n, remainder = divmod(cnt, N_ROWS)
        results = []
        start_index = 0
        for i in range(n):
            results.append(pool.apply_async(process_frame, args=(df.loc[start_index:start_index+N_ROWS-1, :],)))
            start_index += N_ROWS
        if remainder:
            results.append(pool.apply_async(process_frame, args=(df.loc[start_index:start_index+remainder-1, :],)))
        new_dfs = [result.get() for result in results]
        # reassemble final dataframe:
        df = pd.concat(new_dfs, ignore_index=True)
    return df

if __name__ == '__main__':
    df = pd.DataFrame({
        "Name": ['Tom', 'Dick', 'Harry', 'Jane', 'June', 'Sally', 'Mary'],
        "Age": [10, 20, 30, 40, 40, 60, 70],
        "Sex": ['M', 'M', 'M', 'F', 'F', 'F', 'F']
    })
    print(df)
    df = divide_and_conquer(df)
    print(df)
Prints:
Name Age Sex
0 Tom 10 M
1 Dick 20 M
2 Harry 30 M
3 Jane 40 F
4 June 40 F
5 Sally 60 F
6 Mary 70 F
Name Age Sex
0 Tom 11 M
1 Dick 21 M
2 Harry 31 M
3 Jane 41 F
4 June 41 F
5 Sally 61 F
6 Mary 71 F
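As a side note, the chunking loop above can also be written as a one-line slice comprehension handed to pool.map; a sketch of the same idea, using the same process_frame work on an Age column (the last chunk may simply be shorter):

import pandas as pd
import multiprocessing as mp

def process_frame(df):
    # same per-chunk work as above: bump every age by 1
    df = df.reset_index(drop=True)
    df['Age'] += 1
    return df

if __name__ == '__main__':
    df = pd.DataFrame({"Age": [10, 20, 30, 40, 40, 60, 70]})
    N_ROWS = 2
    # slice into chunks of N_ROWS rows each
    chunks = [df.iloc[i:i + N_ROWS] for i in range(0, len(df), N_ROWS)]
    with mp.Pool(3) as pool:
        df = pd.concat(pool.map(process_frame, chunks), ignore_index=True)
    print(df)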