有什么方法可以杀死线程吗?

发布于 2025-01-24 20:22:45 字数 39 浏览 2 评论 0 原文

是否可以在不设置/检查任何标志/信号量/等的情况下终止运行线程?

Is it possible to terminate a running thread without setting/checking any flags/semaphores/etc.?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(30

韬韬不绝 2025-01-31 20:22:46

通常,用python和任何语言杀死线程是一个不好的模式。考虑以下情况:

  • 该线程持有一个重要的资源,必须正确关闭
  • 该线程,线程也创建了其他几个必须杀死的线程。

如果您负担得起的话,处理此问题的好方法(如果您要管理自己的线程),则是具有一个exit_request标志,每个线程都会在常规间隔上检查,以查看是否是时候退出它了。

例如:

import threading

class StoppableThread(threading.Thread):
    """Thread class with a stop() method. The thread itself has to check
    regularly for the stopped() condition."""

    def __init__(self,  *args, **kwargs):
        super(StoppableThread, self).__init__(*args, **kwargs)
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

在此代码中,您应该在线程退出时在线程上调用 stop(),并等待线程使用 JOIN正确退出()。线程应定期检查停止标志。

但是,在某些情况下,您确实需要杀死线程。一个例子是,当您包裹一个忙于长时间通话的外部库时,您想中断它。

以下代码允许(有些限制)在python线程中提高例外:(

def _async_raise(tid, exctype):
    '''Raises an exception in the threads with id tid'''
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
                                                     ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # "if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

class ThreadWithExc(threading.Thread):
    '''A thread class that supports raising an exception in the thread from
       another thread.
    '''
    def _get_my_tid(self):
        """determines this (self's) thread id

        CAREFUL: this function is executed in the context of the caller
        thread, to get the identity of the thread represented by this
        instance.
        """
        if not self.is_alive(): # Note: self.isAlive() on older version of Python
            raise threading.ThreadError("the thread is not active")

        # do we have it cached?
        if hasattr(self, "_thread_id"):
            return self._thread_id

        # no, look for it in the _active dict
        for tid, tobj in threading._active.items():
            if tobj is self:
                self._thread_id = tid
                return tid

        # TODO: in python 2.6, there's a simpler way to do: self.ident

        raise AssertionError("could not determine the thread's id")

    def raise_exc(self, exctype):
        """Raises the given exception type in the context of this thread.

        If the thread is busy in a system call (time.sleep(),
        socket.accept(), ...), the exception is simply ignored.

        If you are sure that your exception should terminate the thread,
        one way to ensure that it works is:

            t = ThreadWithExc( ... )
            ...
            t.raise_exc( SomeException )
            while t.isAlive():
                time.sleep( 0.1 )
                t.raise_exc( SomeException )

        If the exception is to be caught by the thread, you need a way to
        check that your thread has caught it.

        CAREFUL: this function is executed in the context of the
        caller thread, to raise an exception in the context of the
        thread represented by this instance.
        """
        _async_raise( self._get_my_tid(), exctype )

基于 tomer filiba。 的旧版本。

dev/python/pystate.c“ rel =“ noreferrer”> python 抓住中断。

此代码的良好用法模式是让线程捕获特定的例外并执行清理。这样,您可以中断一项任务并仍然进行适当的清理。

It is generally a bad pattern to kill a thread abruptly, in Python, and in any language. Think of the following cases:

  • the thread is holding a critical resource that must be closed properly
  • the thread has created several other threads that must be killed as well.

The nice way of handling this, if you can afford it (if you are managing your own threads), is to have an exit_request flag that each thread checks on a regular interval to see if it is time for it to exit.

For example:

import threading

class StoppableThread(threading.Thread):
    """Thread class with a stop() method. The thread itself has to check
    regularly for the stopped() condition."""

    def __init__(self,  *args, **kwargs):
        super(StoppableThread, self).__init__(*args, **kwargs)
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

In this code, you should call stop() on the thread when you want it to exit, and wait for the thread to exit properly using join(). The thread should check the stop flag at regular intervals.

There are cases, however, when you really need to kill a thread. An example is when you are wrapping an external library that is busy for long calls, and you want to interrupt it.

The following code allows (with some restrictions) to raise an Exception in a Python thread:

def _async_raise(tid, exctype):
    '''Raises an exception in the threads with id tid'''
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
                                                     ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # "if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

class ThreadWithExc(threading.Thread):
    '''A thread class that supports raising an exception in the thread from
       another thread.
    '''
    def _get_my_tid(self):
        """determines this (self's) thread id

        CAREFUL: this function is executed in the context of the caller
        thread, to get the identity of the thread represented by this
        instance.
        """
        if not self.is_alive(): # Note: self.isAlive() on older version of Python
            raise threading.ThreadError("the thread is not active")

        # do we have it cached?
        if hasattr(self, "_thread_id"):
            return self._thread_id

        # no, look for it in the _active dict
        for tid, tobj in threading._active.items():
            if tobj is self:
                self._thread_id = tid
                return tid

        # TODO: in python 2.6, there's a simpler way to do: self.ident

        raise AssertionError("could not determine the thread's id")

    def raise_exc(self, exctype):
        """Raises the given exception type in the context of this thread.

        If the thread is busy in a system call (time.sleep(),
        socket.accept(), ...), the exception is simply ignored.

        If you are sure that your exception should terminate the thread,
        one way to ensure that it works is:

            t = ThreadWithExc( ... )
            ...
            t.raise_exc( SomeException )
            while t.isAlive():
                time.sleep( 0.1 )
                t.raise_exc( SomeException )

        If the exception is to be caught by the thread, you need a way to
        check that your thread has caught it.

        CAREFUL: this function is executed in the context of the
        caller thread, to raise an exception in the context of the
        thread represented by this instance.
        """
        _async_raise( self._get_my_tid(), exctype )

(Based on Killable Threads by Tomer Filiba. The quote about the return value of PyThreadState_SetAsyncExc appears to be from an old version of Python.)

As noted in the documentation, this is not a magic bullet because if the thread is busy outside the Python interpreter, it will not catch the interruption.

A good usage pattern of this code is to have the thread catch a specific exception and perform the cleanup. That way, you can interrupt a task and still have proper cleanup.

烟沫凡尘 2025-01-31 20:22:46

a p. terminate(

)我想杀死线程,但不想使用标志/锁/信号/信号量/事件/其他,我将线程宣传到完整的过程。对于仅使用几个线程的代码,开销还不错。

例如,这很方便以轻松终止执行阻止I/O转换的“线程”

是微不足道的:在相关代码中替换所有 threading.thread.thread supperprocessing.processing.process.process.process and code>和所有 queue.queue.queue with 多处理。Queue,并将 p.terminate()的所需调用添加到您的父程进程中 p

请参阅 python文档,用于 python a>。

例子:

import multiprocessing
proc = multiprocessing.Process(target=your_proc_function, args=())
proc.start()
# Terminate the process
proc.terminate()  # sends a SIGTERM

A multiprocessing.Process can p.terminate()

In the cases where I want to kill a thread, but do not want to use flags/locks/signals/semaphores/events/whatever, I promote the threads to full blown processes. For code that makes use of just a few threads the overhead is not that bad.

E.g. this comes in handy to easily terminate helper "threads" which execute blocking I/O

The conversion is trivial: In related code replace all threading.Thread with multiprocessing.Process and all queue.Queue with multiprocessing.Queue and add the required calls of p.terminate() to your parent process which wants to kill its child p

See the Python documentation for multiprocessing.

Example:

import multiprocessing
proc = multiprocessing.Process(target=your_proc_function, args=())
proc.start()
# Terminate the process
proc.terminate()  # sends a SIGTERM
ゞ记忆︶ㄣ 2025-01-31 20:22:46

没有官方的API要这样做,不。

您需要使用平台API杀死线程,例如pthread_kill或terminateThread。您可以通过pythonwin或通过CTYPE访问此类API。

请注意,这本质上是不安全的。这可能会导致无法收集的垃圾(从堆栈框架的当地变量变成垃圾),并可能导致死锁,如果被杀死的线程在被杀死时具有GIL。

There is no official API to do that, no.

You need to use platform API to kill the thread, e.g. pthread_kill, or TerminateThread. You can access such API e.g. through pythonwin, or through ctypes.

Notice that this is inherently unsafe. It will likely lead to uncollectable garbage (from local variables of the stack frames that become garbage), and may lead to deadlocks, if the thread being killed has the GIL at the point when it is killed.

最佳男配角 2025-01-31 20:22:46

如果要终止整个程序,则可以将线程设置为“守护程序”。参见 thread.daemon.daemon

If you are trying to terminate the whole program you can set the thread as a "daemon". see Thread.daemon

要走干脆点 2025-01-31 20:22:46

正如其他人提到的那样,规范是设置停止标志。对于轻巧的东西(没有线程的子分类,没有全局变量),可以选择lambda回调。 (请注意中的括号,如果stop()。)

import threading
import time

def do_work(id, stop):
    print("I am thread", id)
    while True:
        print("I am thread {} doing something".format(id))
        if stop():
            print("  Exiting loop.")
            break
    print("Thread {}, signing off".format(id))


def main():
    stop_threads = False
    workers = []
    for id in range(0,3):
        tmp = threading.Thread(target=do_work, args=(id, lambda: stop_threads))
        workers.append(tmp)
        tmp.start()
    time.sleep(3)
    print('main: done sleeping; time to stop the threads.')
    stop_threads = True
    for worker in workers:
        worker.join()
    print('Finis.')

if __name__ == '__main__':
    main()

print()替换为 pr()始终flushes( sys) .stdout.flush())可以提高外壳输出的精度。

(仅在Windows/Eclipse/Python3.3上测试)

As others have mentioned, the norm is to set a stop flag. For something lightweight (no subclassing of Thread, no global variable), a lambda callback is an option. (Note the parentheses in if stop().)

import threading
import time

def do_work(id, stop):
    print("I am thread", id)
    while True:
        print("I am thread {} doing something".format(id))
        if stop():
            print("  Exiting loop.")
            break
    print("Thread {}, signing off".format(id))


def main():
    stop_threads = False
    workers = []
    for id in range(0,3):
        tmp = threading.Thread(target=do_work, args=(id, lambda: stop_threads))
        workers.append(tmp)
        tmp.start()
    time.sleep(3)
    print('main: done sleeping; time to stop the threads.')
    stop_threads = True
    for worker in workers:
        worker.join()
    print('Finis.')

if __name__ == '__main__':
    main()

Replacing print() with a pr() function that always flushes (sys.stdout.flush()) may improve the precision of the shell output.

(Only tested on Windows/Eclipse/Python3.3)

荒岛晴空 2025-01-31 20:22:46

在Python中,您根本无法直接杀死线程。

如果您真的不需要一个线程(!),您可以做什么,而不是使用 螺纹 package ,是使用
多处理 package 。在这里,要杀死一个过程,您可以简单地调用该方法:

your_process.terminate()  # kill the process!

Python将杀死您的过程(通过Sigterm Signs在UNIX上,而在Windows上通过 terminateProcess()呼叫)。使用队列或管道时请注意使用它! (它可能会损坏队列/管道中的数据)

请注意,多处理多处理。-eMaphore 以与螺纹的方式完全相同。事件螺纹。分别。实际上,第一个是后者的克隆。

如果您确实需要使用线程,则无法直接杀死它。但是,您可以做的是使用“守护程序线程” 。实际上,在Python中,可以将线程标记为 daemon

your_thread.daemon = True  # set the Thread as a "daemon thread"

当没有活着的非daemon线程时,主程序将退出。换句话说,当您的主线程(当然是非daemon线程)将完成其操作时,即使仍然有一些守护程序线程可行,该程序也会退出。

请注意,在调用 start()方法之前,必须将线程设置为 daemon

当然,即使使用多处理,您也可以并且应该使用 daemon 。在这里,当主要过程退出时,它试图终止其所有守护程序过程。

最后,请注意, sys.exit() os.kill()不是选择。

In Python, you simply cannot kill a Thread directly.

If you do NOT really need to have a Thread (!), what you can do, instead of using the threading package , is to use the
multiprocessing package . Here, to kill a process, you can simply call the method:

your_process.terminate()  # kill the process!

Python will kill your process (on Unix through the SIGTERM signal, while on Windows through the TerminateProcess() call). Pay attention to use it while using a Queue or a Pipe! (it may corrupt the data in the Queue/Pipe)

Note that the multiprocessing.Event and the multiprocessing.Semaphore work exactly in the same way of the threading.Event and the threading.Semaphore respectively. In fact, the first ones are clones of the latters.

If you REALLY need to use a Thread, there is no way to kill it directly. What you can do, however, is to use a "daemon thread". In fact, in Python, a Thread can be flagged as daemon:

your_thread.daemon = True  # set the Thread as a "daemon thread"

The main program will exit when no alive non-daemon threads are left. In other words, when your main thread (which is, of course, a non-daemon thread) will finish its operations, the program will exit even if there are still some daemon threads working.

Note that it is necessary to set a Thread as daemon before the start() method is called!

Of course you can, and should, use daemon even with multiprocessing. Here, when the main process exits, it attempts to terminate all of its daemonic child processes.

Finally, please, note that sys.exit() and os.kill() are not choices.

影子是时光的心 2025-01-31 20:22:46

这基于 thread2-可杀死线程激活配方。

您需要调用 pythreadState_setaState_setAseNcexcc() /a>,仅通过 ctypes 模块。

这仅在Python 2.7.3上进行了测试,但很可能与其他最近的2.x版本一起使用。 pythreadstate_setasyncexc()仍然存在于Python 3中,以供向后兼容(但我尚未对其进行测试)。

import ctypes

def terminate_thread(thread):
    """Terminates a python thread from another thread.

    :param thread: a threading.Thread instance
    """
    if not thread.isAlive():
        return

    exc = ctypes.py_object(SystemExit)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_long(thread.ident), exc)
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

