gobject io 监控 +非阻塞读取
我在 python 中使用 io_add_watch 监视器(通过 gobject)时遇到问题。我想在每次通知后对整个缓冲区进行非阻塞读取。这是代码(缩短了一点):
class SomeApp(object):
def __init__(self):
# some other init that does a lot of stderr debug writes
fl = fcntl.fcntl(0, fcntl.F_GETFL, 0)
fcntl.fcntl(0, fcntl.F_SETFL, fl | os.O_NONBLOCK)
print "hooked", gobject.io_add_watch(0, gobject.IO_IN | gobject.IO_PRI, self.got_message, [""])
self.app = gobject.MainLoop()
def run(self):
print "ready"
self.app.run()
def got_message(self, fd, condition, data):
print "reading now"
data[0] += os.read(0, 1024)
print "got something", fd, condition, data
return True
gobject.threads_init()
SomeApp().run()
这是技巧 - 当我在没有激活调试输出的情况下运行程序时,我不会收到 got_message
调用。当我首先向 stderr 写入很多内容时,问题就消失了。如果我除了这段代码中可见的打印之外不写任何东西,我就不会收到标准输入消息信号。另一个有趣的事情是,当我尝试在启用 stderr 调试的情况下运行同一个应用程序但通过 strace (检查是否有我错过的 fcntl / ioctl 调用)时,问题再次出现。
简而言之:如果我首先在没有 strace 的情况下向 stderr 写入很多内容,则 io_watch 可以工作。如果我用 strace 写了很多,或者根本不写,io_watch
不起作用。
“some other init”部分需要一些时间,因此如果我在看到“hooked 2”输出之前输入一些文本,然后在“ready”后按“ctrl+c”,则会调用 get_message
回调,但是 read 调用会抛出 EAGAIN,因此缓冲区似乎是空的。
与标准输入相关的 Strace 日志:
ioctl(0, SNDCTL_TMR_TIMEBASE or TCGETS, {B38400 opost isig icanon echo ...}) = 0
ioctl(0, SNDCTL_TMR_TIMEBASE or TCGETS, {B38400 opost isig icanon echo ...}) = 0
fcntl(0, F_GETFL) = 0xa002 (flags O_RDWR|O_ASYNC|O_LARGEFILE)
fcntl(0, F_SETFL, O_RDWR|O_NONBLOCK|O_ASYNC|O_LARGEFILE) = 0
fcntl(0, F_GETFL) = 0xa802 (flags O_RDWR|O_NONBLOCK|O_ASYNC|O_LARGEFILE)
有人对这里发生的事情有一些想法吗?
编辑:另一个线索。我尝试重构应用程序以在不同的线程中进行读取并通过管道将其传回。它“有点”有效:
...
rpipe, wpipe = os.pipe()
stopped = threading.Event()
self.stdreader = threading.Thread(name = "reader", target = self.std_read_loop, args = (wpipe, stopped))
self.stdreader.start()
new_data = ""
print "hooked", gobject.io_add_watch(rpipe, gobject.IO_IN | gobject.IO_PRI, self.got_message, [new_data])
def std_read_loop(self, wpipe, stop_event):
while True:
try:
new_data = os.read(0, 1024)
while len(new_data) > 0:
l = os.write(wpipe, new_data)
new_data = new_data[l:]
except OSError, e:
if stop_event.isSet():
break
time.sleep(0.1)
...
令人惊讶的是,如果我将相同的文本放入新管道中,一切都会开始工作。问题是:
- 第一行根本没有“注意到” - 我只得到第二行和以下几行,
- 它很丑陋
也许这会给其他人一个关于为什么会发生这种情况的线索?
I've got a problem with using the io_add_watch
monitor in python (via gobject). I want to do a nonblocking read of the whole buffer after every notification. Here's the code (shortened a bit):
class SomeApp(object):
def __init__(self):
# some other init that does a lot of stderr debug writes
fl = fcntl.fcntl(0, fcntl.F_GETFL, 0)
fcntl.fcntl(0, fcntl.F_SETFL, fl | os.O_NONBLOCK)
print "hooked", gobject.io_add_watch(0, gobject.IO_IN | gobject.IO_PRI, self.got_message, [""])
self.app = gobject.MainLoop()
def run(self):
print "ready"
self.app.run()
def got_message(self, fd, condition, data):
print "reading now"
data[0] += os.read(0, 1024)
print "got something", fd, condition, data
return True
gobject.threads_init()
SomeApp().run()
Here's the trick - when I run the program without debug output activated, I don't get the got_message
calls. When I write a lot of stuff to the stderr first, the problem disappears. If I don't write anything apart from the prints visible in this code, I don't get the stdin messsage signals. Another interesting thing is that when I try to run the same app with stderr debug enabled but via strace
(to check if there are any fcntl / ioctl calls I missed), the problem appears again.
So in short: if I write a lot to stderr first without strace, io_watch
works. If I write a lot with strace, or don't write at all io_watch
doesn't work.
The "some other init" part takes some time, so if I type some text before I see "hooked 2" output and then press "ctrl+c" after "ready", the get_message
callback is called, but the read call throws EAGAIN, so the buffer seems to be empty.
Strace log related to the stdin:
ioctl(0, SNDCTL_TMR_TIMEBASE or TCGETS, {B38400 opost isig icanon echo ...}) = 0
ioctl(0, SNDCTL_TMR_TIMEBASE or TCGETS, {B38400 opost isig icanon echo ...}) = 0
fcntl(0, F_GETFL) = 0xa002 (flags O_RDWR|O_ASYNC|O_LARGEFILE)
fcntl(0, F_SETFL, O_RDWR|O_NONBLOCK|O_ASYNC|O_LARGEFILE) = 0
fcntl(0, F_GETFL) = 0xa802 (flags O_RDWR|O_NONBLOCK|O_ASYNC|O_LARGEFILE)
Does anyone have some ideas on what's going on here?
EDIT: Another clue. I tried to refactor the app to do the reading in a different thread and pass it back via a pipe. It "kind of" works:
...
rpipe, wpipe = os.pipe()
stopped = threading.Event()
self.stdreader = threading.Thread(name = "reader", target = self.std_read_loop, args = (wpipe, stopped))
self.stdreader.start()
new_data = ""
print "hooked", gobject.io_add_watch(rpipe, gobject.IO_IN | gobject.IO_PRI, self.got_message, [new_data])
def std_read_loop(self, wpipe, stop_event):
while True:
try:
new_data = os.read(0, 1024)
while len(new_data) > 0:
l = os.write(wpipe, new_data)
new_data = new_data[l:]
except OSError, e:
if stop_event.isSet():
break
time.sleep(0.1)
...
It's surprising that if I just put the same text in a new pipe, everything starts to work. The problem is that:
- the first line is not "noticed" at all - I get only the second and following lines
- it's fugly
Maybe that will give someone else a clue on why that's happening?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
这听起来像是一种竞争条件,其中设置回调有一些延迟,或者环境发生变化会影响您是否可以设置回调。
我会仔细查看在调用 io_add_watch() 之前发生的情况。例如 Python fcntl 文档说:
显然,当您假设 STDIN 将具有 FD == 0 时,这不是您正在做的事情。我会先更改它,然后再试一次。
另一件事是,如果 FD 已经被阻塞,那么当其他非阻塞进程正在运行时,您的进程可能会等待,因此,根据您首先执行的操作,会存在时间差异。如果您重构 fcntl 内容,使其在程序启动后立即完成,甚至在导入 GTK 模块之前完成,会发生什么?
我不确定我是否理解为什么使用 GTK GUI 的程序首先想要从标准输入中读取。如果您实际上正在尝试捕获另一个进程的输出,则应该使用 subprocess 模块设置一个管道,然后在管道上使用
io_add_watch()
,如下所示:同样,在本示例中,我们确保在调用 io_add_watch() 之前我们有一个有效的打开的 FD。
通常,当使用
gobject.io_add_watch()
时,它会在gobject.MainLoop()
之前调用。例如,下面是一些使用 io_add_watch 来捕获 IO_IN 的工作代码。This sounds like a race condition in which there is some delay to setting your callback, or else there is a change in the environment which affects whether or not you can set the callback.
I would look carefully at what happens before you call
io_add_watch()
. For instance the Python fcntl docs say:Clearly that is not what you are doing when you assume that STDIN will have FD == 0. I would change that first and try again.
The other thing is that if the FD is already blocked, then your process could be waiting while other non-blocked processes are running, therefore there is a timing difference depending on what you do first. What happens if you refactor the fcntl stuff so that it is done soon after the program starts, even before importing the GTK modules?
I'm not sure that I understand why a program using the GTK GUI would want to read from the standard input in the first place. If you are actually trying to capture the output of another process, you should use the subprocess module to set up a pipe, then
io_add_watch()
on the pipe like so:Again, in this example we make sure that we have a valid opened FD before calling
io_add_watch(
).Normally, when
gobject.io_add_watch()
is used, it is called just beforegobject.MainLoop()
. For example, here is some working code usingio_add_watch
to catch IO_IN.文档说你应该从回调中返回
TRUE
,否则将从事件源列表中删除。The documentation says you should return
TRUE
from the callback or it will be removed from the list of event sources.如果在任何 stderr 输出之前先挂接回调,会发生什么?当您启用调试输出时,它仍然会被调用吗?
另外,我想您可能应该在处理程序中重复调用 os.read() 直到它没有提供数据,以防调用之间超过 1024 字节准备就绪。
您是否尝试过在后台线程中使用
select
模块来模拟gio
功能?那有用吗?这是什么平台?您处理的是什么类型的 FD? (文件?套接字?管道?)What happens if you hook the callback first, prior to any stderr output? Does it still get called when you have debug output enabled?
Also, I suppose you should probably be repeatedly calling
os.read()
in your handler until it gives no data, in case >1024 bytes become ready between calls.Have you tried using the
select
module in a background thread to emulategio
functionality? Does that work? What platform is this and what kind of FD are you dealing with? (file? socket? pipe?)