
How do I add a timeout to multiprocessing.connection.Client(..) in Python 3.7?

I've got two Python programs running. Program A connects to program B with the multiprocessing module:

# Connection code in program A
# -----------------------------
import multiprocessing
import multiprocessing.connection

...

connection = multiprocessing.connection.Client(
    ('localhost', 19191),                # <- address of program B
    authkey='embeetle'.encode('utf-8')   # <- authorization key
)

...

connection.send(send_data)

recv_data = connection.recv()

It works perfectly most of the time. However, sometimes program B is frozen (the details don't matter much, but it usually happens when the GUI from program B spawns a modal window).
While program B is frozen, program A hangs at the following line:

connection = multiprocessing.connection.Client(
    ('localhost', 19191),                # <- address of program B
    authkey='embeetle'.encode('utf-8')   # <- authorization key
)

It keeps waiting for a response. I would like to put a timeout parameter, but the call to multiprocessing.connection.Client(..) does not have one.

How can I implement a timeout here?

 
Notes:
I'm working on a Windows 10 computer with Python 3.7.

K.Mulier asked Mar 04 '23 11:03


1 Answer

I would like to put a timeout parameter, but the call to multiprocessing.connection.Client(..) does not have one. How can I implement a timeout here?

Looking at the source to multiprocessing.connection in Python 3.7, the Client() function is a fairly brief wrapper around SocketClient() for your use case, which in turn wraps Connection().

At first it looked fairly straightforward to write a ClientWithTimeout wrapper that does the same thing, but additionally calls settimeout() on the socket it creates for the connection. However, this does not have the correct effect, because:

  1. Python implements its own socket timeout behaviour by using select() and an underlying non-blocking OS socket; this behaviour is what is configured by settimeout().

  2. Connection operates directly on an OS socket handle, which is returned by calling detach() on the normal Python socket object.

  3. Since Python has set the OS socket handle to the non-blocking mode, recv() calls on it return immediately rather than waiting for the timeout period.
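Points 1 and 3 can be checked directly. The following is a minimal sketch, not part of the solution itself. It assumes a POSIX platform (os.get_blocking() only became available on Windows in Python 3.12) and uses socket.socketpair() as a stand-in for a real connection; the two function names are hypothetical helpers chosen for illustration.

```python
import os
import socket


def descriptor_blocking_after_settimeout(timeout):
    """Return (blocking_before, blocking_after) for the OS-level descriptor."""
    a, b = socket.socketpair()
    try:
        before = os.get_blocking(a.fileno())
        a.settimeout(timeout)  # Python-level timeout
        after = os.get_blocking(a.fileno())
        return before, after
    finally:
        a.close()
        b.close()


def raw_read_fails_immediately(timeout):
    """Read from the detached descriptor the way a low-level consumer would."""
    a, b = socket.socketpair()
    try:
        a.settimeout(timeout)
        fd = a.detach()  # hand the raw descriptor over, as Client() does
        try:
            os.read(fd, 1)  # nothing has been sent, so there is no data
        except BlockingIOError:
            return True  # failed at once instead of waiting for the timeout
        finally:
            os.close(fd)
        return False
    finally:
        a.close()
        b.close()


if __name__ == "__main__":
    print(descriptor_blocking_after_settimeout(3.0))
    print(raw_read_fails_immediately(3.0))
```

On Linux I would expect the descriptor to report blocking before settimeout() and non-blocking afterwards, and the raw read to fail straight away, which is the behaviour that defeats a settimeout()-based wrapper.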

However, we can still set a receive timeout on the underlying OS socket handle by using the low-level SO_RCVTIMEO socket option.

Hence the second version of my solution:

from multiprocessing.connection import Connection, answer_challenge, deliver_challenge
import socket, struct

def ClientWithTimeout(address, authkey, timeout):

    with socket.socket(socket.AF_INET) as s:
        s.setblocking(True)
        s.connect(address)

        # We'd like to call s.settimeout(timeout) here, but that won't work.

        # Instead, prepare a C "struct timeval" to specify timeout. Note that
        # these field sizes may differ by platform.
        seconds = int(timeout)
        microseconds = int((timeout - seconds) * 1e6)
        timeval = struct.pack("@LL", seconds, microseconds)

        # And then set the SO_RCVTIMEO (receive timeout) option with this.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, timeval)

        # Now create the connection as normal.
        c = Connection(s.detach())

    # The following code will now fail if a socket timeout occurs.

    answer_challenge(c, authkey)
    deliver_challenge(c, authkey)

    return c

For brevity, I have assumed the parameters are as per your example, i.e.:

  • address is a tuple (implying address family is AF_INET).
  • authkey is a byte string.

If you need to handle cases where these assumptions don't hold then you will need to copy a little more logic from Client() and SocketClient().

Although I looked at the multiprocessing.connection source to find out how to do this, my solution does not use any private implementation details. Connection, answer_challenge and deliver_challenge are all public and documented parts of the API. This function should therefore be safe to use with future versions of multiprocessing.connection.

Note that SO_RCVTIMEO may not be supported on all platforms, but it is present on at least Windows, Linux and OSX. The format of the option value is platform-specific. On Linux and OSX it is a struct timeval, and I have assumed that its two fields are of the native long size; this should be correct on common platforms but is not guaranteed. On Windows, Winsock instead expects a single DWORD holding the timeout in milliseconds, so the struct.pack() call above has to be adapted there. Unfortunately Python does not currently provide a platform-independent way to do this.
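One way to isolate the platform dependence is a small helper that builds the SO_RCVTIMEO option value. This is only a sketch: the name pack_rcvtimeo and its platform parameter are hypothetical, the struct timeval layout of two native longs is an assumption that holds on common Linux and macOS builds, and the Windows branch relies on Winsock taking a DWORD timeout in milliseconds for this option.

```python
import struct
import sys


def pack_rcvtimeo(timeout, platform=sys.platform):
    """Build an SO_RCVTIMEO option value for a timeout given in seconds."""
    if platform == "win32":
        # Winsock: one 32-bit unsigned integer (DWORD), in milliseconds.
        return struct.pack("=L", int(timeout * 1000))
    # Assumed layout elsewhere: struct timeval { long tv_sec; long tv_usec; }
    seconds = int(timeout)
    microseconds = int((timeout - seconds) * 1e6)
    return struct.pack("@ll", seconds, microseconds)
```

In ClientWithTimeout, the result could then be passed as s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, pack_rcvtimeo(timeout)) in place of the hand-packed timeval. I have not tested the Windows branch, so treat it as a starting point rather than a verified fix.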

Below is a test program which shows this working - it assumes the above code is saved as client_timeout.py.

from multiprocessing.connection import Client, Listener
from client_timeout import ClientWithTimeout
from threading import Thread
from time import time, sleep

addr = ('localhost', 19191)
key = 'embeetle'.encode('utf-8')

# Provide a listener which either does or doesn't accept connections.
class ListenerThread(Thread):

    def __init__(self, accept):
        Thread.__init__(self)
        self.accept = accept

    def __enter__(self):
        if self.accept:
            print("Starting listener, accepting connections")
        else:
            print("Starting listener, not accepting connections")
        self.active = True
        self.start()
        sleep(0.1)

    def run(self):
        listener = Listener(addr, authkey=key)
        self.active = True
        if self.accept:
            listener.accept()
        while self.active:
            sleep(0.1)
        listener.close()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.active = False
        self.join()
        print("Stopped listener")
        return True

for description, accept, name, function in [
        ("ClientWithTimeout succeeds when the listener accepts connections.",
        True, "ClientWithTimeout", lambda: ClientWithTimeout(addr, timeout=3, authkey=key)),
        ("ClientWithTimeout fails after 3s when listener doesn't accept connections.",
        False, "ClientWithTimeout", lambda: ClientWithTimeout(addr, timeout=3, authkey=key)),
        ("Client succeeds when the listener accepts connections.",
        True, "Client", lambda: Client(addr, authkey=key)),
        ("Client hangs when the listener doesn't accept connections (use ctrl-C to stop).",
        False, "Client", lambda: Client(addr, authkey=key))]:

    print("Expected result:", description)

    with ListenerThread(accept):
        start_time = time()
        try:
            print("Creating connection using %s... " % name)
            client = function()
            print("Client created:", client)
        except Exception as e:
            print("Failed:", e)
        print("Time elapsed: %f seconds" % (time() - start_time))

    print()

Running this on Linux produces the following output:

Expected result: ClientWithTimeout succeeds when the listener accepts connections.
Starting listener, accepting connections
Creating connection using ClientWithTimeout... 
Client created: <multiprocessing.connection.Connection object at 0x7fad536884e0>
Time elapsed: 0.003276 seconds
Stopped listener

Expected result: ClientWithTimeout fails after 3s when listener doesn't accept connections.
Starting listener, not accepting connections
Creating connection using ClientWithTimeout... 
Failed: [Errno 11] Resource temporarily unavailable
Time elapsed: 3.157268 seconds
Stopped listener

Expected result: Client succeeds when the listener accepts connections.
Starting listener, accepting connections
Creating connection using Client... 
Client created: <multiprocessing.connection.Connection object at 0x7fad53688c50>
Time elapsed: 0.001957 seconds
Stopped listener

Expected result: Client hangs when the listener doesn't accept connections (use ctrl-C to stop).
Starting listener, not accepting connections
Creating connection using Client... 
^C
Stopped listener
Martin L answered Apr 28 '23 00:04