This is based on the thread2 -- killable threads ActiveState recipe.

You need to call PyThreadState_SetAsyncExc(), which is only available through the ctypes module.

This has only been tested on Python 2.7.3, but it is likely to work with other recent 2.x releases. PyThreadState_SetAsyncExc() still exists in Python 3 for backwards compatibility (but I have not tested it).

import ctypes

def terminate_thread(thread):
    """Terminates a python thread from another thread.

    :param thread: a threading.Thread instance
    """
    if not thread.isAlive():
        return

    exc = ctypes.py_object(SystemExit)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_long(thread.ident), exc)
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")
椵侞 2025-01-31 20:22:46

如果不与之合作,您绝对不应强行杀死线程。

可以删除任何保证尝试/最终阻止设置的保证

杀死线程 。

You should never forcibly kill a thread without cooperating with it.

Killing a thread removes any guarantees that try/finally blocks set up so you might leave locks locked, files open, etc.

The only time you can argue that forcibly killing threads is a good idea is to kill a program fast, but never single threads.

夏末 2025-01-31 20:22:46

如果您要明确调用 time.sleep()作为线程的一部分(例如,对某些外部服务进行轮询),则对Phillipe方法的改进是在 event> Event 中使用超时's wait()方法 sleep> sleep()

例如:

