How to synchronize Python processes on milliseconds level?

I'm trying to run two Python functions on two cores at exactly the same time. Each process runs a very long loop (theoretically an infinite loop). It is important that they stay synchronized; even the slightest delay can cause problems in the long run.

I think my problem is that I run them serially, like this:

# define the processes and assign them functions
first_process = multiprocessing.Process(name='p1', target=first_function)
second_process = multiprocessing.Process(name='p2', target=second_function)

# start the processes
first_process.start()
second_process.start()

I printed time.time() at the start of each function to measure the time difference. The output was:

first function time: 1553812298.9244068
second function time: 1553812298.9254067

The difference is 0.0009999275207519531 seconds. As mentioned earlier, this difference will have a significant impact in the long run.

To sum up: how can I run two functions on two different cores at exactly the same time? If Python isn't capable of that, what other options should I check out?

asked by YoZo
2 Answers

What you ask for is not really something a usual OS is supposed to provide. You have OS scheduling interfering, core migrations, clock speeds varying with CPU thermals, varying cache hits and misses, and so on. It's possible to raise process priorities and pin processes to certain cores (look into psutil for this), but it's unlikely you will see stable improvements from doing so. Your OS usually does a better job here than you could.
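For illustration, a minimal sketch of pinning and prioritizing with psutil (the core index and niceness value are arbitrary examples; cpu_affinity is not available on every platform, e.g. macOS, and negative nice values usually require elevated privileges on Unix):

import os
import psutil

def pin_and_prioritize(core_id=0):
    # operate on the calling process
    proc = psutil.Process(os.getpid())
    # restrict scheduling to a single core (supported on Linux/Windows, not macOS)
    proc.cpu_affinity([core_id])
    # raise priority; negative niceness usually needs root on Unix
    proc.nice(-10)

You would call this at the top of each worker function, with a different core_id per process.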

For really hard real-time constraints you would have to look into RTOSes. You would also have to pick a lower-level language (e.g. C/C++) that allows fine-grained memory management (to reduce costly CPU-cache misses). It's possible you're asking for something you should do in a different way anyway (XY problem), so when I go on to show you how to get some synchronization, don't take that as approval of your whole approach to whatever problem you're really trying to solve here.


The weapon of choice here is a multiprocessing.Barrier. That's a synchronization primitive which lets you specify a number of executors (threads/processes) that need to call .wait() on the barrier instance. When the specified number of executors has called wait(), the barrier releases all waiting executors simultaneously. That way all executors can be synchronized at each such barrier operation.
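A minimal, self-contained illustration of those semantics (two parties, so the second wait() call releases both processes at once):

import time
from multiprocessing import Barrier, Process

def worker(barrier):
    barrier.wait()  # blocks until all parties are waiting
    print('released at', time.time())

if __name__ == '__main__':
    barrier = Barrier(2)  # number of executors to synchronize
    workers = [Process(target=worker, args=(barrier,)) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()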

Note that one such operation is not enough for what you're asking for. The OS factors mentioned earlier will always bring chaos back in, and CPU times will diverge again from that point of synchronization. This means you have to repeat the synchronization at certain intervals, which of course costs you some throughput. Shorter synchronization intervals mean less divergence on average.

Below are two functions implementing that technique: syncstart_foo synchronizes only once (like @blhsing's answer), while sync_foo does so every sync_interval iterations. When all iterations are done, the functions send time.time() back to the parent, where the time delta is calculated.

import time
from multiprocessing import Process, Barrier, Queue


def syncstart_foo(outqueue, barrier, n_iter):
    barrier.wait() # synchronize only once at start
    for _ in range(int(n_iter)):
        pass # do stuff
    outqueue.put(time.time())


def sync_foo(outqueue, barrier, n_iter, sync_interval):
    for i in range(int(n_iter)):
        if i % sync_interval == 0: # will sync first time for i==0
            barrier.wait()
        # do stuff
    outqueue.put(time.time())

Helper functions for running the benchmark:

def test_sync():
    """Run test for `sync_foo`."""
    special_args = (SYNC_INTERVAL,)
    _run_test(sync_foo, special_args)


def test_syncstart():
    """Run test for `syncstart_foo`."""
    _run_test(syncstart_foo)


def _run_test(f, special_args=None):

    outqueue = Queue()
    barrier = Barrier(N_WORKERS)

    args = (outqueue, barrier, N_ITER)
    if special_args:
        args += special_args

    pool = [Process(target=f, args=args) for _ in range(N_WORKERS)]

    print(f'starting test for {f.__name__}')
    for p in pool:
        p.start()

    results = [outqueue.get() for _ in range(N_WORKERS)]

    for p in pool:
        p.join()

    print(f"delta: {(abs(results[1] - results[0])) * 1e3:>{6}.{2}f} ms")
    print("-" * 60)

Main entry:

if __name__ == '__main__':

    N_WORKERS = 2
    N_ITER = 50e6  # 1e6 == 1M
    SYNC_INTERVAL = 250_000  # synchronize every x iterations

    for _ in range(5):
        test_syncstart()
        test_sync()

Example Output:

starting test for syncstart_foo
delta:  28.90 ms
------------------------------------------------------------
starting test for sync_foo
delta:   1.38 ms
------------------------------------------------------------
starting test for syncstart_foo
delta:  70.33 ms
------------------------------------------------------------
starting test for sync_foo
delta:   0.33 ms
------------------------------------------------------------
starting test for syncstart_foo
delta:   4.45 ms
------------------------------------------------------------
starting test for sync_foo
delta:   0.17 ms
------------------------------------------------------------
starting test for syncstart_foo
delta: 168.80 ms
------------------------------------------------------------
starting test for sync_foo
delta:   0.30 ms
------------------------------------------------------------
starting test for syncstart_foo
delta:  79.42 ms
------------------------------------------------------------
starting test for sync_foo
delta:   1.24 ms
------------------------------------------------------------

Process finished with exit code 0

You can see that synchronizing once, like syncstart_foo does, is not enough.

answered by Darkonaut

You can designate a multiprocessing.Queue object for each of the processes. At the start of its function, each process puts an item into the other process's queue with multiprocessing.Queue.put and then immediately tries to dequeue from its own queue with multiprocessing.Queue.get. Since multiprocessing.Queue.get blocks until there is an item in the queue, this effectively synchronizes the two processes:

import multiprocessing
import time

def func(queue_self, queue_other):
    queue_other.put(None)   # signal the other process
    queue_self.get()        # block until the other process has signaled
    print(time.time())

if __name__ == '__main__':
    # create the queues and processes under the main guard so children
    # don't re-execute this setup on import (spawn start method)
    q1 = multiprocessing.Queue()
    q2 = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=func, args=(q1, q2))
    p2 = multiprocessing.Process(target=func, args=(q2, q1))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

Sample output:

1553814412.7520192
1553814412.7520192
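If the processes also need to re-align periodically (as the other answer notes, synchronizing once at the start is not enough in the long run), the same handshake can be repeated inside the loop. A minimal sketch, where n_iter and sync_interval are assumed parameters analogous to those in the other answer:

def func(queue_self, queue_other, n_iter, sync_interval):
    for i in range(n_iter):
        if i % sync_interval == 0:
            queue_other.put(None)  # signal the peer (non-blocking)
            queue_self.get()       # block until the peer has signaled
        # do stuff

Both processes put before they get, so neither blocks the other and the handshake cannot deadlock.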
answered by blhsing