在 Python 中处理阻塞函数调用

发布于 2024-09-29 22:25:58 字数 1477 浏览 5 评论 0原文

我正在使用 Gnuradio 框架。我处理生成的流程图来发送/接收信号。这些流程图初始化并启动,但它们不会将控制流返回到我的应用程序:

我导入了 time

while time.time() < endtime:
        # invoke GRC flowgraph for 1st sequence
        if not seq1_sent:
            tb = send_seq_2.top_block()
            tb.Run(True)
            seq1_sent = True
            if time.time() < endtime:
                break

        # invoke GRC flowgraph for 2nd sequence
        if not seq2_sent:
            tb = send_seq_2.top_block()
            tb.Run(True)
            seq2_sent = True
            if time.time() < endtime:
                break

问题是:只有第一个 if 语句调用流程图(与硬件交互) 。我被困在这了。我可以使用线程,但我对如何在 Python 中超时线程没有经验。我怀疑这是否可能,因为 API 中似乎没有杀死线程。该脚本只能在 Linux 上运行...

如何使用 Python 正确处理阻塞函数 - 而不杀死整个程序。 这个问题的另一个更具体的例子是:

import signal, os

def handler(signum, frame):
        # print 'Signal handler called with signal', signum
        #raise IOError("Couldn't open device!")
        import time
        print "wait"
        time.sleep(3)


def foo():
    # Set the signal handler and a 5-second alarm
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(3)

    # This open() may hang indefinitely
    fd = os.open('/dev/ttys0', os.O_RDWR)
    signal.alarm(0)          # Disable the alarm


foo()
print "hallo"

How do I still get print "hallo". ;)

谢谢, 马吕斯

I'm working with the Gnuradio framework. I handle flowgraphs I generate to send/receive signals. These flowgraphs initialize and start, but they don't return the control flow to my application:

I imported time

while time.time() < endtime:
        # invoke GRC flowgraph for 1st sequence
        if not seq1_sent:
            tb = send_seq_2.top_block()
            tb.Run(True)
            seq1_sent = True
            if time.time() < endtime:
                break

        # invoke GRC flowgraph for 2nd sequence
        if not seq2_sent:
            tb = send_seq_2.top_block()
            tb.Run(True)
            seq2_sent = True
            if time.time() < endtime:
                break

The problem is: only the first if statement invokes the flow-graph (that interacts with the hardware). I'm stuck in this. I could use a Thread, but I'm unexperienced how to timeout threads in Python. I doubt that this is possible, because it seems killing threads isn't within the APIs. This script only has to work on Linux...

How do you handle blocking functions with Python properly - without killing the whole program.
Another more concrete example for this problem is:

import signal, os

def handler(signum, frame):
        # print 'Signal handler called with signal', signum
        #raise IOError("Couldn't open device!")
        import time
        print "wait"
        time.sleep(3)


def foo():
    # Set the signal handler and a 5-second alarm
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(3)

    # This open() may hang indefinitely
    fd = os.open('/dev/ttys0', os.O_RDWR)
    signal.alarm(0)          # Disable the alarm


foo()
print "hallo"

How do I still get print "hallo". ;)

Thanks,
Marius

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(8

凝望流年 2024-10-06 22:25:58

首先 - 应不惜一切代价避免使用信号:

1)它可能导致死锁。 SIGALRM 可能会在阻塞系统调用之前到达进程(想象一下系统中的超高负载!)并且系统调用不会被中断。僵局。

2) 玩弄信号可能会产生一些令人讨厌的非本地后果。例如,其他线程中的系统调用可能会被中断,这通常不是您想要的。通常,当收到(不是致命的)信号时,系统调用会重新启动。当您设置信号处理程序时,它会自动关闭整个进程或线程组的此行为。检查“man siginterrupt”。

相信我 - 我以前遇到过两个问题,而且它们一点也不有趣。

在某些情况下,可以明确避免阻塞 - 我强烈建议使用 select() 和朋友(检查 Python 中的 select 模块)来处理阻塞写入和读取。但这并不能解决阻塞 open() 调用的问题。