import threading

class KillableThread(threading.Thread):
    def __init__(self, sleep_interval=1):
        super().__init__()
        self._kill = threading.Event()
        self._interval = sleep_interval

    def run(self):
        while True:
            print("Do Something")

            # If no kill signal is set, sleep for the interval,
            # If kill signal comes in while sleeping, immediately
            #  wake up and handle
            is_killed = self._kill.wait(self._interval)
            if is_killed:
                break

        print("Killing Thread")

    def kill(self):
        self._kill.set()

然后运行

t = KillableThread(sleep_interval=5)
t.start()
# Every 5 seconds it prints:
#: Do Something
t.kill()
#: Killing Thread

wait() > sleep()并定期检查事件是您可以以较长的睡眠时间间隔进行编程,该线程几乎立即停止(否则您将是 sleep() in)和我认为,处理出口的代码非常简单。

If you are explicitly calling time.sleep() as part of your thread (say polling some external service), an improvement upon Phillipe's method is to use the timeout in the event's wait() method wherever you sleep()

For example:

import threading

class KillableThread(threading.Thread):
    def __init__(self, sleep_interval=1):
        super().__init__()
        self._kill = threading.Event()
        self._interval = sleep_interval

    def run(self):
        while True:
            print("Do Something")

            # If no kill signal is set, sleep for the interval,
            # If kill signal comes in while sleeping, immediately
            #  wake up and handle
            is_killed = self._kill.wait(self._interval)
            if is_killed:
                break

        print("Killing Thread")

    def kill(self):
        self._kill.set()

Then to run it

t = KillableThread(sleep_interval=5)
t.start()
# Every 5 seconds it prints:
#: Do Something
t.kill()
#: Killing Thread

The advantage of using wait() instead of sleep()ing and regularly checking the event is that you can program in longer intervals of sleep, the thread is stopped almost immediately (when you would otherwise be sleep()ing) and in my opinion, the code for handling exit is significantly simpler.

南冥有猫 2025-01-31 20:22:46

您可以通过将跟踪安装到线程中,以杀死线程,以退出线程。有关一个可能的实现,请参见附件链接。

You can kill a thread by installing trace into the thread that will exit the thread. See attached link for one possible implementation.

Kill a thread in Python

冷血 2025-01-31 20:22:46

绝对可以实现 thread.stop 方法,如以下示例代码:

import sys
import threading
import time


class StopThread(StopIteration):
    pass

threading.SystemExit = SystemExit, StopThread


class Thread2(threading.Thread):

    def stop(self):
        self.__stop = True

    def _bootstrap(self):
        if threading._trace_hook is not None:
            raise ValueError('Cannot run thread with tracing!')
        self.__stop = False
        sys.settrace(self.__trace)
        super()._bootstrap()

    def __trace(self, frame, event, arg):
        if self.__stop:
            raise StopThread()
        return self.__trace


class Thread3(threading.Thread):

    def _bootstrap(self, stop_thread=False):
        def stop():
            nonlocal stop_thread
            stop_thread = True
        self.stop = stop

        def tracer(*_):
            if stop_thread:
                raise StopThread()
            return tracer
        sys.settrace(tracer)
        super()._bootstrap()

###############################################################################


def main():
    test1 = Thread2(target=printer)
    test1.start()
    time.sleep(1)
    test1.stop()
    test1.join()
    test2 = Thread2(target=speed_test)
    test2.start()
    time.sleep(1)
    test2.stop()
    test2.join()
    test3 = Thread3(target=speed_test)
    test3.start()
    time.sleep(1)
    test3.stop()
    test3.join()


def printer():
    while True:
        print(time.time() % 1)
        time.sleep(0.1)


def speed_test(count=0):
    try:
        while True:
            count += 1
    except StopThread:
        print('Count =', count)

if __name__ == '__main__':
    main()

thread3 class似乎比 thread2快于33%的代码运行代码2 类。


附录:

有足够的了解Python的C API和使用 ctypes 模块,可以在需要时编写一种更有效的方法来停止线程。使用 sys.settrace 的问题在于,跟踪功能在每次指令后运行。如果在需要中止的线程上提出异步异常,则不会产生执行速度惩罚。以下代码在这方面提供了一些灵活性:

#! /usr/bin/env python3
import _thread
import ctypes as _ctypes
import threading as _threading

_PyThreadState_SetAsyncExc = _ctypes.pythonapi.PyThreadState_SetAsyncExc
# noinspection SpellCheckingInspection
_PyThreadState_SetAsyncExc.argtypes = _ctypes.c_ulong, _ctypes.py_object
_PyThreadState_SetAsyncExc.restype = _ctypes.c_int

# noinspection PyUnreachableCode
if __debug__:
    # noinspection PyShadowingBuiltins
    def _set_async_exc(id, exc):
        if not isinstance(id, int):
            raise TypeError(f'{id!r} not an int instance')
        if not isinstance(exc, type):
            raise TypeError(f'{exc!r} not a type instance')
        if not issubclass(exc, BaseException):
            raise SystemError(f'{exc!r} not a BaseException subclass')
        return _PyThreadState_SetAsyncExc(id, exc)
else:
    _set_async_exc = _PyThreadState_SetAsyncExc


# noinspection PyShadowingBuiltins
def set_async_exc(id, exc, *args):
    if args:
        class StateInfo(exc):
            def __init__(self):
                super().__init__(*args)

        return _set_async_exc(id, StateInfo)
    return _set_async_exc(id, exc)


def interrupt(ident=None):
    if ident is None:
        _thread.interrupt_main()
    else:
        set_async_exc(ident, KeyboardInterrupt)


