Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I share a class between processes?

I want to have global object which is shared and updated by all processes with minimum locking.

import multiprocessing

class Counter(object):
  def __init__(self):
    self.value = 0

  def update(self, value):
    self.value += value


def update(counter_proxy, thread_id):
  counter_proxy.value.update(1)
  print counter_proxy.value.value, 't%s' % thread_id, \
    multiprocessing.current_process().name
  return counter_proxy.value.value

def main():
  manager = multiprocessing.Manager()
  counter = manager.Value(Counter, Counter())
  pool = multiprocessing.Pool(multiprocessing.cpu_count())
  for i in range(10):
    pool.apply(func = update, args = (counter, i))
  pool.close()
  pool.join()

  print 'Should be 10 but is %s.' % counter.value.value

if __name__ == '__main__':
  main()

The result is this - not 10 but zero. It looks like the object's shared value is not updated. How can I lock and update such value?

0 t0 PoolWorker-2
0 t1 PoolWorker-3
0 t2 PoolWorker-5
0 t3 PoolWorker-8
0 t4 PoolWorker-9
0 t5 PoolWorker-2
0 t6 PoolWorker-7
0 t7 PoolWorker-4
0 t8 PoolWorker-6
0 t9 PoolWorker-3
Should be 10 but is 0.

Current the best solution by @dano - I mixed custom manager with class proxy.

import multiprocessing
from multiprocessing.managers import BaseManager, NamespaceProxy


class Counter(object):
  def __init__(self):
    self.value = 0

  def update(self, value):
    self.value += value


def update(counter_proxy, thread_id):
  counter_proxy.update(1)

class CounterManager(BaseManager):
  pass

class CounterProxy(NamespaceProxy):
  _exposed_ = ('__getattribute__', '__setattr__', '__delattr__', 'update')

  def update(self, value):
    callmethod = object.__getattribute__(self, '_callmethod')
    return callmethod(self.update.__name__, (value,))

CounterManager.register('Counter', Counter, CounterProxy)

def main():
  manager = CounterManager()
  manager.start()

  counter = manager.Counter()
  pool = multiprocessing.Pool(multiprocessing.cpu_count())
  for i in range(10):
    pool.apply(func = update, args = (counter, i))
  pool.close()
  pool.join()

  print 'Should be 10 but is %s.' % counter.value

if __name__ == '__main__':
  main()
like image 906
Chameleon Avatar asked Feb 19 '15 17:02

Chameleon


1 Answers

multiprocessing.Value isn't designed to be used with custom classes, it's supposed to be similar to a multiprocessing.sharedctypes.Value. Instead, you need to create a custom manager and register your class with it. Your life will also be easier if you don't access value directly, but modify/access it via methods, which will get exported by the default Proxy created for your class by default. Regular attributes (like Counter.value) aren't, so they aren't accessible without additional customization. Here's a working example:

import multiprocessing
from multiprocessing.managers import BaseManager

class MyManager(BaseManager): pass

def Manager():
    m = MyManager()
    m.start()
    return m 

class Counter(object):
  def __init__(self):
    self._value = 0

  def update(self, value):
    self._value += value

  def get_value(self):
      return self._value

MyManager.register('Counter', Counter)

def update(counter_proxy, thread_id):
  counter_proxy.update(1)
  print counter_proxy.get_value(), 't%s' % thread_id, \
    multiprocessing.current_process().name
  return counter_proxy

def main():
  manager = Manager()
  counter = manager.Counter()
  pool = multiprocessing.Pool(multiprocessing.cpu_count())
  for i in range(10):
    pool.apply(func=update, args=(counter, i))
  pool.close()
  pool.join()

  print 'Should be 10 but is %s.' % counter.get_value()

if __name__ == '__main__':
  main()

Output:

1 t0 PoolWorker-2
2 t1 PoolWorker-8
3 t2 PoolWorker-4
4 t3 PoolWorker-5
5 t4 PoolWorker-6
6 t5 PoolWorker-7
7 t6 PoolWorker-3
8 t7 PoolWorker-9
9 t8 PoolWorker-2
10 t9 PoolWorker-8
Should be 10 but is 10.
like image 138
dano Avatar answered Oct 24 '22 04:10

dano