Suppose I have a very big text file consisting of many lines that I would like to reverse, and I don't care about the final order. The input file contains Cyrillic symbols. I use multiprocessing to process it on several cores.
I wrote the following program:
    # task.py
    import multiprocessing as mp

    POOL_NUMBER = 2

    lock_read = mp.Lock()
    lock_write = mp.Lock()

    fi = open('input.txt', 'r')
    fo = open('output.txt', 'w')

    def handle(line):
        # In the future I want to do
        # some more complicated operations over the line
        return line.strip()[::-1]  # Reversing


    def target():
        while True:
            try:
                with lock_read:
                    line = next(fi)
            except StopIteration:
                break

            line = handle(line)

            with lock_write:
                print(line, file=fo)

    pool = [mp.Process(target=target) for _ in range(POOL_NUMBER)]
    for p in pool:
        p.start()
    for p in pool:
        p.join()

    fi.close()
    fo.close()
This program fails with the following error:
Process Process-2:
Process Process-1:
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "task.py", line 22, in target
line = next(fi)
File "/usr/lib/python3.5/codecs.py", line 321, in decode
(result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 0: invalid start byte
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "task.py", line 22, in target
line = next(fi)
File "/usr/lib/python3.5/codecs.py", line 321, in decode
(result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd1 in position 0: invalid continuation byte
On the other hand, everything works fine if I set POOL_NUMBER = 1. But that makes no sense, since I want to improve overall performance.
Why does that error happen? And how can I fix it?
I use Python 3.5.2.
I generated data using this script:
    # gen_file.py
    from random import randint

    LENGTH = 100
    SIZE = 100000


    def gen_word(length):
        return ''.join(
            chr(randint(ord('а'), ord('я')))
            for _ in range(length)
        )


    if __name__ == "__main__":
        with open('input.txt', 'w') as f:
            for _ in range(SIZE):
                print(gen_word(LENGTH), file=f)
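The issue here is that reading one file from multiple processes doesn't work the way you think: you can't share the open file object between processes. With the fork start method (the default on Linux), each child gets its own copy of the Python file object, including its own read buffer and UTF-8 decoder, while the underlying OS file offset is shared. So one child's buffered read can start in the middle of a two-byte Cyrillic character, which is exactly the UnicodeDecodeError you see. The lock only serializes the next() calls; it does not keep the per-process buffers consistent.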
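To see why decoding breaks when a read starts at the wrong byte, here is a small sketch (try_decode is a hypothetical helper). Each Cyrillic letter takes two bytes in UTF-8, so starting one byte late, or gluing together bytes from two different reads, reproduces the two errors from the traceback.

```python
def try_decode(raw):
    """Decode raw bytes as UTF-8, returning the text or the error reason."""
    try:
        return raw.decode('utf-8')
    except UnicodeDecodeError as exc:
        return 'error: ' + exc.reason


# 'ы' (U+044B) is b'\xd1\x8b' and 'я' (U+044F) is b'\xd1\x8f' in UTF-8
data = 'ыя'.encode('utf-8')
print(data)                          # b'\xd1\x8b\xd1\x8f'
print(try_decode(data))              # ыя

# Starting one byte late lands on the continuation byte 0x8b
print(try_decode(data[1:]))          # error: invalid start byte

# A lead byte 0xd1 followed by bytes from elsewhere in the file
print(try_decode(data[:1] + data[2:]))  # error: invalid continuation byte
```

Decoding only complete lines (or complete characters) read by a single reader avoids both failures.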
You could make a global current_line variable and, on each step, read the file and process the current line, but that is not ideal.
Here is a different approach using a process Pool and its map method: I read the lines of the file and map your target function over each of them:
    from multiprocessing import Pool
    import time

    POOL_NUMBER = 8


    def target(line):
        # Really need some processing here (this loop simulates CPU work)
        for _ in range(2**10):
            pass
        # Reverse the line without its newline, then put the newline back
        return line.rstrip('\n')[::-1] + '\n'


    if __name__ == '__main__':
        with open('input.txt', 'r') as fi:
            t0 = time.time()
            with Pool(processes=POOL_NUMBER) as pool:
                processed_lines = pool.map(target, fi.readlines())
            print('Total time', time.time() - t0)

        # Opening in 'w' mode already truncates any existing output file
        with open('output.txt', 'w') as fo:
            fo.writelines(processed_lines)
With 8 processes on my machine:
Total time 1.3367934226989746
And with 1 process:
Total time 4.324501991271973
This works best if your target function is CPU-bound. A different approach would be to split the file into POOL_NUMBER chunks and have each process write its processed chunk of data (with a lock!) to the output file.
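A minimal sketch of the split-into-chunks idea, assuming a UTF-8 input file with '\n' line endings; chunk_offsets, process_chunk and reverse_file are hypothetical names. Each process opens the file on its own (nothing is shared), seeks to a byte range that starts and ends on a line boundary, and decodes only whole lines, so a Cyrillic character is never cut in half. Writes go through a multiprocessing.Lock in batches.

```python
import os
from multiprocessing import Lock, Process


def chunk_offsets(path, n):
    """Split a file into n byte ranges whose boundaries fall on line starts."""
    size = os.path.getsize(path)
    offsets = [0]
    with open(path, 'rb') as f:
        for i in range(1, n):
            f.seek(size * i // n)
            f.readline()  # move forward to the start of the next line
            offsets.append(max(f.tell(), offsets[-1]))
    offsets.append(size)
    return list(zip(offsets[:-1], offsets[1:]))


def process_chunk(in_path, start, end, out_path, lock, batch_size=10000):
    """Reverse each line in the byte range [start, end) of in_path."""
    def flush(batch):
        with lock:  # only one process appends to the output at a time
            with open(out_path, 'a', encoding='utf-8') as fo:
                fo.writelines(batch)
        batch.clear()

    batch = []
    with open(in_path, 'rb') as fi:
        fi.seek(start)
        pos = start
        while pos < end:
            raw = fi.readline()
            if not raw:  # end of file
                break
            pos += len(raw)
            # Decode a complete line, so no multi-byte character is split
            batch.append(raw.decode('utf-8').rstrip('\n')[::-1] + '\n')
            if len(batch) >= batch_size:
                flush(batch)
    if batch:
        flush(batch)


def reverse_file(in_path, out_path, n_procs):
    """Reverse every line of in_path into out_path using n_procs processes."""
    open(out_path, 'w').close()  # start from an empty output file
    lock = Lock()
    procs = [Process(target=process_chunk,
                     args=(in_path, start, end, out_path, lock))
             for start, end in chunk_offsets(in_path, n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```

Call reverse_file('input.txt', 'output.txt', POOL_NUMBER) under an if __name__ == '__main__': guard. Batches from different chunks end up interleaved in the output, which is acceptable here because the final order doesn't matter.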
Another approach is to create a master process that does the writing for the rest of the processes; here is an example.
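A minimal sketch of the dedicated-writer idea, assuming UTF-8 input; worker, writer and run are hypothetical names. The main process reads lines and feeds them to workers through a bounded multiprocessing.Queue, the workers send results to a second queue, and a single writer process is the only one that touches the output file, so no lock is needed. None is used as an end-of-work sentinel.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # marks the end of the work stream


def worker(in_q, out_q):
    # Reverse lines until the sentinel arrives, then tell the writer we're done
    for line in iter(in_q.get, SENTINEL):
        out_q.put(line.rstrip('\n')[::-1] + '\n')
    out_q.put(SENTINEL)


def writer(out_q, out_path, n_workers):
    # The only process that writes to the output file
    done = 0
    with open(out_path, 'w', encoding='utf-8') as fo:
        while done < n_workers:
            item = out_q.get()
            if item is SENTINEL:
                done += 1
            else:
                fo.write(item)


def run(in_path, out_path, n_workers):
    # Bounded queues keep the reader from racing ahead of the workers
    in_q = Queue(maxsize=10000)
    out_q = Queue(maxsize=10000)
    workers = [Process(target=worker, args=(in_q, out_q))
               for _ in range(n_workers)]
    w = Process(target=writer, args=(out_q, out_path, n_workers))
    for p in workers + [w]:
        p.start()
    with open(in_path, encoding='utf-8') as fi:
        for line in fi:  # only the main process reads the input file
            in_q.put(line)
    for _ in workers:
        in_q.put(SENTINEL)
    for p in workers + [w]:
        p.join()
```

Sending one line per queue item keeps the sketch simple but adds overhead; putting lists of lines on the queues instead would cut it down.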
EDIT
After your comment, I figured you can't fit the file into memory. In that case you can iterate over the file object, which reads it into memory line by line. But then we need to modify the code a little bit:
    from multiprocessing import Pool
    import time

    POOL_NUMBER = 8
    CHUNK_SIZE = 50000


    def target(line):
        # This is not a measurable task, since most of the time will be spent writing the data;
        # if you have a CPU-bound task, this code will make sense
        return line.rstrip('\n')[::-1] + '\n'


    def write_results(fo, pending):
        # Wait for each queued task and write its result, in input order
        for result in pending:
            fo.write(result.get())


    if __name__ == '__main__':
        t0 = time.time()
        with Pool(processes=POOL_NUMBER) as pool, \
                open('input.txt', 'r') as fi, \
                open('output.txt', 'w') as fo:  # 'w' truncates once, at the start
            processed_lines = []
            for line in fi:  # iterating over the file reads it line by line
                # Keep a reference to this task, but don't wait for it yet
                processed_lines.append(pool.apply_async(target, (line,)))
                if len(processed_lines) == CHUNK_SIZE:
                    write_results(fo, processed_lines)
                    # Clear the list so the garbage collector can free finished tasks;
                    # without this we would run out of memory
                    processed_lines = []
            # Write the remaining lines of the last, partial chunk
            write_results(fo, processed_lines)
        print('Total time', time.time() - t0)
Keep in mind that you can tune the CHUNK_SIZE variable to control how much memory you use. For me, 5000 is about 10K max for each process.
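If submitting one apply_async task per line turns out to be slow, a variation (my own suggestion, not part of the answer above) is to read CHUNK_SIZE lines at a time with itertools.islice and hand each batch to Pool.map, whose chunksize argument ships lines to the workers in groups. A sketch, assuming UTF-8 input; reverse_in_batches is a hypothetical name:

```python
from itertools import islice
from multiprocessing import Pool


def target(line):
    # Reverse the line without its newline, then put the newline back
    return line.rstrip('\n')[::-1] + '\n'


def reverse_in_batches(in_path, out_path, n_procs=8, chunk_size=50000):
    """Reverse in_path into out_path, holding at most chunk_size lines at a time."""
    with Pool(processes=n_procs) as pool, \
            open(in_path, encoding='utf-8') as fi, \
            open(out_path, 'w', encoding='utf-8') as fo:
        while True:
            # Read the next batch; only this batch is kept in memory
            batch = list(islice(fi, chunk_size))
            if not batch:
                break
            # map blocks until the batch is done and keeps the input order;
            # chunksize groups lines into bigger tasks to cut IPC overhead
            fo.writelines(pool.map(target, batch, chunksize=1000))
```

Here chunk_size plays the same memory-bounding role as CHUNK_SIZE, while the output keeps the input order.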
P.S.
I think it would be best to split the big file into smaller files. This way you avoid the read/write locking on a single file, and you also make the processing scalable (even across different machines!).
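A sketch of the split-into-smaller-files idea, assuming UTF-8 input; split_file, reverse_part and process_parts are hypothetical names, and the part_NNNNN.txt naming is arbitrary. Each part is processed by exactly one worker, which writes its own output file, so no file is shared and no lock is needed.

```python
import os
from itertools import islice
from multiprocessing import Pool


def split_file(in_path, out_dir, lines_per_file):
    """Split in_path into numbered part files of at most lines_per_file lines."""
    parts = []
    with open(in_path, encoding='utf-8') as fi:
        while True:
            lines = list(islice(fi, lines_per_file))
            if not lines:
                break
            part = os.path.join(out_dir, 'part_%05d.txt' % len(parts))
            with open(part, 'w', encoding='utf-8') as fo:
                fo.writelines(lines)
            parts.append(part)
    return parts


def reverse_part(part):
    """Reverse one part file into its own output file; no locks needed."""
    out = part + '.out'
    with open(part, encoding='utf-8') as fi, \
            open(out, 'w', encoding='utf-8') as fo:
        for line in fi:
            fo.write(line.rstrip('\n')[::-1] + '\n')
    return out


def process_parts(parts, n_procs):
    """Process every part file in parallel and return the output paths."""
    with Pool(processes=n_procs) as pool:
        return pool.map(reverse_part, parts)
```

Because each part is independent, the same reverse_part step could just as well run on another machine that has a copy of the part file.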