# noinspection PyShadowingBuiltins
def exit(ident=None):
    if ident is None:
        _thread.exit()
    else:
        set_async_exc(ident, SystemExit)


class ThreadAbortException(SystemExit):
    pass


class Thread(_threading.Thread):
    def set_async_exc(self, exc, *args):
        return set_async_exc(self.ident, exc, *args)

    def interrupt(self):
        self.set_async_exc(KeyboardInterrupt)

    def exit(self):
        self.set_async_exc(SystemExit)

    def abort(self, *args):
        self.set_async_exc(ThreadAbortException, *args)

It is definitely possible to implement a Thread.stop method as shown in the following example code:

import sys
import threading
import time


class StopThread(StopIteration):
    pass

threading.SystemExit = SystemExit, StopThread


class Thread2(threading.Thread):

    def stop(self):
        self.__stop = True

    def _bootstrap(self):
        if threading._trace_hook is not None:
            raise ValueError('Cannot run thread with tracing!')
        self.__stop = False
        sys.settrace(self.__trace)
        super()._bootstrap()

    def __trace(self, frame, event, arg):
        if self.__stop:
            raise StopThread()
        return self.__trace


class Thread3(threading.Thread):

    def _bootstrap(self, stop_thread=False):
        def stop():
            nonlocal stop_thread
            stop_thread = True
        self.stop = stop

        def tracer(*_):
            if stop_thread:
                raise StopThread()
            return tracer
        sys.settrace(tracer)
        super()._bootstrap()

###############################################################################


def main():
    test1 = Thread2(target=printer)
    test1.start()
    time.sleep(1)
    test1.stop()
    test1.join()
    test2 = Thread2(target=speed_test)
    test2.start()
    time.sleep(1)
    test2.stop()
    test2.join()
    test3 = Thread3(target=speed_test)
    test3.start()
    time.sleep(1)
    test3.stop()
    test3.join()


def printer():
    while True:
        print(time.time() % 1)
        time.sleep(0.1)


def speed_test(count=0):
    try:
        while True:
            count += 1
    except StopThread:
        print('Count =', count)

if __name__ == '__main__':
    main()

The Thread3 class appears to run code approximately 33% faster than the Thread2 class.


Addendum:

With sufficient knowledge of Python's C API and the use of the ctypes module, it is possible to write a far more efficient way of stopping a thread when desired. The problem with using sys.settrace is that the tracing function runs after each instruction. If an asynchronous exception is raised instead on the thread that needs to be aborted, no execution speed penalty is incurred. The following code provides some flexibility in this regard:

#! /usr/bin/env python3
import _thread
import ctypes as _ctypes
import threading as _threading

_PyThreadState_SetAsyncExc = _ctypes.pythonapi.PyThreadState_SetAsyncExc
# noinspection SpellCheckingInspection
_PyThreadState_SetAsyncExc.argtypes = _ctypes.c_ulong, _ctypes.py_object
_PyThreadState_SetAsyncExc.restype = _ctypes.c_int

# noinspection PyUnreachableCode
if __debug__:
    # noinspection PyShadowingBuiltins
    def _set_async_exc(id, exc):
        if not isinstance(id, int):
            raise TypeError(f'{id!r} not an int instance')
        if not isinstance(exc, type):
            raise TypeError(f'{exc!r} not a type instance')
        if not issubclass(exc, BaseException):
            raise SystemError(f'{exc!r} not a BaseException subclass')
        return _PyThreadState_SetAsyncExc(id, exc)
else:
    _set_async_exc = _PyThreadState_SetAsyncExc


# noinspection PyShadowingBuiltins
def set_async_exc(id, exc, *args):
    if args:
        class StateInfo(exc):
            def __init__(self):
                super().__init__(*args)

        return _set_async_exc(id, StateInfo)
    return _set_async_exc(id, exc)


def interrupt(ident=None):
    if ident is None:
        _thread.interrupt_main()
    else:
        set_async_exc(ident, KeyboardInterrupt)


# noinspection PyShadowingBuiltins
def exit(ident=None):
    if ident is None:
        _thread.exit()
    else:
        set_async_exc(ident, SystemExit)


class ThreadAbortException(SystemExit):
    pass


class Thread(_threading.Thread):
    def set_async_exc(self, exc, *args):
        return set_async_exc(self.ident, exc, *args)

    def interrupt(self):
        self.set_async_exc(KeyboardInterrupt)

    def exit(self):
        self.set_async_exc(SystemExit)

    def abort(self, *args):
        self.set_async_exc(ThreadAbortException, *args)
妖妓 2025-01-31 20:22:46

如果您不杀死线程,那就更好了。
一种方法可能是将“尝试”块引入线程周期中,并在要停止线程时引发异常(例如,断路/返回/...阻止您的/while/while/...)。
我已经在我的应用程序上使用了此功能,它有效...

It is better if you don't kill a thread.
A way could be to introduce a "try" block into the thread's cycle and to throw an exception when you want to stop the thread (for example a break/return/... that stops your for/while/...).
I've used this on my app and it works...

雨落星ぅ辰 2025-01-31 20:22:46

我到这个游戏迟到了,但是我一直在用一个类似的问题,下面似乎都解决了对于我来说,这个问题非常完美,让我可以在守护程序的子线退出时进行一些基本的线程检查和清理:

import threading
import time
import atexit

def do_work():

  i = 0
  @atexit.register
  def goodbye():
    print ("'CLEANLY' kill sub-thread with value: %s [THREAD: %s]" %
           (i, threading.currentThread().ident))

  while True:
    print i
    i += 1
    time.sleep(1)

t = threading.Thread(target=do_work)
t.daemon = True
t.start()

def after_timeout():
  print "KILL MAIN THREAD: %s" % threading.currentThread().ident
  raise SystemExit

threading.Timer(2, after_timeout).start()

产量:

0
1
KILL MAIN THREAD: 140013208254208
'CLEANLY' kill sub-thread with value: 2 [THREAD: 140013674317568]

I'm way late to this game, but I've been wrestling with a similar question and the following appears to both resolve the issue perfectly for me AND lets me do some basic thread state checking and cleanup when the daemonized sub-thread exits:

import threading
import time
import atexit

def do_work():

  i = 0
  @atexit.register
  def goodbye():
    print ("'CLEANLY' kill sub-thread with value: %s [THREAD: %s]" %
           (i, threading.currentThread().ident))

  while True:
    print i
    i += 1
    time.sleep(1)

t = threading.Thread(target=do_work)
t.daemon = True
t.start()

def after_timeout():
  print "KILL MAIN THREAD: %s" % threading.currentThread().ident
  raise SystemExit

threading.Timer(2, after_timeout).start()

Yields:

0
1
KILL MAIN THREAD: 140013208254208
'CLEANLY' kill sub-thread with value: 2 [THREAD: 140013674317568]
花开半夏魅人心 2025-01-31 20:22:46

可以使用以下方法来杀死线程:

