
Python Multiprocessing Module: calling an instance method within a process

Based on Python's multiprocessing module, I need to do the following:

- Create an ever-running process that can be interrupted by a specific Event.

- In this process, receive a message from a client and pass it to a handler method of the object instance.

The base code is below (some details omitted). The problem is that when I call the instance method (self.enroll(message)) inside the process, it has no effect on the object in the parent process, as expected. I know the reason: each process uses its own memory. I have already implemented the solution presented in "Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map()" for pickling bound methods, and I have tried different approaches using Manager, Queue and Pool. Since none of them worked, I decided to post the code as "raw" as possible so that you can understand my intention. Any help is welcome.

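import pickle
from multiprocessing import Event, Process
from multiprocessing.connection import Listener


class DistManager: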
    def __init__(self, name, network_address, password):
        self.name = name
        self.network_address = network_address
        self.password = password
        self.distribution_clients = {}

    def _run_distribution_process(self):
        import select
        while not self.should_stop_distribution_service.is_set():
            (sread, swrite, sexc) = select.select([self.distribution_listener], [], [], 0)
            if (sread):
                connection = self.distribution_listener.accept()
                serialized_message = connection.recv()  # currently only receiving
                connection.close()
                message = pickle.loads(serialized_message)
                self.enroll(message)  # THE PROBLEM IS HERE: only the child process's copy of self is updated

    def start_distribution_service(self, distribution_port):
        self.distribution_port = distribution_port
        # patch for making Listener work with select.select during run
        Listener.fileno = lambda self: self._listener._socket.fileno()
        self.distribution_listener = Listener(address=(self.network_address, self.distribution_port),
                                              authkey=self.password)
        self.should_stop_distribution_service = Event()
        self.distribution_process = Process(name='Distribution Runner', target=self._run_distribution_process)
        self.distribution_process.daemon = True
        self.distribution_process.start()

    def stop_distribution_service(self):
        from time import sleep
        self.should_stop_distribution_service.set()
        sleep(1)
        self.distribution_listener.close()
        self.distribution_process.terminate()
        return self.distribution_process.exitcode

    def _enroll_distribution_client(self, identifier, network_address, phone_number):
        self.distribution_clients[identifier] = (network_address, phone_number)

    def enroll(self, message):
        if type(message.content) is tuple:
            self._enroll_distribution_client(message.generator_identifier, message.content[0], message.content[1])
        else:
            raise TypeError("Tuple expected")
        return message.code
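
The behaviour described above can be reproduced in a much smaller form. The following is only a sketch, written for Python 3 and assuming a Unix-like system where the "fork" start method is available; the Registry class and run_demo function are hypothetical stand-ins and are not part of the code above.

```python
import multiprocessing as mp


class Registry:
    """Hypothetical stand-in for DistManager, reduced to the client dict."""

    def __init__(self):
        self.clients = {}

    def enroll(self, identifier, address):
        self.clients[identifier] = address


def run_demo():
    # Assumption: a Unix-like system, where the "fork" start method exists.
    ctx = mp.get_context("fork")
    registry = Registry()
    # The child works on its own copy of `registry`; enroll() changes
    # that copy and the parent's object is left untouched.
    child = ctx.Process(target=registry.enroll, args=("client-1", "10.0.0.5"))
    child.start()
    child.join()
    return child.exitcode, dict(registry.clients)
```

Calling run_demo() in the parent returns the child's exit code together with the parent's view of the dictionary, which makes it easy to check that the enrollment never reached the parent.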
Rogerio Atem asked Oct 31 '13 19:10


1 Answer

You can still use multiprocessing.Pool for this without a pickling error.

Add the following lines to the module that does the multiprocessing with the class, above the class definition. Once the reducer is registered, bound methods can be pickled and passed through the pool.

import copy_reg
import types

def _reduce_method(meth):
    # On unpickling, rebuild the bound method as getattr(instance, name)
    return (getattr, (meth.__self__, meth.__func__.__name__))

copy_reg.pickle(types.MethodType, _reduce_method)
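
One consequence of this approach is worth checking: pickling a bound method also pickles the instance it is bound to, so the receiving side works on a copy. The sketch below assumes Python 3, where the module is named copyreg and bound methods can be pickled without registering a reducer; the Counter class and round_trip helper are hypothetical and exist only for illustration.

```python
import pickle


class Counter:
    """Hypothetical class used only to illustrate the round trip."""

    def __init__(self):
        self.seen = []

    def record(self, item):
        self.seen.append(item)
        return len(self.seen)


def round_trip(method):
    # Pickling a bound method also pickles the instance it is bound to,
    # so unpickling yields a method bound to a new copy of that instance.
    return pickle.loads(pickle.dumps(method))


original = Counter()
restored = round_trip(original.record)
restored("hello")  # changes the unpickled copy, not `original`
```

After the call, original.seen should still be empty while restored.__self__.seen holds the item. Making a method picklable lets it travel to a worker, but it does not by itself send state changes back to the parent.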

For more on how to register a pickling function for methods, see the copy_reg documentation: http://docs.python.org/2/library/copy_reg.html
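
If the goal is for enrollments made in the child process to be visible in the parent, one possible alternative is to keep the client table in a Manager dictionary rather than a plain dict. This is a sketch under assumptions, not the approach from the question or this answer: it targets Python 3 on a Unix-like system with the "fork" start method, and SharedRegistry and enroll_in_child are hypothetical names.

```python
import multiprocessing as mp


class SharedRegistry:
    """Hypothetical reduced DistManager keeping clients in a Manager dict."""

    def __init__(self, shared_clients):
        # shared_clients is a proxy; writes go to the manager's server process.
        self.distribution_clients = shared_clients

    def enroll(self, identifier, network_address, phone_number):
        self.distribution_clients[identifier] = (network_address, phone_number)


def enroll_in_child(identifier, network_address, phone_number):
    ctx = mp.get_context("fork")  # assumption: Unix-like system
    with ctx.Manager() as manager:
        registry = SharedRegistry(manager.dict())
        child = ctx.Process(
            target=registry.enroll,
            args=(identifier, network_address, phone_number),
        )
        child.start()
        child.join()
        # Copy into a plain dict before the manager shuts down.
        return dict(registry.distribution_clients)
```

Because the dictionary lives in the manager's server process, both parent and child talk to the same data through proxies. Each access is a round trip to that process, so this trades some speed for shared state.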

Sri answered Sep 28 '22 10:09