为此,我测试了这个解决方案,它对于命名管道效果很好。它以非阻塞方式打开,然后将其关闭并使用 select() 调用,如果没有可用的内容,最终会超时。

import sys, os, select, fcntl

f = os.open(sys.argv[1], os.O_RDONLY | os.O_NONBLOCK)

flags = fcntl.fcntl(f, fcntl.F_GETFL, 0)
fcntl.fcntl(f, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)

r, w, e = select.select([f], [], [], 2.0)

if r == [f]:
    print 'ready'
    print os.read(f, 100)
else:
    print 'unready'

os.close(f)

测试一下:

mkfifo /tmp/fifo
python <code_above.py> /tmp/fifo (1st terminal)
echo abcd > /tmp/fifo (2nd terminal)

通过一些额外的努力, select() 调用可以用作整个程序的主循环,聚合所有事件 - 您可以使用 libev 或 libevent,或围绕它们的一些 Python 包装器。

当您无法明确强制非阻塞行为时,假设您只使用外部库,那么它会变得更加困难。线程也许可以,但显然它不是最先进的解决方案,通常是错误的。

恐怕一般来说你无法以稳健的方式解决这个问题 - 这实际上取决于你阻止的内容。

First of all - the use of signals should be avoided at all cost:

1) It may lead to a deadlock. SIGALRM may reach the process BEFORE the blocking syscall (imagine super-high load in the system!) and the syscall will not be interrupted. Deadlock.

2) Playing with signals may have some nasty non-local consequences. For example, syscalls in other threads may be interrupted which usually is not what you want. Normally syscalls are restarted when (not a deadly) signal is received. When you set up a signal handler it automatically turns off this behavior for the whole process, or thread group so to say. Check 'man siginterrupt' on that.

Believe me - I met two problems before and they are not fun at all.

In some cases the blocking can be avoided explicitely - I strongly recommend using select() and friends (check select module in Python) to handle blocking writes and reads. This will not solve blocking open() call, though.

For that I've tested this solution and it works well for named pipes. It opens in a non-blocking way, then turns it off and uses select() call to eventually timeout if nothing is available.

import sys, os, select, fcntl

f = os.open(sys.argv[1], os.O_RDONLY | os.O_NONBLOCK)

flags = fcntl.fcntl(f, fcntl.F_GETFL, 0)
fcntl.fcntl(f, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)

r, w, e = select.select([f], [], [], 2.0)

if r == [f]:
    print 'ready'
    print os.read(f, 100)
else:
    print 'unready'

os.close(f)

Test this with:

mkfifo /tmp/fifo
python <code_above.py> /tmp/fifo (1st terminal)
echo abcd > /tmp/fifo (2nd terminal)

With some additional effort select() call can be used as a main loop of the whole program, aggregating all events - you can use libev or libevent, or some Python wrappers around them.

When you can't explicitely force non-blocking behavior, say you just use an external library, then it's going to be much harder. Threads may do, but obviously it is not a state-of-the-art solution, usually being just wrong.

I'm afraid that in general you can't solve this in a robust way - it really depends on WHAT you block.

Hello爱情风 2024-10-06 22:25:58

IIUC,每个top_block都有一个stop方法。因此,您实际上可以在线程中运行 top_block,并在超时到达时发出停止信号。如果 top_block 的 wait() 也有超时,那就更好了,但遗憾的是,它没有。

在主线程中,您需要等待两种情况:a)top_block 完成,b)超时到期。繁忙等待是邪恶的:-),因此您应该使用线程的 join-with-timeout 来等待线程。如果join后线程仍然存活,则需要停止top_run。

IIUC, each top_block has a stop method. So you actually can run the top_block in a thread, and issue a stop if the timeout has arrived. It would be better if the top_block's wait() also had a timeout, but alas, it doesn't.

In the main thread, you then need to wait for two cases: a) the top_block completes, and b) the timeout expires. Busy-waits are evil :-), so you should use the thread's join-with-timeout to wait for the thread. If the thread is still alive after the join, you need to stop the top_run.

不忘初心 2024-10-06 22:25:58

您可以设置一个信号警报,该警报会超时中断您的通话:

