
How do I abort a socket.recvfrom() from another thread in python?

This looks like a duplicate of "How do I abort a socket.recv() from another thread in Python", but it's not: I want to abort recvfrom() in a thread, which is UDP, not TCP.

Can this be solved with poll() or select.select()?

jokoon asked Sep 16 '11 19:09





2 Answers

If you want to unblock a UDP read from another thread, send it a datagram!
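One way to read this suggestion, as a minimal sketch: the receiving thread checks a stop flag every time recvfrom() returns, and the stopping thread sets the flag and then sends any datagram to the socket's own address to wake it. The names `stop_requested`, `receive_loop` and `stop_receiver`, and the choice of an empty wake-up datagram, are assumptions made for illustration and are not part of the answer.

```python
import socket
import threading

# Hypothetical stop flag shared between the two threads
stop_requested = threading.Event()


def receive_loop(sock, received):
    """Collect datagrams until a stop has been requested."""
    while True:
        data, addr = sock.recvfrom(1024)  # blocks until any datagram arrives
        if stop_requested.is_set():
            break  # the datagram that woke us is discarded
        received.append(data)


def stop_receiver(sock):
    """Set the flag, then wake the blocked recvfrom() with a datagram."""
    stop_requested.set()
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as waker:
        waker.sendto(b"", sock.getsockname())


sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
received = []
worker = threading.Thread(target=receive_loop, args=(sock, received))
worker.start()

stop_receiver(sock)  # called from the main thread
worker.join(timeout=5)
sock.close()
```

Using a flag rather than a magic payload means no legitimate message can be mistaken for the stop signal; the cost is that a real datagram arriving right after the flag is set is dropped.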

Rgds, Martin

Martin James answered Oct 13 '22 01:10



A good way to handle this kind of asynchronous interruption is the old C pipe trick: create a pipe and call select/poll on both the socket and the pipe. When you want to interrupt the receiver, you just write a byte to the pipe.

  • pros:
    • Works for both UDP and TCP
    • Is protocol agnostic
  • cons:
    • select/poll on pipes is not available on Windows; there you should replace the pipe with another UDP socket used as the notification channel
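The Windows workaround mentioned in the cons could look roughly like the sketch below: a loopback UDP socket stands in for the pipe, so select() only ever sees sockets. The `UdpNotifier` class and the `wait_for_data` helper are hypothetical names introduced here, not part of the code that follows in this answer.

```python
import select
import socket


class UdpNotifier(object):
    """Hypothetical stand-in for the pipe: a loopback UDP socket select() can watch."""

    def __init__(self):
        self._rx = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._rx.bind(("127.0.0.1", 0))  # loopback only, OS-assigned port
        self._tx = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def fileno(self):
        # select() accepts any object with a fileno() method
        return self._rx.fileno()

    def notify(self):
        self._tx.sendto(b"I", self._rx.getsockname())

    def close(self):
        self._rx.close()
        self._tx.close()


def wait_for_data(sock, notifier, timeout=None):
    """Report which event ended the wait: "data", "interrupted" or "timeout"."""
    readable, _w, _x = select.select([sock, notifier], [], [], timeout)
    if sock in readable:
        return "data"
    if notifier in readable:
        return "interrupted"
    return "timeout"


data_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
data_sock.bind(("127.0.0.1", 0))
notifier = UdpNotifier()

notifier.notify()  # would normally be called from another thread
result = wait_for_data(data_sock, notifier, timeout=5)

notifier.close()
data_sock.close()
```

Because the notifier exposes fileno(), it can be passed to select() directly, which keeps the waiting code identical on every platform.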

Starting point

interruptable_socket.py

import os
import socket
import select


class InterruptableUdpSocketReceiver(object):
    def __init__(self, host, port):
        self._host = host
        self._port = port
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._r_pipe, self._w_pipe = os.pipe()
        self._interrupted = False

    def bind(self):
        self._socket.bind((self._host, self._port))

    def recv(self, buffersize, flags=0):
        if self._interrupted:
            raise RuntimeError("Cannot be reused")
        # Block until either the socket has data or interrupt() wrote to the pipe
        read, _w, errors = select.select([self._r_pipe, self._socket], [], [self._socket])
        if self._socket in read:
            return self._socket.recv(buffersize, flags)
        # Interrupted: return empty bytes, the same type recv() returns
        return b""

    def interrupt(self):
        self._interrupted = True
        os.write(self._w_pipe, b"I")

A test suite:

test_interruptable_socket.py

import socket
from threading import Timer
import time
from interruptable_socket import InterruptableUdpSocketReceiver
import unittest


class Sender(object):
    def __init__(self, destination_host, destination_port):
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self._dest = (destination_host, destination_port)

    def send(self, message):
        self._socket.sendto(message, self._dest)

class Test(unittest.TestCase):
    def create_receiver(self, host="127.0.0.1", port=3010):
        receiver = InterruptableUdpSocketReceiver(host, port)
        receiver.bind()
        return receiver

    def create_sender(self, host="127.0.0.1", port=3010):
        return Sender(host, port)

    def create_sender_receiver(self, host="127.0.0.1", port=3010):
        return self.create_sender(host, port), self.create_receiver(host, port)

    def test_create(self):
        self.create_receiver()

    def test_recv_async(self):
        sender, receiver = self.create_sender_receiver()
        start = time.time()
        send_message = "TEST".encode('UTF-8')
        Timer(0.1, sender.send, (send_message, )).start()
        message = receiver.recv(128)
        elapsed = time.time()-start
        self.assertGreaterEqual(elapsed, 0.095)
        self.assertLess(elapsed, 0.11)
        self.assertEqual(message, send_message)

    def test_interrupt_async(self):
        receiver = self.create_receiver()
        start = time.time()
        Timer(0.1, receiver.interrupt).start()
        message = receiver.recv(128)
        elapsed = time.time()-start
        self.assertGreaterEqual(elapsed, 0.095)
        self.assertLess(elapsed, 0.11)
        self.assertEqual(0, len(message))

    def test_exception_after_interrupt(self):
        sender, receiver = self.create_sender_receiver()
        receiver.interrupt()
        with self.assertRaises(RuntimeError):
            receiver.recv(128)


if __name__ == '__main__':
    unittest.main()

Evolution

This code is just a starting point. To make it more generic, we should fix the following issues:

  1. Interface: returning an empty message when interrupted is not a good idea, because an empty UDP datagram is also a valid message; it is better to signal the interruption with an exception
  2. Generalization: we should have a single function to call before socket.recv(), so that extending the interruption to the other recv methods becomes very simple
  3. Portability: to make porting to Windows simple, we should isolate the asynchronous notification in an object, so that we can choose the right implementation for the operating system

First of all, we change test_interrupt_async() to check for an exception instead of an empty message:

from interruptable_socket import InterruptException

def test_interrupt_async(self):
    receiver = self.create_receiver()
    start = time.time()
    with self.assertRaises(InterruptException):
        Timer(0.1, receiver.interrupt).start()
        receiver.recv(128)
    elapsed = time.time()-start
    self.assertGreaterEqual(elapsed, 0.095)
    self.assertLess(elapsed, 0.11)

After this we can define InterruptException, replace the empty-message return in recv() with raise InterruptException, and the tests pass again.

The ready-to-extend version can be:

interruptable_socket.py

import os
import socket
import select


class InterruptException(Exception):
    pass


class InterruptableUdpSocketReceiver(object):
    def __init__(self, host, port):
        self._host = host
        self._port = port
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._async_interrupt = AsyncInterrupt(self._socket)

    def bind(self):
        self._socket.bind((self._host, self._port))

    def recv(self, buffersize, flags=0):
        # Returns once data is ready; raises InterruptException if interrupted first
        self._async_interrupt.wait_for_receive()
        return self._socket.recv(buffersize, flags)

    def interrupt(self):
        self._async_interrupt.interrupt()


class AsyncInterrupt(object):
    def __init__(self, descriptor):
        # The pipe is the notification channel; descriptor is the socket to watch
        self._read, self._write = os.pipe()
        self._interrupted = False
        self._descriptor = descriptor

    def interrupt(self):
        self._interrupted = True
        self._notify()

    def wait_for_receive(self):
        if self._interrupted:
            raise RuntimeError("Cannot be reused")
        # Block until the descriptor is readable or the pipe has been written to
        read, _w, errors = select.select([self._read, self._descriptor], [], [self._descriptor])
        if self._descriptor not in read:
            raise InterruptException

    def _notify(self):
        os.write(self._write, b"I")

Now wrapping more recv functions, implementing a Windows version, or taking care of socket timeouts becomes really simple.
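As an illustration of that last point, here is a minimal self-contained sketch that covers recvfrom() (what the question asked about) and adds an optional timeout. The class name `InterruptableUdpSocket`, the shared `_wait()` helper and the `timeout` parameter are assumptions introduced for this sketch; it uses os.pipe() and is therefore POSIX only, like the code above.

```python
import os
import select
import socket
from threading import Timer


class InterruptException(Exception):
    pass


class InterruptableUdpSocket(object):
    """Hypothetical variant of the receiver that wraps recv() and recvfrom()."""

    def __init__(self, host, port):
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._socket.bind((host, port))
        self._read, self._write = os.pipe()  # POSIX only

    def _wait(self, timeout):
        # Single wait function shared by every recv-style method
        readable, _w, _x = select.select([self._read, self._socket], [], [], timeout)
        if not readable:
            raise socket.timeout("timed out")
        if self._socket not in readable:
            raise InterruptException()

    def recv(self, buffersize, flags=0, timeout=None):
        self._wait(timeout)
        return self._socket.recv(buffersize, flags)

    def recvfrom(self, buffersize, flags=0, timeout=None):
        self._wait(timeout)
        return self._socket.recvfrom(buffersize, flags)

    def interrupt(self):
        # The byte is left in the pipe, so later waits also return immediately
        os.write(self._write, b"I")

    def close(self):
        self._socket.close()
        os.close(self._read)
        os.close(self._write)


receiver = InterruptableUdpSocket("127.0.0.1", 0)  # port 0: OS picks a free port
Timer(0.1, receiver.interrupt).start()  # interrupt from another thread
try:
    receiver.recvfrom(128, timeout=5)
    outcome = "data"
except InterruptException:
    outcome = "interrupted"
receiver.close()
```

Keeping the select() call in one helper is what makes each extra method a two-line wrapper; a Windows variant would only need to swap the pipe for a socket-based notifier.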

Michele d'Amico answered Oct 13 '22 00:10
