Why does Python multiprocessing Queue mess up dictionaries?

I am trying to create a multi-process, multi-threaded program in Python. So far I have been successful, but I have come across an issue that has been bugging me.

I have 3 classes. The main class is the Manager, which creates one or more sub-processes (the Subprocess class) and connects to each of them with a dedicated multiprocessing.Queue. It then sends these sub-processes commands through the queue to create socket-management threads (the Server_Thread class). The configuration options for each Server_Thread are created in the Manager class and passed to the subprocess through the queue in the form of a dictionary.

The code follows:

import threading
import multiprocessing
import socket
import time


class Server_Thread(threading.Thread):
    def __init__(self, client_config):
        threading.Thread.__init__(self)
        self.address = client_config['address']
        self.port = client_config['port']

    def run(self):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        print "Binding to: local host, port = ", self.port 
        self.socket.bind((socket.gethostname(), self.port))
        self.socket.listen(1)

        self.running = True
        while self.running:    
            client_socket, client_address = self.socket.accept()
            # do stuff

    def stop(self):
        self.running = False


class Subprocess(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.server_thread_list = []

    def run(self):
        self.running = True
        while self.running:
            command = self.queue.get()    # blocks until the Manager sends a command
            if command[0] == "create_client":
                server_thread = Server_Thread(command[1])
                server_thread.start()
                self.server_thread_list.append(server_thread)
            elif command[0] == "terminate":
                self.running = False
        for server_thread in self.server_thread_list:
            server_thread.stop()
            server_thread.join()


class Manager:
    def __init__(self):
        self.client_config = {}     
        self.client_config['junk'] = range(10000)    # actually contains lots of stuff
        self.client_config['address'] = 'localhost'

    def run(self):
        current_bind_port = 40001
        self.queue = multiprocessing.Queue()
        subprocess = Subprocess(self.queue)
        subprocess.start()
        for i in range(20):
            print "creating socket thread at port =", current_bind_port
            self.client_config['port'] = current_bind_port
            self.queue.put(("create_client", self.client_config.copy()))    # pass a dictionary copy
            current_bind_port += 1
        time.sleep(10)
        self.queue.put(("terminate", None))
        subprocess.join()


if __name__ == "__main__":
    manager = Manager()
    manager.run()

The problem is that when I run this, sometimes it runs fine, but sometimes the config dictionary gets messed up in the queue. I think it has something to do with the rate at which the queue is filled versus the rate at which it is emptied, as if it overflows without any warning.

The output, with some restructuring (the processes interleave their print output):

>Python temp.py
creating socket thread at port = 40001
creating socket thread at port = 40002
creating socket thread at port = 40003
creating socket thread at port = 40004
creating socket thread at port = 40005
creating socket thread at port = 40006
creating socket thread at port = 40007
creating socket thread at port = 40008
creating socket thread at port = 40009
creating socket thread at port = 40010
creating socket thread at port = 40011
creating socket thread at port = 40012
creating socket thread at port = 40013
creating socket thread at port = 40014
creating socket thread at port = 40015
creating socket thread at port = 40016
creating socket thread at port = 40017
creating socket thread at port = 40018
creating socket thread at port = 40019
creating socket thread at port = 40020  << OK

Binding to: local host, port =  40001
Binding to: local host, port =  40020  << NOT OK from here
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020

Exception in thread Thread-4:
Traceback (most recent call last):
  File "C:\Python27\lib\threading.py", line 810, in __bootstrap_inner
    self.run()
  File "Y:\cStation\Python\iReact connection PoC\temp.py", line 18, in run
    self.socket.bind((socket.gethostname(), self.port))
  File "C:\Python27\lib\socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted

.... (the same traceback repeats several more times) ....

The issue seems to become less frequent (but does not disappear completely) if I insert a time.sleep(0.1) call after putting each create_client command on the queue.
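That is, inside the sending loop:

self.queue.put(("create_client", self.client_config.copy()))
time.sleep(0.1)    # reduces, but does not eliminate, the corruption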

What's interesting is that the tuples carrying the "create_client" command are transferred without a problem; the issue seems to be only with the dictionary of values. Is there a way to ensure that the queue is ready for writing before putting values in it, without the time.sleep()? I have tried a busy-wait on self.queue.empty() (shown below), but it seems to get stuck forever after a couple of commands...
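That attempt, roughly:

self.queue.put(("create_client", self.client_config.copy()))
while not self.queue.empty():    # wait for the consumer to drain the queue
    pass                         # sometimes this spins forever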

Asked Nov 22 '22 by Vagos Duke

1 Answer

I have had this problem when sending dictionaries that contain **big numpy arrays**. After a lot of trial and error with different approaches, I came up with the following rule:

"Don't send huge objects through multiprocessing queues."

But there are some things you can do:

1- Add a delay after sending a huge object, and make sure the queue has actually pickled it (or that the consumer has received the message) before moving on.

2- Copy your object, and add a delay before sending the next object through the queue.

3- For dictionaries, make sure you don't change the dict while it is being sent through the queue (use a copy, a delay, a lock, etc.); sketches of the underlying race and of a copy-plus-delay fix follow below.
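These mitigations exist because queue.put() returns before the object is actually serialized: a background feeder thread pickles it later and writes it to a pipe, so mutating an object shortly after putting it can race with the pickling. A toy demonstration of the race (my own sketch, not the question's code):

import multiprocessing
import time

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    config = {'port': 40001}
    queue.put(config)        # returns immediately; a feeder thread pickles later
    config['port'] = 40002   # mutating now races with that pickling
    time.sleep(0.5)
    print queue.get()        # may print {'port': 40002} on some runs

Applied to the question's Manager.run loop, combining a copy and a delay might look like this (a sketch only; copy.deepcopy instead of the shallow dict.copy(), so nested values like the 'junk' list are not shared, and the 0.1-second delay are illustrative choices, not tested values):

import copy
import time

# inside Manager.run, replacing the original send loop
for i in range(20):
    self.client_config['port'] = current_bind_port
    # deep copy: the queued dict shares nothing with the dict we keep mutating
    snapshot = copy.deepcopy(self.client_config)
    self.queue.put(("create_client", snapshot))
    time.sleep(0.1)    # give the feeder thread time to pickle the snapshot
    current_bind_port += 1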

I hope it helps. However, further investigation is needed to pin down the exact root cause.
Answered Nov 24 '22 by mhs.rajaei