
Why does multiprocessing.Lock() not lock shared resource in Python?

Suppose I have a very big text file consisting of many lines that I would like to reverse, and I don't care about the final order. The input file contains Cyrillic characters. I use multiprocessing to process the file on several cores.

I wrote this program:

# task.py

import multiprocessing as mp


POOL_NUMBER = 2


lock_read = mp.Lock()
lock_write = mp.Lock()

fi = open('input.txt', 'r')
fo = open('output.txt', 'w')

def handle(line):
    # In the future I want to do
    # some more complicated operations over the line
    return line.strip()[::-1]  # Reversing

def target():
    while True:
        try:
            with lock_read:
                line = next(fi)
        except StopIteration:
            break

        line = handle(line)

        with lock_write:
            print(line, file=fo)

pool = [mp.Process(target=target) for _ in range(POOL_NUMBER)]
for p in pool:
    p.start()
for p in pool:
    p.join()

fi.close()
fo.close()

This program fails with an error:

Process Process-2:
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "task.py", line 22, in target
    line = next(fi)
  File "/usr/lib/python3.5/codecs.py", line 321, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 0: invalid start byte
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "task.py", line 22, in target
    line = next(fi)
  File "/usr/lib/python3.5/codecs.py", line 321, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd1 in position 0: invalid continuation byte

On the other hand, everything works fine if I set POOL_NUMBER = 1. But that makes no sense if I want to gain performance from multiple cores.

Why does that error happen? And how can I fix it?

I use Python 3.5.2.

I generated data using this script:

# gen_file.py

from random import randint


LENGTH = 100
SIZE = 100000


def gen_word(length):
    return ''.join(
        chr(randint(ord('а'), ord('я')))
        for _ in range(length)
    )


if __name__ == "__main__":
    with open('input.txt', 'w') as f:
        for _ in range(SIZE):
            print(gen_word(LENGTH), file=f)
Fomalhaut asked Nov 01 '17 13:11


1 Answer

The issue here is that reading a file from multiple processes doesn't work the way you think: you can't share an open file object between processes. When the workers are forked they inherit the same underlying file descriptor, so their buffered reads interleave at arbitrary byte offsets, and a read that starts in the middle of a multi-byte UTF-8 sequence (your Cyrillic characters) produces exactly this UnicodeDecodeError.

You could keep a global current_line counter and, on each iteration, reopen the file and skip ahead to the current line, but that is not ideal.

Here is a different approach, using a process pool and its map method: the parent iterates over the file and farms each line out to your target function:

from multiprocessing import Pool
import time

POOL_NUMBER = 8

def target(line):
    # Simulate some real CPU-bound work on the line
    for _ in range(2 ** 10):
        pass
    return line.rstrip('\n')[::-1]  # strip the newline so it isn't moved to the front


if __name__ == '__main__':
    pool = Pool(processes=POOL_NUMBER)
    with open('input.txt', 'r') as fi:
        t0 = time.time()
        processed_lines = pool.map(target, fi.readlines())
        print('Total time', time.time() - t0)

    with open('output.txt', 'w') as fo:  # 'w' truncates any old output file
        for processed_line in processed_lines:
            print(processed_line, file=fo)

With 8 processes on my machine: Total time 1.3367934226989746

And with 1 process: Total time 4.324501991271973

This works best if your target function is CPU-bound. A different approach would be to split the file into POOL_NUMBER chunks and have each process write its processed chunk of data (with a lock!) to the output file.

Another approach is to create a master process that does the writing for the rest of the processes.

EDIT

After your comment I figured out that you can't fit the file into memory. In that case, you can just iterate over the file object, which reads it line by line. But then we need to modify the code a little:

from multiprocessing import Pool
import time

POOL_NUMBER = 8
CHUNK_SIZE = 50000

def target(line):
    # This is not a measurable task, since most of the time will be spent writing the data.
    # If you have a CPU-bound task, this code will make sense.
    return line.rstrip('\n')[::-1]


def flush_results(results, fo):
    # Write one finished chunk; clearing the list afterwards lets the garbage
    # collector reclaim the memory. If we never cleared it, we would run out of memory!
    for result in results:
        print(result.get(), file=fo)


if __name__ == '__main__':
    pool = Pool(processes=POOL_NUMBER)
    processed_lines = []

    with open('input.txt', 'r') as fi, open('output.txt', 'w') as fo:
        t0 = time.time()
        for line in fi:  # reading the file line by line
            # Keep a reference to this task, but don't block on the result yet
            processed_lines.append(pool.apply_async(target, (line,)))

            if len(processed_lines) == CHUNK_SIZE:
                flush_results(processed_lines, fo)
                processed_lines = []
        flush_results(processed_lines, fo)  # don't forget the last partial chunk
        print('Total time', time.time() - t0)

Keep in mind that you can play with the CHUNK_SIZE variable to control how much memory you use; for me it came out to about 10K max for each process.

P.S.

I think it would be best to split the big file into smaller files. That way you avoid the read/write locking on a single file, and it also makes the processing scalable (even across different machines!).
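A minimal sketch of that splitting step (the part-file naming scheme and sample data are made up for illustration): cut the input into part files of a fixed number of lines each, which can then be processed independently, even on another machine:

```python
from itertools import islice

def split_file(path, lines_per_part):
    # Cut `path` into part files of at most `lines_per_part` lines each
    parts = []
    with open(path) as fi:
        idx = 0
        while True:
            chunk = list(islice(fi, lines_per_part))
            if not chunk:
                break
            name = 'part_%04d.txt' % idx  # illustrative naming scheme
            with open(name, 'w') as fo:
                fo.writelines(chunk)
            parts.append(name)
            idx += 1
    return parts

# tiny demonstration on a 5-line file
with open('input.txt', 'w') as f:
    f.write(''.join('line%d\n' % i for i in range(5)))
parts = split_file('input.txt', 2)
```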

Or Duan answered Sep 28 '22 13:09