kill_threads = False

def doSomething():
    global kill_threads
    while True:
        if kill_threads:
            thread.exit()
        ......
        ......

thread.start_new_thread(doSomething, ())

即使是终止线程中的代码写在另一个模块中的螺纹的线程也可以使用。我们可以在该模块中声明一个全局变量,并使用它终止该模块中产生的线程。

我通常会使用它来终止程序退出的所有线程。这可能不是终止线程的理想方法,但可能会有所帮助。

Following workaround can be used to kill a thread:

kill_threads = False

def doSomething():
    global kill_threads
    while True:
        if kill_threads:
            thread.exit()
        ......
        ......

thread.start_new_thread(doSomething, ())

This can be used even for terminating threads, whose code is written in another module, from main thread. We can declare a global variable in that module and use it to terminate thread/s spawned in that module.

I usually use this to terminate all the threads at the program exit. This might not be the perfect way to terminate thread/s but could help.

岁月静好 2025-01-31 20:22:46

这是另一种方法,但是使用极其干净和简单的代码,它在2021年在Python 3.7中起作用:

import ctypes 

def kill_thread(thread):
    """
    thread: a threading.Thread object
    """
    thread_id = thread.ident
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(SystemExit))
    if res > 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
        print('Exception raise failure')

从这里进行了改编: https://www.geeksforgeeks.org/python-different-ways-ways-to-kill-a-a-thread/

Here's yet another way to do it, but with extremely clean and simple code, that works in Python 3.7 in 2021:

import ctypes 

def kill_thread(thread):
    """
    thread: a threading.Thread object
    """
    thread_id = thread.ident
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(SystemExit))
    if res > 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
        print('Exception raise failure')

Adapted from here: https://www.geeksforgeeks.org/python-different-ways-to-kill-a-thread/

梦里兽 2025-01-31 20:22:46
from ctypes import *
pthread = cdll.LoadLibrary("libpthread-2.15.so")
pthread.pthread_cancel(c_ulong(t.ident))

t 是您的线程对象。

阅读Python源(模块/threadmodule.c python/thread_pthread.h )您可以看到 thread.thread.ender > pthread_t 类型,因此您可以执行任何操作 pthread 可以在Python中使用 libpthread

from ctypes import *
pthread = cdll.LoadLibrary("libpthread-2.15.so")
pthread.pthread_cancel(c_ulong(t.ident))

t is your Thread object.

Read the python source (Modules/threadmodule.c and Python/thread_pthread.h) you can see the Thread.ident is an pthread_t type, so you can do anything pthread can do in python use libpthread.

熊抱啵儿 2025-01-31 20:22:46

我要补充的一件事是,如果您阅读,建议避免使用“恶魔”线程,当您不希望线程突然结束时,使用Paolo Rovelli a>。

从官方文件中:

守护程序线程在关闭时突然停止。他们的资源(例如打开文件,数据库交易等)可能无法正确发布。如果您希望线程优雅地停止,请使其非传说并使用合适的信号机制,例如事件。

我认为创建守护线的线程取决于您的应用程序,但总的来说(在我看来)最好避免杀死它们或使其守护。在多处理中,您可以使用 is_alive()检查进程状态并“终止”以完成它们(也避免GIL问题)。但是,当您在Windows中执行代码时,您会发现更多问题。

始终记住,如果您拥有“实时线程”,则Python解释器将持续到等待它们。 (因为这个守护者,如果不突然结束的话,可以为您提供帮助)。

One thing I want to add is that if you read official documentation in threading lib Python, it's recommended to avoid use of "demonic" threads, when you don't want threads end abruptly, with the flag that Paolo Rovelli mentioned.

From official documentation:

Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signaling mechanism such as an Event.

I think that creating daemonic threads depends of your application, but in general (and in my opinion) it's better to avoid killing them or making them daemonic. In multiprocessing you can use is_alive() to check process status and "terminate" for finish them (Also you avoid GIL problems). But you can find more problems, sometimes, when you execute your code in Windows.

And always remember that if you have "live threads", the Python interpreter will be running for wait them. (Because of this daemonic can help you if don't matter abruptly ends).

此刻的回忆 2025-01-31 20:22:46

有一个为此目的构建的库, stopit 。尽管此处列出的一些警告仍然适用,但至少该库提供了一种实现既定目标的常规,可重复的技术。

There is a library built for this purpose, stopit. Although some of the same cautions listed herein still apply, at least this library presents a regular, repeatable technique for achieving the stated goal.

握住你手 2025-01-31 20:22:46

只是为了建立 @SCB的想法(这正是我所需的)来创建一个具有自定义功能的KillableThread子类:

from threading import Thread, Event

class KillableThread(Thread):
    def __init__(self, sleep_interval=1, target=None, name=None, args=(), kwargs={}):
        super().__init__(None, target, name, args, kwargs)
        self._kill = Event()
        self._interval = sleep_interval
        print(self._target)

    def run(self):
        while True:
            # Call custom function with arguments
            self._target(*self._args)

            # If no kill signal is set, sleep for the interval,
            # If kill signal comes in while sleeping, immediately
            #  wake up and handle
            is_killed = self._kill.wait(self._interval)
            if is_killed:
                break

        print("Killing Thread")

    def kill(self):
        self._kill.set()

if __name__ == '__main__':

    def print_msg(msg):
        print(msg)

    t = KillableThread(10, print_msg, args=("hello world"))
    t.start()
    time.sleep(6)
    print("About to kill thread")
    t.kill()

自然,就像@SBC一样,该线程不会等待运行新的循环以停止。在此示例中,您会看到“即将杀死线程”后立即打印的“杀死线程”消息,而不是再等4秒钟才能完成该线程(因为我们已经睡了6秒钟)。

KillableThread构造函数中的第二个参数是您的自定义函数(在此处print_msg)。 ARGS参数是在此处调用函数((“ Hello World”))时将使用的参数。

Just to build up on @SCB's idea (which was exactly what I needed) to create a KillableThread subclass with a customized function:

from threading import Thread, Event

class KillableThread(Thread):
    def __init__(self, sleep_interval=1, target=None, name=None, args=(), kwargs={}):
        super().__init__(None, target, name, args, kwargs)
        self._kill = Event()
        self._interval = sleep_interval
        print(self._target)

    def run(self):
        while True:
            # Call custom function with arguments
            self._target(*self._args)

            # If no kill signal is set, sleep for the interval,
            # If kill signal comes in while sleeping, immediately
            #  wake up and handle
            is_killed = self._kill.wait(self._interval)
            if is_killed:
                break

        print("Killing Thread")

    def kill(self):
        self._kill.set()

if __name__ == '__main__':

    def print_msg(msg):
        print(msg)

    t = KillableThread(10, print_msg, args=("hello world"))
    t.start()
    time.sleep(6)
    print("About to kill thread")
    t.kill()

Naturally, like with @SBC, the thread doesn't wait to run a new loop to stop. In this example, you would see the "Killing Thread" message printed right after the "About to kill thread" instead of waiting for 4 more seconds for the thread to complete (since we have slept for 6 seconds already).

Second argument in KillableThread constructor is your custom function (print_msg here). Args argument are the arguments that will be used when calling the function (("hello world")) here.

情独悲 2025-01-31 20:22:46

An alternative is to use signal.pthread_kill< /a>发送停止信号。

from signal import pthread_kill, SIGTSTP
from threading import Thread
from itertools import count
from time import sleep

def target():
    for num in count():
        print(num)
        sleep(1)

thread = Thread(target=target)
thread.start()
sleep(5)
pthread_kill(thread.ident, SIGTSTP)

结果

0
1
2
3
4

[14]+  Stopped

An alternative is to use signal.pthread_kill to send a stop signal.

from signal import pthread_kill, SIGTSTP
from threading import Thread
from itertools import count
from time import sleep

def target():
    for num in count():
        print(num)
        sleep(1)

thread = Thread(target=target)
thread.start()
sleep(5)
pthread_kill(thread.ident, SIGTSTP)

result

0
1
2
3
4

[14]+  Stopped
锦上情书 2025-01-31 20:22:46

虽然很旧,但这对于某些方案可能是一种方便的解决方案:

一个扩展线程模块功能的小模块 -
允许一个线程在另一个上下文中提出异常
线。通过提高 SystemExit ,您最终可以杀死Python线程。

import threading
import ctypes     

def _async_raise(tid, excobj):
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj))
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble, 
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")

