Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python multiprocessing: Start/stop processes on server

I am building an algorithmic trading platform using Python. Multiple algorithms are monitoring the market and execute trades accordingly daily from 09:30 to 16:00.

What I'm looking for is to start and stop algorithms arbitrarily from a client. Therefore I want to have a server script running using multiprocessing and a client which can start/stop/list algorithms (which should run in separate process) at any given time.

Any examples of how this can be done? The majority of online examples are for queue servers, which do not seem to fit my problem.

EDIT:

I am trying to to this with the package multiprocessing. The idea of using a queue seems wrong to me, as I know an arbitrary number of processes will for a fact run for the whole day or at least until I say stop. I'm not trying to run a short script and let a worker consume the next job from a queue once the previous is done. Actually I'm thinking of having a server script using a Manager which will run forever and just start new scripts in separate processes/threads when requested. I would however, like to be able to send a stop signal to a process to kill it. I do have a feeling that I'm doing this kinda backwards :-) What I have is:

server.py:

import multiprocessing as mp
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from time import strftime


class Server(object):
    def __init__(self, port=50000, authkey=''):
        self.processes = {}
        self._authkey = authkey
        self.port = port
        self.server = None
        self.running = False
        BaseManager.register('get_process', callable=lambda: self)


    def start_server(self):
        manager = BaseManager(address=('', self.port), authkey=self._authkey)
        self.server = manager.get_server()
        try:
            self._logmessage("Server started")
            self.running = True
            self.server.serve_forever()
        except (KeyboardInterrupt, SystemExit):
            self.shutdown()

    def start_process(self, mod, fcn, *args, **kwargs):
        mod = __import__(mod, globals(), locals(), ['object'], -1)
        key = "{0}.{1}".format(mod, fcn)
        assert not key in self.processes, \
            "Process named '%s' already exists" % key
        p = Process(target=getattr(mod, fcn), name=mod, args=(None, ), kwargs=kwargs)
        self._logmessage("Process '%s' started" % key)
        p.start()
        # p.join()
        self.processes[key] = p

    def stop_process(self, key):
        self.processes[key].terminate()
        del self.processes[key]

    def get_processes(self):
        return self.processes.keys()

    def shutdown(self):
        for child in mp.active_children():
            child.terminate()
        self.server.shutdown()
        self.running = False
        print "Shutting down"

    def _logmessage(self, msg):
        print "%s: %s" % (strftime('%Y-%m-%d %H:%M:%S'), msg)


if __name__ == '__main__':
    server = Server(authkey='abc')
    try:
        server.start_server()
    except (KeyboardInterrupt, SystemExit):
        server.shutdown()

client.py:

from multiprocessing.managers import BaseManager
import time


class Client(object):
    def __init__(self, host='', port=50000, authkey=''):
        self.host = host
        self.port = port
        self.manager = None
        self.process = None
        self._type_id = 'get_process'
        self._authkey = authkey
        self.manager = BaseManager(address=(self.host, self.port), authkey=self._authkey)
        BaseManager.register(self._type_id)

    def connect(self):
        try:
            self.manager.connect()
            self._logmessage("Connected to server")
        except:
            self._logmessage("Could not connect to server")
        self.process = getattr(self.manager, self._type_id)()

    def start_process(self, mod, fcn):
        self.process.start_process(mod, fcn)
        self._logmessage("Process '%s' started" % fcn)

    def list_processes(self):
        print self.process.get_processes()

    @property
    def connected(self):
        return self.manager._state.value == self.manager._state.STARTED

    def _logmessage(self, msg):
        print "%s: %s" % (time.strftime('%Y-%m-%d %H:%M:%S'), msg)


def test(data):
    while True:
        print time.time()
        time.sleep(1.)


if __name__ == '__main__':
    from algotrading.server.process_client import Client
    client = Client(authkey='abc')
    client.connect()
    client.start_process("algotrading.server.process_client", "test")
    client.list_processes()
like image 229
Morten Avatar asked Nov 23 '22 06:11

Morten


1 Answers

Check out Supervisord which allows for remote management of processes, plus automatic start/restart configurability.

Depending on your scalability and disaster-recovery needs, you may be thinking about distributing your "monitoring/trading processes" across running multiple servers. While supervisord is really only designed to manage a single machine, you could build a manager app which coordinates multiple servers, each running supervisord, via it's xml-rpc interface.

Cron or Celery could be used for your daily start/stop scheduling.

like image 102
Dwight Gunning Avatar answered Nov 25 '22 20:11

Dwight Gunning