http://docs.python。 org/library/signal.html

signal.alarm(1) # 1 second

my_blocking_call()
signal.alarm(0)

如果您想确保它不会破坏您的应用程序,您还可以设置信号处理程序:

def my_handler(signum, frame):
    pass

signal.signal(signal.SIGALRM, my_handler)

编辑:
这段代码有什么问题?这不应该中止您的应用程序:

import signal, time

def handler(signum, frame):
    print "Timed-out"

def foo():
    # Set the signal handler and a 5-second alarm
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(3)

    # This open() may hang indefinitely
    time.sleep(5)
    signal.alarm(0)          # Disable the alarm


foo()
print "hallo"

事情是:

  1. SIGALRM 的默认处理程序是中止应用程序,如果您设置了处理程序,那么它不应该再停止应用程序。

  2. 接收信号通常会中断系统调用(然后解锁您的应用程序)

You can set a signal alarm that will interrupt your call with a timeout:

http://docs.python.org/library/signal.html

signal.alarm(1) # 1 second

my_blocking_call()
signal.alarm(0)

You can also set a signal handler if you want to make sure it won't destroy your application:

def my_handler(signum, frame):
    pass

signal.signal(signal.SIGALRM, my_handler)

EDIT:
What's wrong with this piece of code ? This should not abort your application:

import signal, time

def handler(signum, frame):
    print "Timed-out"

def foo():
    # Set the signal handler and a 5-second alarm
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(3)

    # This open() may hang indefinitely
    time.sleep(5)
    signal.alarm(0)          # Disable the alarm


foo()
print "hallo"

The thing is:

  1. The default handler for SIGALRM is to abort the application, if you set your handler then it should no longer stop the application.

  2. Receiving a signal usually interrupts system calls (then unblocks your application)

枕梦 2024-10-06 22:25:58
if not seq1_sent:
        tb = send_seq_2.top_block()
        tb.Run(True)
        seq1_sent = True
        if time.time() < endtime:
            break

如果 'if time.time() < endtime:' 那么你将跳出循环,并且 seq2_sent 的东西将永远不会被命中,也许你的意思是 'time.time() >那个测试中的“末日”?

if not seq1_sent:
        tb = send_seq_2.top_block()
        tb.Run(True)
        seq1_sent = True
        if time.time() < endtime:
            break

If the 'if time.time() < endtime:' then you will break out of the loop and the seq2_sent stuff will never be hit, maybe you mean 'time.time() > endtime' in that test?

诠释孤独 2024-10-06 22:25:58

您问题的简单部分与信号处理有关。从 Python 运行时的角度来看,解释器进行系统调用时收到的信号会作为 OSError 异常呈现给您的 Python 代码,并带有对应于 errno.EINTR 的 errno 属性。

因此,这可能大致按照您的预期工作:

    #!/usr/bin/env python
    import signal, os, errno, time

    def handler(signum, frame):
            # print 'Signal handler called with signal', signum
            #raise IOError("Couldn't open device!")
            print "timed out"
            time.sleep(3)


    def foo():
        # Set the signal handler and a 5-second alarm
        signal.signal(signal.SIGALRM, handler)

        try:
            signal.alarm(3)
            # This open() may hang indefinitely
            fd = os.open('/dev/ttys0', os.O_RDWR)
        except OSError, e:
            if e.errno != errno.EINTR:
                raise e
        signal.alarm(0)          # Disable the alarm

    foo()
    print "hallo"

请注意,我已将 time 的导入移出了函数定义,因为以这种方式隐藏导入似乎是一种糟糕的形式。我根本不清楚为什么你要在信号处理程序中睡觉,事实上,这似乎是一个相当糟糕的主意。

我想要指出的关键点是,任何(不可忽略的)信号都会中断 Python 代码执行的主线。您的处理程序将使用指示哪个信号号触发执行的参数(允许使用一个 Python 函数来处理许多不同的信号)和一个框架对象(可用于某种类型的调试或检测)来调用。