class Thread(threading.Thread):
    def raise_exc(self, excobj):
        assert self.isAlive(), "thread must be started"
        for tid, tobj in threading._active.items():
            if tobj is self:
                _async_raise(tid, excobj)
                return

        # the thread was alive when we entered the loop, but was not found 
        # in the dict, hence it must have been already terminated. should we raise
        # an exception here? silently ignore?

    def terminate(self):
        # must raise the SystemExit type, instead of a SystemExit() instance
        # due to a bug in PyThreadState_SetAsyncExc
        self.raise_exc(SystemExit)

因此,它允许“线程在另一个线程的上下文中提高异常”,通过这种方式,终止线程可以处理终止,而无需定期检查中止标志。

但是,根据其原始源,此代码存在一些问题。

  • 仅在执行Python字节码时才提出异常。如果您的线程调用本机/内置阻止功能,则
    只有在执行返回python时才会提出例外
    代码。

    • 如果内置函数内部调用pyerr_clear(),也有一个问题,该功能将有效地取消您的待处理异常。
      您可以尝试再次提高它。
  • 只能安全地提出异常类型。例外情况可能会导致意外行为,因此受到限制。
  • 我要求在内置线程模块中公开此功能,但是由于CTYPES已成为标准库(截至2.5),并且此功能已成为该功能
    功能不太可能是实现的,可以保留
    未暴露。

While it's rather old, this might be a handy solution for some:

A little module that extends the threading's module functionality --
allows one thread to raise exceptions in the context of another
thread. By raising SystemExit, you can finally kill python threads.

import threading
import ctypes     

def _async_raise(tid, excobj):
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj))
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble, 
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")

class Thread(threading.Thread):
    def raise_exc(self, excobj):
        assert self.isAlive(), "thread must be started"
        for tid, tobj in threading._active.items():
            if tobj is self:
                _async_raise(tid, excobj)
                return

        # the thread was alive when we entered the loop, but was not found 
        # in the dict, hence it must have been already terminated. should we raise
        # an exception here? silently ignore?

    def terminate(self):
        # must raise the SystemExit type, instead of a SystemExit() instance
        # due to a bug in PyThreadState_SetAsyncExc
        self.raise_exc(SystemExit)

So, it allows a "thread to raise exceptions in the context of another thread" and in this way, the terminated thread can handle the termination without regularly checking an abort flag.

However, according to its original source, there are some issues with this code.

  • The exception will be raised only when executing python bytecode. If your thread calls a native/built-in blocking function, the
    exception will be raised only when execution returns to the python
    code.

    • There is also an issue if the built-in function internally calls PyErr_Clear(), which would effectively cancel your pending exception.
      You can try to raise it again.
  • Only exception types can be raised safely. Exception instances are likely to cause unexpected behavior, and are thus restricted.
  • I asked to expose this function in the built-in thread module, but since ctypes has become a standard library (as of 2.5), and this
    feature is not likely to be implementation-agnostic, it may be kept
    unexposed.
守望孤独 2025-01-31 20:22:46

令人震惊的是,您想要具有相同函数的多个线程,这是恕我直言,最简单的实现来阻止一个iD:

import time
from threading import Thread

def doit(id=0):
    doit.stop=0
    print("start id:%d"%id)
    while 1:
        time.sleep(1)
        print(".")
        if doit.stop==id:
            doit.stop=0
            break
    print("end thread %d"%id)

t5=Thread(target=doit, args=(5,))
t6=Thread(target=doit, args=(6,))

t5.start() ; t6.start()
time.sleep(2)
doit.stop =5  #kill t5
time.sleep(2)
doit.stop =6  #kill t6

好的事情在这里,您可以具有相同的多个功能,并通过 停止它们。 functionName.stop

如果只想拥有一个函数的一个线程,则不需要记住ID。如果 doit.stop &gt; 0。

Asuming, that you want to have multiple threads of the same function, this is IMHO the easiest implementation to stop one by id:

import time
from threading import Thread

def doit(id=0):
    doit.stop=0
    print("start id:%d"%id)
    while 1:
        time.sleep(1)
        print(".")
        if doit.stop==id:
            doit.stop=0
            break
    print("end thread %d"%id)

t5=Thread(target=doit, args=(5,))
t6=Thread(target=doit, args=(6,))

t5.start() ; t6.start()
time.sleep(2)
doit.stop =5  #kill t5
time.sleep(2)
doit.stop =6  #kill t6

The nice thing is here, you can have multiple of same and different functions, and stop them all by functionname.stop

If you want to have only one thread of the function then you don't need to remember the id. Just stop, if doit.stop > 0.

檐上三寸雪 2025-01-31 20:22:46

如 @kozyarchuk的答案跟踪起作用。由于此答案不包含代码,因此这是一个工作现成的示例:

import sys, threading, time 

class TraceThread(threading.Thread): 
    def __init__(self, *args, **keywords): 
        threading.Thread.__init__(self, *args, **keywords) 
        self.killed = False
    def start(self): 
        self._run = self.run 
        self.run = self.settrace_and_run
        threading.Thread.start(self) 
    def settrace_and_run(self): 
        sys.settrace(self.globaltrace) 
        self._run()
    def globaltrace(self, frame, event, arg): 
        return self.localtrace if event == 'call' else None
    def localtrace(self, frame, event, arg): 
        if self.killed and event == 'line': 
            raise SystemExit() 
        return self.localtrace 

def f(): 
    while True: 
        print('1') 
        time.sleep(2)
        print('2') 
        time.sleep(2)
        print('3') 
        time.sleep(2)

