How to abort socket.recvfrom() from another thread in Python?

Posted on 2024-12-04 18:03:45

This looks like a duplicate of "How do I abort a socket.recv() from another thread in Python", but it is not: I want to abort a recvfrom() call running in a thread, and the socket is UDP, not TCP.

Can this be solved with poll() or select.select()?

Comments (5)

浪推晚风 2024-12-11 18:03:45

If you want to unblock a UDP read from another thread, send it a datagram!

Rgds,
Martin
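
A minimal sketch of the "send it a datagram" idea, assuming the receiver is bound to a local address and that a reserved payload can serve as a wake-up marker. The names `WAKEUP`, `receive_until_woken`, `wake` and `demo` are hypothetical and not part of the answer above.

```python
import socket
import threading

# Hypothetical sentinel payload; any value real traffic never uses would do.
WAKEUP = b"\x00wakeup"


def receive_until_woken(sock, received):
    """Collect datagrams until the sentinel arrives, then return."""
    while True:
        data, _addr = sock.recvfrom(4096)
        if data == WAKEUP:
            break
        received.append(data)


def wake(sock):
    """Send the sentinel to the address the receiver socket is bound to."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender:
        sender.sendto(WAKEUP, sock.getsockname())


def demo():
    receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    receiver.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    received = []
    thread = threading.Thread(
        target=receive_until_woken, args=(receiver, received), daemon=True
    )
    thread.start()
    wake(receiver)          # unblocks the recvfrom() in the other thread
    thread.join(timeout=5)
    receiver.close()
    return thread.is_alive(), received
```

Any peer that can reach the port could send the same payload, so a real protocol would probably want to check the sender address or use a less guessable marker.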

温柔少女心 2024-12-11 18:03:45

A good way to handle this kind of asynchronous interruption is the old C pipe trick: create a pipe and use select/poll on both the socket and the pipe. When you want to interrupt the receiver, you just write a byte to the pipe.

  • Pros:
    • Works for both UDP and TCP
    • Is protocol agnostic
  • Cons:
    • select/poll on pipes is not available on Windows; in that case you should replace the pipe with another UDP socket used as the notification channel
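
As a rough illustration of the Windows point, the pipe can be swapped for a pair of connected sockets, which select() accepts on every platform. This is only a sketch under that assumption: `SocketPairInterrupt` is a hypothetical name, and it uses `socket.socketpair()` (available on Windows since Python 3.5) rather than the extra UDP socket suggested above.

```python
import select
import socket


class SocketPairInterrupt:
    """Notification channel built from sockets instead of os.pipe()."""

    def __init__(self, sock):
        self._sock = sock
        # Two connected sockets: writing to one makes the other readable.
        self._reader, self._writer = socket.socketpair()

    def interrupt(self):
        # Any byte works; it only has to wake up select().
        self._writer.send(b"I")

    def wait_for_receive(self, timeout=None):
        """Return True if the socket has data, False if interrupted or timed out."""
        readable, _, _ = select.select([self._reader, self._sock], [], [], timeout)
        if self._reader in readable:
            return False
        return self._sock in readable

    def close(self):
        self._reader.close()
        self._writer.close()
```

A receiver would call `wait_for_receive()` before `recvfrom()` and stop its loop when the result is False.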

Starting point

interruptable_socket.py

import os
import socket
import select


class InterruptableUdpSocketReceiver(object):
    def __init__(self, host, port):
        self._host = host
        self._port = port
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._r_pipe, self._w_pipe = os.pipe()
        self._interrupted = False

    def bind(self):
        self._socket.bind((self._host, self._port))

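    def recv(self, buffersize, flags=0):
        if self._interrupted:
            raise RuntimeError("Cannot be reused")
        # Block until either the pipe or the socket becomes readable.
        read, _w, errors = select.select([self._r_pipe, self._socket], [], [self._socket])
        if self._socket in read:
            return self._socket.recv(buffersize, flags)
        # Woken up through the pipe: report the interruption as an empty message.
        return b""

    def interrupt(self):
        self._interrupted = True
        # Any byte works; it only makes the pipe readable so select() returns.
        os.write(self._w_pipe, b"I")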

A test suite:

test_interruptable_socket.py

import socket
from threading import Timer
import time
from interruptable_socket import InterruptableUdpSocketReceiver
import unittest


class Sender(object):
    def __init__(self, destination_host, destination_port):
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self._dest = (destination_host, destination_port)

    def send(self, message):
        self._socket.sendto(message, self._dest)

class Test(unittest.TestCase):
    def create_receiver(self, host="127.0.0.1", port=3010):
        receiver = InterruptableUdpSocketReceiver(host, port)
        receiver.bind()
        return receiver

    def create_sender(self, host="127.0.0.1", port=3010):
        return Sender(host, port)

    def create_sender_receiver(self, host="127.0.0.1", port=3010):
        return self.create_sender(host, port), self.create_receiver(host, port)

    def test_create(self):
        self.create_receiver()

    def test_recv_async(self):
        sender, receiver = self.create_sender_receiver()
        start = time.time()
        send_message = "TEST".encode('UTF-8')
        Timer(0.1, sender.send, (send_message, )).start()
        message = receiver.recv(128)
        elapsed = time.time()-start
        self.assertGreaterEqual(elapsed, 0.095)
        self.assertLess(elapsed, 0.11)
        self.assertEqual(message, send_message)

    def test_interrupt_async(self):
        receiver = self.create_receiver()
        start = time.time()
        Timer(0.1, receiver.interrupt).start()
        message = receiver.recv(128)
        elapsed = time.time()-start
        self.assertGreaterEqual(elapsed, 0.095)
        self.assertLess(elapsed, 0.11)
        self.assertEqual(0, len(message))

    def test_exception_after_interrupt(self):
        sender, receiver = self.create_sender_receiver()
        receiver.interrupt()
        with self.assertRaises(RuntimeError):
            receiver.recv(128)


if __name__ == '__main__':
    unittest.main()

Evolution

This code is just a starting point. To make it more generic, we should fix the following issues:

  1. Interface: returning an empty message when interrupted is not a good idea; it is better to signal the interruption with an exception
  2. Generalization: we should have just one function to call before socket.recv(), so that extending interruption to the other recv methods becomes very simple
  3. Portability: to make porting to Windows simple, we should isolate the async notification in an object, so that we can choose the right implementation for our operating system

First of all, we change test_interrupt_async() to check for an exception instead of an empty message:

from interruptable_socket import InterruptException

def test_interrupt_async(self):
    receiver = self.create_receiver()
    start = time.time()
    with self.assertRaises(InterruptException):
        Timer(0.1, receiver.interrupt).start()
        receiver.recv(128)
    elapsed = time.time()-start
    self.assertGreaterEqual(elapsed, 0.095)
    self.assertLess(elapsed, 0.11)

After this, we can replace the empty-message return with raise InterruptException, and the tests pass again.

The ready-to-extend version can be:

interruptable_socket.py

import os
import socket
import select


class InterruptException(Exception):
    pass


class InterruptableUdpSocketReceiver(object):
    def __init__(self, host, port):
        self._host = host
        self._port = port
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._async_interrupt = AsyncInterrupt(self._socket)

    def bind(self):
        self._socket.bind((self._host, self._port))

    def recv(self, buffersize, flags=0):
        # Raises InterruptException if interrupt() is called while waiting.
        self._async_interrupt.wait_for_receive()
        return self._socket.recv(buffersize, flags)

    def interrupt(self):
        self._async_interrupt.interrupt()


class AsyncInterrupt(object):
    def __init__(self, descriptor):
        self._read, self._write = os.pipe()
        self._interrupted = False
        self._descriptor = descriptor

    def interrupt(self):
        self._interrupted = True
        self._notify()

    def wait_for_receive(self):
        if self._interrupted:
            raise RuntimeError("Cannot be reused")
        # Block until either the pipe or the watched descriptor is readable.
        read, _w, errors = select.select([self._read, self._descriptor], [], [self._descriptor])
        if self._descriptor not in read:
            raise InterruptException

    def _notify(self):
        # Any byte works; it only makes the pipe readable so select() returns.
        os.write(self._write, b"I")

Now wrapping more recv functions, implementing a Windows version, or taking care of socket timeouts becomes really simple.
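
To illustrate how little is needed, here is a compact, self-contained sketch that adds recvfrom() and an optional timeout on top of the same pipe-and-select idea. The class name `InterruptableUdpReceiver`, the `timeout` parameter, the `address()` helper and the choice of `TimeoutError` are assumptions made for this example, not part of the code above. It is POSIX-only because it passes a pipe to select().

```python
import os
import select
import socket


class InterruptException(Exception):
    pass


class InterruptableUdpReceiver:
    def __init__(self, host, port):
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._socket.bind((host, port))
        self._read, self._write = os.pipe()

    def address(self):
        """Return the (host, port) the socket is actually bound to."""
        return self._socket.getsockname()

    def _wait(self, timeout):
        # Single choke point shared by every recv-style method.
        readable, _, _ = select.select([self._read, self._socket], [], [], timeout)
        if self._read in readable:
            raise InterruptException
        if self._socket not in readable:
            raise TimeoutError("no datagram within the timeout")

    def recv(self, buffersize, flags=0, timeout=None):
        self._wait(timeout)
        return self._socket.recv(buffersize, flags)

    def recvfrom(self, buffersize, flags=0, timeout=None):
        self._wait(timeout)
        return self._socket.recvfrom(buffersize, flags)

    def interrupt(self):
        os.write(self._write, b"I")

    def close(self):
        self._socket.close()
        os.close(self._read)
        os.close(self._write)
```

Because the wait lives in one private method, supporting recv_into() or recvmsg() would follow the same two-line pattern.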

惜醉颜 2024-12-11 18:03:45

The solution here is to forcibly close the socket. The problem is that the method for doing this is OS-specific, and Python does not do a good job of abstracting either the way to do it or the consequences. Basically, you need to do a shutdown() followed by a close() on the socket. On POSIX systems such as Linux, the shutdown() is the key element in forcing recvfrom() to stop (a call to close() alone won't do it). On Windows, shutdown() does not affect the recvfrom() and the close() is the key element. This is exactly the behavior you would see if you were implementing this code in C using either native POSIX sockets or Winsock sockets, so Python is providing a very thin layer on top of those calls.

On both POSIX and Windows systems, this sequence of calls results in an OSError being raised. However, the location of the exception and its details are OS-specific. On POSIX systems, the exception is raised on the call to shutdown() and the errno value of the exception is set to 107 (Transport endpoint is not connected). On Windows systems, the exception is raised on the call to recvfrom() and the winerror value of the exception is set to 10038 (An operation was attempted on something that is not a socket). This means that there's no way to do this in an OS-agnostic way; the code has to account for both Windows and POSIX behavior and errors. Here's a simple example I wrote up:

import errno
import socket
import threading
import time

class MyServer(object):
    def __init__(self, port:int=0):
        if port == 0:
            raise AttributeError('Invalid port supplied.')

        self.port = port
        self.socket = socket.socket(family=socket.AF_INET,
                type=socket.SOCK_DGRAM)
        self.socket.bind(('0.0.0.0', port))

        self.exit_now = False

        print('Starting server.')
        self.thread = threading.Thread(target=self.run_server,
                args=[self.socket])
        self.thread.start()

    def run_server(self, sock:socket.socket=None):
        # The parameter is named sock so it does not shadow the socket module.
        if sock is None:
            raise AttributeError('No socket provided.')

        buffer_size = 4096

        while not self.exit_now:
            data = b''
            try:
                data, address = sock.recvfrom(buffer_size)
            except OSError as e:
                # winerror only exists on Windows, so read it defensively.
                if getattr(e, 'winerror', None) == 10038:
                    # Error is, "An operation was attempted on something that
                    # is not a socket".  We don't care.
                    pass
                else:
                    raise
            if len(data) > 0:
                print(f'Received {len(data)} bytes from {address}.')

    def stop(self):
        self.exit_now = True
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except OSError as e:
            if e.errno == errno.ENOTCONN:
                # Error is, "Transport endpoint is not connected"
                # (errno 107 on Linux).  We don't care.
                pass
            else:
                raise
        self.socket.close()
        self.thread.join()
        print('Server stopped.')


if __name__ == '__main__':
    server = MyServer(5555)
    time.sleep(2)
    server.stop()

关于从前 2024-12-11 18:03:45

Implement a quit command on the server and client sockets. It should work something like this:

Thread1: 
    status: listening
    handler: quit

Thread2: client
    exec: socket.send "quit"  ---> Thread1.socket @ host:port

Thread1: 
    status: socket closed()
久光 2024-12-11 18:03:45

To properly close a TCP socket in Python, you have to call socket.shutdown(arg) before calling socket.close(). See the Python socket documentation, the part about shutdown.

If the socket is UDP, you can't call socket.shutdown(...); it would raise an exception. And calling socket.close() alone would, as with TCP, leave the blocked operations blocking. close() alone won't interrupt them.

Many (not all) of the suggested solutions don't work or are seen as cumbersome because they involve third-party libraries. I haven't tested poll() or select(). What definitely does work is the following:

First, create an official Thread object for whatever thread is running socket.recv(), and save the handle to it. Second, import signal. signal is an official library that enables sending and receiving Linux/POSIX signals to processes (read its documentation). Third, to interrupt, assuming the handle to your thread is called udpThreadHandle:

signal.pthread_kill(udpThreadHandle.ident, signal.SIGINT)

And of course, in the actual thread/loop doing the receiving:

try:
    while True:
       myUdpSocket.recv(...)
except KeyboardInterrupt:
    pass

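Notice that the exception handler for KeyboardInterrupt (generated by SIGINT) is OUTSIDE the receive loop. This silently terminates the receive loop and its thread. Be aware that the Python documentation states that Python-level signal handlers always run in the main thread, so check on your platform that the KeyboardInterrupt really surfaces in the receiving thread.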