由于代码的主要流程被中断,因此您有必要将该代码包装在某些异常处理中,以便在发生此类事件后重新获得控制权。 (顺便说一句,如果您用 C 语言编写代码,您也会有同样的担忧;您必须为具有底层系统调用的任何库函数做好准备,以返回错误并处理系统中的 -EINTR errno 通过循环返回重试或分支到主线中的某个替代方案(例如继续处理其他文件,或没有任何文件/输入等),

正如其他人在回答您的问题时所指出的那样,将您的方法基于 SIGALARM。更糟糕的是,其中一些问题可能是您在测试环境中永远不会遇到的竞争条件,并且可能仅在极难重现的条件下发生。在重入的情况下 --- 如果在信号处理程序执行期间发送信号会发生什么?

我在某些脚本中使用了 SIGALARM,这对我来说不是问题,在 Linux 下我正在处理的代码。适合该任务可能足以满足您的需求。

如果不了解更多有关 Gnuradio 代码的行为方式、从中实例化什么类型的对象以及它们返回什么类型的对象的信息,您的主要问题很难回答。

浏览一下您链接到的文档,我发现它们似乎没有提供任何可用于直接限制阻止行为的“超时”参数或设置。在“控制流图”下的表格中,我看到他们特别指出 .run() 可以无限期执行或直到收到 SIGINT 为止。我还注意到 .start() 可以启动应用程序中的线程,并且在这些线程运行时将控制权返回给 Python 代码行。 (这似乎取决于流程图的性质,我对此还不够理解)。

听起来您可以创建流程图 .start() 它们,然后(经过一段时间的处理或在 Python 代码主线中休眠后)调用 .lock()< /code> 控制对象上的方法(tb?)。我猜测,这会将状态的 Python 表示形式……Python 对象……置于静态模式,以允许您查询状态,或者正如他们所说,重新配置您的流程图。如果你调用.run(),它会在调用.start()后调用.wait();和 .wait() 显然会运行,直到所有块“表明它们已完成”或直到您调用对象的 .stop() 方法。

所以听起来你想使用 .start() 而既不是 .run() 也不是 .wait();然后在执行任何其他处理(包括 time.sleep())后调用 .stop()

也许像这样简单:

    tb = send_seq_2.top_block()
    tb.start()
    time.sleep(endtime - time.time())
    tb.stop()
    seq1_sent = True
    tb = send_seq_2.top_block()
    tb.start()
    seq2_sent = True

..尽管我对我的 time.sleep() 持怀疑态度。也许您想做一些其他事情来查询 tb 对象的状态(可能需要休眠更短的时间间隔,调用其 .lock() 方法,并访问我知道的属性没有关于然后在再次睡眠之前调用它的 .unlock()

The easy part of your question relates to the signal handling. From the perspective of the Python runtime a signal which has been received while the interpreter was making a system call is presented to your Python code as an OSError exception with an errno attributed corresponding to errno.EINTR

So this probably works roughly as you intended:

    #!/usr/bin/env python
    import signal, os, errno, time

    def handler(signum, frame):
            # print 'Signal handler called with signal', signum
            #raise IOError("Couldn't open device!")
            print "timed out"
            time.sleep(3)


    def foo():
        # Set the signal handler and a 5-second alarm
        signal.signal(signal.SIGALRM, handler)

        try:
            signal.alarm(3)
            # This open() may hang indefinitely
            fd = os.open('/dev/ttys0', os.O_RDWR)
        except OSError, e:
            if e.errno != errno.EINTR:
                raise e
        signal.alarm(0)          # Disable the alarm

    foo()
    print "hallo"

Note I've moved the import of time out of the function definition as it seems to be poor form to hide imports in that way. It's not at all clear to me why you're sleeping in your signal handler and, in fact, it seems like a rather bad idea.

The key point I'm trying to make is that any (non-ignored) signal will interrupt your main line of Python code execution. Your handler will be invoked with arguments indicating which signal number triggered the execution (allowing for one Python function to be used for handling many different signals) and a frame object (which could be used for debugging or instrumentation of some sort).

Because the main flow through the code is interrupted it's necessary for you to wrap that code in some exception handling in order to regain control after such events have occurred. (Incidentally if you're writing code in C you'd have the same concern; you have to be prepared for any of your library functions with underlying system calls to return errors and handle -EINTR in the system errno by looping back to retry or branching to some alternative in your main line (such as proceeding to some other file, or without any file/input, etc).

As others have indicated in their responses to your question, basing your approach on SIGALARM is likely to be fraught with portability and reliability issues. Worse, some of these issues may be race conditions that you'll never encounter in your testing environment and may only occur under conditions that are extremely hard to reproduce. The ugly details tend to be in cases of re-entrancy --- what happens if signals are dispatched during execution of your signal handler?

I've used SIGALARM in some scripts and it hasn't been an issue for me, under Linux. The code I was working on was suitable to the task. It might be adequate for your needs.

Your primary question is difficult to answer without knowing more about how this Gnuradio code behaves, what sorts of objects you instantiate from it, and what sorts of objects they return.

Glancing at the docs to which you've linked, I see that they don't seem to offer any sort of "timeout" argument or setting that could be used to limit blocking behavior directly. In the table under "Controlling Flow Graphs" I see that they specifically say that .run() can execute indefinitely or until SIGINT is received. I also note that .start() can start threads in your application and, it seems, returns control to your Python code line while those are running. (That seems to depend on the nature of your flow graphs, which I don't understand sufficiently).

It sounds like you could create your flow graphs, .start() them, and then (after some time processing or sleeping in your main line of Python code) call the .lock() method on your controlling object (tb?). This, I'm guessing, puts the Python representation of the state ... the Python object ... into a quiescent mode to allow you to query the state or, as they say, reconfigure your flow graph. If you call .run() it will call .wait() after it calls .start(); and .wait() will apparently run until either all blocks "indicate they are done" or until you call the object's .stop() method.

So it sounds like you want to use .start() and neither .run() nor .wait(); then call .stop() after doing any other processing (including time.sleep()).

Perhaps something as simple as:

    tb = send_seq_2.top_block()
    tb.start()
    time.sleep(endtime - time.time())
    tb.stop()
    seq1_sent = True
    tb = send_seq_2.top_block()
    tb.start()
    seq2_sent = True

.. though I'm suspicious of my time.sleep() there. Perhaps you want to do something else where you query the tb object's state (perhaps entailing sleeping for smaller intervals, calling its .lock() method, and accessing attributes that I know nothing about and then calling its .unlock() before sleeping again.

美人如玉 2024-10-06 22:25:58

你可以尝试使用延迟执行... Twisted 框架大量使用它们

http://www6. uniovi.es/python/pycon/papers/deferex/

you could try using Deferred execution... Twisted framework uses them alot

http://www6.uniovi.es/python/pycon/papers/deferex/

一袭水袖舞倾城 2024-10-06 22:25:58

您提到在Python中杀死线程-这部分是可能的,尽管您只能在Python代码运行时杀死/中断另一个线程,而不是在C代码中,所以这可能无法如您所愿地帮助您。

请参阅另一个问题的答案:
python:如何在多线程中发送数据包,然后线程杀死自己

或谷歌可杀死的python线程以获取更多详细信息,如下所示:
http://code.activestate.com/recipes/496960-thread2-killable -线程/

You mention killing threads in Python - this is partialy possible although you can kill/interrupt another thread only when Python code runs, not in C code, so this may not help you as you want.

see this answer to another question:
python: how to send packets in multi thread and then the thread kill itself

or google for killable python threads for more details like this:
http://code.activestate.com/recipes/496960-thread2-killable-threads/

走走停停 2024-10-06 22:25:58

如果要在阻塞函数上设置超时,可以使用 threading.Thread 作为 join(timeout) 方法,该方法会阻塞直到超时。

基本上,类似的事情应该做你想做的:

import threading
my_thread = threading.Thread(target=send_seq_2.top_block)
my_thread.start()
my_thread.join(TIMEOUT)

If you want to set a timeout on a blocking function, threading.Thread as the method join(timeout) which blocks until the timeout.

Basically, something like that should do what you want :

import threading
my_thread = threading.Thread(target=send_seq_2.top_block)
my_thread.start()
my_thread.join(TIMEOUT)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文