t = TraceThread(target=f) 
t.start() 
time.sleep(2.5) 
t.killed = True

它在打印 1 2 后停止。 3 未打印。

As mentioned in @Kozyarchuk's answer, installing trace works. Since this answer contained no code, here is a working ready-to-use example:

import sys, threading, time 

class TraceThread(threading.Thread): 
    def __init__(self, *args, **keywords): 
        threading.Thread.__init__(self, *args, **keywords) 
        self.killed = False
    def start(self): 
        self._run = self.run 
        self.run = self.settrace_and_run
        threading.Thread.start(self) 
    def settrace_and_run(self): 
        sys.settrace(self.globaltrace) 
        self._run()
    def globaltrace(self, frame, event, arg): 
        return self.localtrace if event == 'call' else None
    def localtrace(self, frame, event, arg): 
        if self.killed and event == 'line': 
            raise SystemExit() 
        return self.localtrace 

def f(): 
    while True: 
        print('1') 
        time.sleep(2)
        print('2') 
        time.sleep(2)
        print('3') 
        time.sleep(2)

t = TraceThread(target=f) 
t.start() 
time.sleep(2.5) 
t.killed = True

It stops after having printed 1 and 2. 3 is not printed.

蓝眸 2025-01-31 20:22:46

Python版本:3.8

使用守护程序线程执行我们想要的内容,如果我们要终止守护程序线程,我们需要的只是使父螺纹退出,那么系统将终止守护程序线程,而守护程序则创建了parent thread。

还支持Coroutine和Coroutine功能。

def main():
    start_time = time.perf_counter()
    t1 = ExitThread(time.sleep, (10,), debug=False)
    t1.start()
    time.sleep(0.5)
    t1.exit()
    try:
        print(t1.result_future.result())
    except concurrent.futures.CancelledError:
        pass
    end_time = time.perf_counter()
    print(f"time cost {end_time - start_time:0.2f}")

以下是ExitThread源代码

import concurrent.futures
import threading
import typing
import asyncio


class _WorkItem(object):
    """ concurrent\futures\thread.py

    """

    def __init__(self, future, fn, args, kwargs, *, debug=None):
        self._debug = debug
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if self._debug:
            print("ExitThread._WorkItem run")
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            coroutine = None
            if asyncio.iscoroutinefunction(self.fn):
                coroutine = self.fn(*self.args, **self.kwargs)
            elif asyncio.iscoroutine(self.fn):
                coroutine = self.fn
            if coroutine is None:
                result = self.fn(*self.args, **self.kwargs)
            else:
                result = asyncio.run(coroutine)
            if self._debug:
                print("_WorkItem done")
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)


class ExitThread:
    """ Like a stoppable thread

    Using coroutine for target then exit before running may cause RuntimeWarning.

    """

    def __init__(self, target: typing.Union[typing.Coroutine, typing.Callable] = None
                 , args=(), kwargs={}, *, daemon=None, debug=None):
        #
        self._debug = debug
        self._parent_thread = threading.Thread(target=self._parent_thread_run, name="ExitThread_parent_thread"
                                               , daemon=daemon)
        self._child_daemon_thread = None
        self.result_future = concurrent.futures.Future()
        self._workItem = _WorkItem(self.result_future, target, args, kwargs, debug=debug)
        self._parent_thread_exit_lock = threading.Lock()
        self._parent_thread_exit_lock.acquire()
        self._parent_thread_exit_lock_released = False  # When done it will be True
        self._started = False
        self._exited = False
        self.result_future.add_done_callback(self._release_parent_thread_exit_lock)

    def _parent_thread_run(self):
        self._child_daemon_thread = threading.Thread(target=self._child_daemon_thread_run
                                                     , name="ExitThread_child_daemon_thread"
                                                     , daemon=True)
        self._child_daemon_thread.start()
        # Block manager thread
        self._parent_thread_exit_lock.acquire()
        self._parent_thread_exit_lock.release()
        if self._debug:
            print("ExitThread._parent_thread_run exit")

    def _release_parent_thread_exit_lock(self, _future):
        if self._debug:
            print(f"ExitThread._release_parent_thread_exit_lock {self._parent_thread_exit_lock_released} {_future}")
        if not self._parent_thread_exit_lock_released:
            self._parent_thread_exit_lock_released = True
            self._parent_thread_exit_lock.release()

    def _child_daemon_thread_run(self):
        self._workItem.run()

    def start(self):
        if self._debug:
            print(f"ExitThread.start {self._started}")
        if not self._started:
            self._started = True
            self._parent_thread.start()

    def exit(self):
        if self._debug:
            print(f"ExitThread.exit exited: {self._exited} lock_released: {self._parent_thread_exit_lock_released}")
        if self._parent_thread_exit_lock_released:
            return
        if not self._exited:
            self._exited = True
            if not self.result_future.cancel():
                if self.result_future.running():
                    self.result_future.set_exception(concurrent.futures.CancelledError())

Python version: 3.8

Using daemon thread to execute what we wanted, if we want to daemon thread be terminated, all we need is making parent thread exit, then system will terminate daemon thread which parent thread created.

Also support coroutine and coroutine function.

def main():
    start_time = time.perf_counter()
    t1 = ExitThread(time.sleep, (10,), debug=False)
    t1.start()
    time.sleep(0.5)
    t1.exit()
    try:
        print(t1.result_future.result())
    except concurrent.futures.CancelledError:
        pass
    end_time = time.perf_counter()
    print(f"time cost {end_time - start_time:0.2f}")

below is ExitThread source code

import concurrent.futures
import threading
import typing
import asyncio


class _WorkItem(object):
    """ concurrent\futures\thread.py

    """

    def __init__(self, future, fn, args, kwargs, *, debug=None):
        self._debug = debug
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if self._debug:
            print("ExitThread._WorkItem run")
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            coroutine = None
            if asyncio.iscoroutinefunction(self.fn):
                coroutine = self.fn(*self.args, **self.kwargs)
            elif asyncio.iscoroutine(self.fn):
                coroutine = self.fn
            if coroutine is None:
                result = self.fn(*self.args, **self.kwargs)
            else:
                result = asyncio.run(coroutine)
            if self._debug:
                print("_WorkItem done")
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)


class ExitThread:
    """ Like a stoppable thread

    Using coroutine for target then exit before running may cause RuntimeWarning.

    """

    def __init__(self, target: typing.Union[typing.Coroutine, typing.Callable] = None
                 , args=(), kwargs={}, *, daemon=None, debug=None):
        #
        self._debug = debug
        self._parent_thread = threading.Thread(target=self._parent_thread_run, name="ExitThread_parent_thread"
                                               , daemon=daemon)
        self._child_daemon_thread = None
        self.result_future = concurrent.futures.Future()
        self._workItem = _WorkItem(self.result_future, target, args, kwargs, debug=debug)
        self._parent_thread_exit_lock = threading.Lock()
        self._parent_thread_exit_lock.acquire()
        self._parent_thread_exit_lock_released = False  # When done it will be True
        self._started = False
        self._exited = False
        self.result_future.add_done_callback(self._release_parent_thread_exit_lock)

    def _parent_thread_run(self):
        self._child_daemon_thread = threading.Thread(target=self._child_daemon_thread_run
                                                     , name="ExitThread_child_daemon_thread"
                                                     , daemon=True)
        self._child_daemon_thread.start()
        # Block manager thread
        self._parent_thread_exit_lock.acquire()
        self._parent_thread_exit_lock.release()
        if self._debug:
            print("ExitThread._parent_thread_run exit")

    def _release_parent_thread_exit_lock(self, _future):
        if self._debug:
            print(f"ExitThread._release_parent_thread_exit_lock {self._parent_thread_exit_lock_released} {_future}")
        if not self._parent_thread_exit_lock_released:
            self._parent_thread_exit_lock_released = True
            self._parent_thread_exit_lock.release()

    def _child_daemon_thread_run(self):
        self._workItem.run()

    def start(self):
        if self._debug:
            print(f"ExitThread.start {self._started}")
        if not self._started:
            self._started = True
            self._parent_thread.start()

    def exit(self):
        if self._debug:
            print(f"ExitThread.exit exited: {self._exited} lock_released: {self._parent_thread_exit_lock_released}")
        if self._parent_thread_exit_lock_released:
            return
        if not self._exited:
            self._exited = True
            if not self.result_future.cancel():
                if self.result_future.running():
                    self.result_future.set_exception(concurrent.futures.CancelledError())
野稚 2025-01-31 20:22:46

如果您确实需要杀死子任务的能力,请使用替代实现。 多处理 gevent 均支持杀死“线程”。

Python的线程不支持取消。甚至不要尝试。您的代码很可能发生死锁,损坏或泄漏记忆,或者具有很少和非确定性发生的其他意想不到的“有趣”的难以删除的效果。

If you really need the ability to kill a sub-task, use an alternate implementation. multiprocessing and gevent both support indiscriminately killing a "thread".

Python's threading does not support cancellation. Do not even try. Your code is very likely to deadlock, corrupt or leak memory, or have other unintended "interesting" hard-to-debug effects which happen rarely and nondeterministically.

焚却相思 2025-01-31 20:22:46

PIETER HINTJENS- ØMQ -project--避免使用锁,静音,事件等诸如锁定的原始词,这是编写多线程程序的清晰和最新方法:

http://zguide.zeromq.org/py:all#multithreading-with-zeromq

这包括告诉孩子线程,它应该取消其工作。这是通过将线程配备Ømq插座和在该插座上进行轮询的消息来完成的,以获取一条消息,说它应该取消。

该链接还提供了带有Ømq的多线程Python代码的示例。

Pieter Hintjens -- one of the founders of the ØMQ-project -- says, using ØMQ and avoiding synchronization primitives like locks, mutexes, events etc., is the sanest and securest way to write multi-threaded programs:

http://zguide.zeromq.org/py:all#Multithreading-with-ZeroMQ

This includes telling a child thread, that it should cancel its work. This would be done by equipping the thread with a ØMQ-socket and polling on that socket for a message saying that it should cancel.

The link also provides an example on multi-threaded python code with ØMQ.

半衾梦 2025-01-31 20:22:46

这似乎与Windows 7上的Pywin32一起使用

my_thread = threading.Thread()
my_thread.start()
my_thread._Thread__stop()

This seems to work with pywin32 on windows 7

my_thread = threading.Thread()
my_thread.start()
my_thread._Thread__stop()
疯了 2025-01-31 20:22:46

您可以在过程中执行命令,然后使用进程ID杀死它。
我需要在两个线程之间同步,其中一个线程不会单独返回。

processIds = []

def executeRecord(command):
    print(command)

    process = subprocess.Popen(command, stdout=subprocess.PIPE)
    processIds.append(process.pid)
    print(processIds[0])

    #Command that doesn't return by itself
    process.stdout.read().decode("utf-8")
    return;


def recordThread(command, timeOut):

    thread = Thread(target=executeRecord, args=(command,))
    thread.start()
    thread.join(timeOut)

    os.kill(processIds.pop(), signal.SIGINT)

    return;

You can execute your command in a process and then kill it using the process id.
I needed to sync between two thread one of which doesn’t return by itself.

processIds = []

def executeRecord(command):
    print(command)

    process = subprocess.Popen(command, stdout=subprocess.PIPE)
    processIds.append(process.pid)
    print(processIds[0])

    #Command that doesn't return by itself
    process.stdout.read().decode("utf-8")
    return;


def recordThread(command, timeOut):

    thread = Thread(target=executeRecord, args=(command,))
    thread.start()
    thread.join(timeOut)

    os.kill(processIds.pop(), signal.SIGINT)

    return;
终难遇 2025-01-31 20:22:46

这是一个不好的答案,请参阅评论

这是做到这一点的方法:

from threading import *

...

for thread in enumerate():
    if thread.isAlive():
        try:
            thread._Thread__stop()
        except:
            print(str(thread.getName()) + ' could not be terminated'))

给它几秒钟,然后您的线程应停止。还检查 thread._thread__delete()方法。

为了方便起见,我将建议使用 thread.quit()方法。例如,如果线程中有一个套接字,我建议您在套接字处理类中创建一个 Quit()方法,终止套接字,然后运行 thread._thread__stop() 在您的 Quit()的内部。

This is a bad answer, see the comments

Here's how to do it:

from threading import *

...

for thread in enumerate():
    if thread.isAlive():
        try:
            thread._Thread__stop()
        except:
            print(str(thread.getName()) + ' could not be terminated'))

Give it a few seconds then your thread should be stopped. Check also the thread._Thread__delete() method.

I'd recommend a thread.quit() method for convenience. For example if you have a socket in your thread, I'd recommend creating a quit() method in your socket-handle class, terminate the socket, then run a thread._Thread__stop() inside of your quit().

时光无声 2025-01-31 20:22:46

使用setDaemon(true)启动子线程。

def bootstrap(_filename):
    mb = ModelBootstrap(filename=_filename) # Has many Daemon threads. All get stopped automatically when main thread is stopped.

t = threading.Thread(target=bootstrap,args=('models.conf',))
t.setDaemon(False)

while True:
    t.start()
    time.sleep(10) # I am just allowing the sub-thread to run for 10 sec. You can listen on an event to stop execution.
    print('Thread stopped')
    break

Start the sub thread with setDaemon(True).

def bootstrap(_filename):
    mb = ModelBootstrap(filename=_filename) # Has many Daemon threads. All get stopped automatically when main thread is stopped.

t = threading.Thread(target=bootstrap,args=('models.conf',))
t.setDaemon(False)

while True:
    t.start()
    time.sleep(10) # I am just allowing the sub-thread to run for 10 sec. You can listen on an event to stop execution.
    print('Thread stopped')
    break
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文