怀疑 pyinotify 的线程问题

发布于 2024-12-04 06:31:12 字数 8148 浏览 1 评论 0原文

我一直在使用 pyinotify,但遇到了问题,在对文件夹进行多次更改后,它只是停止接收通知。我有一种感觉,这与两个线程正在运行有关;即notifier线程和wxpython线程。

该应用程序的目的是在检测到 IP 连接时将图片加载到屏幕上,监视文件夹中的文件“清单”,并根据该文件进行一些处理,即移动文件。

它间歇性地工作,但作为一个Python新手,我不太确定问题可能是什么,因为我基本上已经采用了线程示例并解决了它。有时,它只收到一个通知并停止接收文件更改通知。

此外,如果我重新启动 Linux 盒子并重试,它会处理大量文件更改,然后再次停止接收通知,这让我认为它可能没有正确释放手表?

任何帮助将不胜感激,并且非常欢迎优化和改进。我确信我可以从反馈中学到很多东西。代码如下

import pyinotify
import os.path
import shutil
import errno
import subprocess
import logging
import wx
import time
import signal
import sys

#update CHECKLIST name
CHECKLIST = 'Checklist' #this must exist in the update archive

#static values
DIR_UPDATE = 'd'
FILE_UPDATE = 'f'
PING_IP = ' localhost' # change in production

#paths
WATCH_PATH = '/home/test'
LOG_PATH = '/home/test/update.log'
CONNECTED_IMG = 'update.jpg'
UPDATING_IMG = 'updating.png'

#msgs
UPDATEFOUND_MSG = ' Update Found '
UPDATEUNZIPPEDSTART_MSG = ' Update unzipping '
UPDATEUNZIPPED_MSG = ' Update unzipped '
UPDATEFILE_MSG = ' Update file '
UPDATEFILEMSG_CONT = ' moved into path '
REMOVEFILEMSG_CONT = ' removed from update folder '
UPDATECOMPLETE_CONT = ' Update complete'
ROADANGELRESTART_MSG = ' Update restarting app '
DIRCREATED_MSG = ' Directory created at path '

#errors
UPDATEFAILED_MSG = ' Update process failed on '
BADLYFORMED_MSG = ' Badly formed src/dest combination '
UPDATESRCUNAVAILABLE = ' Invalid update file specified '
UPDATEDESTUNAVAILABLE = ' Invalid update destination specified '
INVALIDUPDATEFORMAT = ' Invalid format string '

#on startup create the watchfolder if it doesnt exist

WM = pyinotify.WatchManager() # Watch Manager
WM_MASK = pyinotify.IN_CLOSE_WRITE # watched events

#setup logger
LOGGER = logging.getLogger('Updater')
LOG_HANDLE = logging.FileHandler(LOG_PATH)
FORMATTER = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
LOG_HANDLE.setFormatter(FORMATTER)
LOGGER.addHandler(LOG_HANDLE) 
LOGGER.setLevel(logging.INFO)


#Global values used primarily in the main function loop
HANDLER = None
NOTIFIER = None
WDD = None
UPDATE_UI = None
WINDOW = None
CURR_IMG = None
LAST_CURRIMG = None

class EventHandler(pyinotify.ProcessEvent):
    VERBOSE = False
    """ Main class to monitor file events and process accordingly"""

    def process_IN_CLOSE_WRITE(self, event):
        """ Only executes when a Checklist has finished being written to"""
        path = event.pathname
        print 'evt'
        #look for the update_ready file before processing
        if (os.path.basename(path) == 'Checklist'):
            EventHandler.parse_updates(WATCH_PATH)

            global CURR_IMG
            CURR_IMG = os.path.join(WATCH_PATH, UPDATING_IMG)
            show_window()
            print 'update completed'
            time.sleep(1000)

    @classmethod
    def parse_updates(cls, path):
        """ parses update files """
        #handle errors for opening the file
        file_path = os.path.join(path, CHECKLIST)
        print file_path

        files = open(file_path)
        #handle errors for malformed tuples-done
        #handle errors for unavailable files-done
        #handle errors for unavailable dests-done #created automatically
        #handle permission errors

        for line in files:
            #remove linebreaks etc and ensure its not empty
            if line.strip():
                array = line.split('=')
                length = len(array)
                if length == 3:
                    EventHandler.process_line(path, array)
                else:
                    if length > 0:
                        EventHandler.print_bad_msg(array[0])
                    else:
                        EventHandler.print_bad_msg()
        print 'removing ', file_path
        os.remove(file_path) #remove the checklist file

    @classmethod
    def mkdir(cls, path):
        """ makes a directory from a path"""
        try:
            os.mkdir(path)
            print DIRCREATED_MSG, path
        except OSError, err:
            print err
            if err.errno != errno.EEXIST: #thrown when the dir already exists
                return False

    @classmethod
    def move_file(cls, src, dest):
        """ moves a file from src to dest and remove after
            expects that the dest already exists at this point
            otherwise ignores the move"""
        #print 'moving from', src, 'to ', dest
        if os.path.isfile(dest):
            shutil.copy2(src, dest)
        else:
            print UPDATEDESTUNAVAILABLE
        #remove the src file when done
        os.remove(src)

    @classmethod
    def process_line(cls, path, array):
        """ process a line from the checklist"""
        #remove newlines etc
        update_file = array[0].strip()
        update_src = os.path.join(path, update_file)
        update_dest = array[1].strip()
        update_type = array[2].strip()

        #ensure we have valid values in all three fields
        if update_file and update_dest and update_type:
            #ensure the src file exists
            if os.path.isfile(update_src):
                #check if destination is directory and
                #copy the file into the directory

                if update_type == DIR_UPDATE:
                    EventHandler.mkdir(update_dest)
                    dest = os.path.join(update_dest, update_file)
                    EventHandler.move_file(update_src, dest)
                else:
                    EventHandler.move_file(update_src, update_dest)
            else:
                print UPDATESRCUNAVAILABLE

        else:
            print INVALIDUPDATEFORMAT


    @classmethod
    def print_bad_msg(cls, msg = ''):
        """ print a badly formed message with optional value"""
        if msg:
            print BADLYFORMED_MSG, msg
        else:
            print BADLYFORMED_MSG

class UpdateFrame(wx.Frame):
    """ Displays update images to screen"""
    def __init__(self, path):
        wx.Frame.__init__(self, None, wx.ID_ANY)

        image_file = path
        image = wx.Bitmap(image_file)
        image_size = image.GetSize()
        # set the frame size to fit the screen size
        self.SetClientSize(wx.DisplaySize())

        # bitmap's upper left corner is in frame position (x, y)
        # by default pos=(0, 0)
        wx.StaticBitmap(self, wx.ID_ANY, image, size = image_size)

        # the parent is the frame
        self.SetTitle('Update Mode')

def ping_ip():
    """ ping once to establish connection """
    ret = subprocess.call("ping -c 1 %s" % PING_IP,
            shell=True,
            stdout=open('/dev/null', 'w'),
            stderr=subprocess.STDOUT)
    if ret == 0:
        return True
    else:
        return False

def show_window():
    """ update screen window when currimage changes is set """ 
    global UPDATE_UI
    global WINDOW
    global CURR_IMG
    global LAST_CURRIMG

    if LAST_CURRIMG != CURR_IMG:
        if not UPDATE_UI:
            UPDATE_UI = wx.App()

        if not WINDOW:
            WINDOW = UpdateFrame(CURR_IMG)

        UPDATE_UI.ExitMainLoop()

        while(UPDATE_UI.IsMainLoopRunning()):
            pass

        WINDOW.Destroy()

        WINDOW = UpdateFrame(CURR_IMG)
        WINDOW.Show(True)

        UPDATE_UI.MainLoop()
        LAST_CURRIMG = CURR_IMG
        print 'changed'

def in_updatemode():
    return ping_ip()

while True:
    try:
        if not in_updatemode():
            print 'waiting for connect'
            time.sleep(3)

            if  NOTIFIER:
                NOTIFIER.stop()

        else:
            if not HANDLER:
                HANDLER = EventHandler()

            if not NOTIFIER:
                NOTIFIER = pyinotify.ThreadedNotifier(WM, HANDLER)
                NOTIFIER.start()

            if not WDD:
                WDD = WM.add_watch(WATCH_PATH, WM_MASK, rec=True,quiet=False)

            # ip is active so show the image and start the notifier
            # state = ip active 
            CURR_IMG = os.path.join(WATCH_PATH, CONNECTED_IMG)
            show_window()
            print 'here'
    except KeyboardInterrupt:
        print 'out'
        NOTIFIER.stop()
        break

I have been working with pyinotify and I am having issues with it where after multiple changes to a folder it simply stops receiving notifications. I have a feeling it is something to do with the the fact that two threads are running; namely the notifier thread and the wxpython thread.

The purpose of the app is to essentially load a picture to the screen when it detects an ip connection, monitor a folder for the file 'Checklist' and based on that file do some processing i.e move files around.

It works intermittently but being a python newbie Im not exactly sure what the issue might be as I have basically taken the threaded example and worked around it. Sometimes, it only gets one notification and stops receiving file change notifications.

Additionally, if I restart the linux box and try again, it works for a good number of file changes and then stops receiving notifications again which makes me think that perhaps its not releasing the watches properly?

Any help would be greatly appreciated as well as optimizations and improvements are very welcome. I'm sure I could learn a lot from the feedback. The code is below

import pyinotify
import os.path
import shutil
import errno
import subprocess
import logging
import wx
import time
import signal
import sys

#update CHECKLIST name
CHECKLIST = 'Checklist' #this must exist in the update archive

#static values
DIR_UPDATE = 'd'
FILE_UPDATE = 'f'
PING_IP = ' localhost' # change in production

#paths
WATCH_PATH = '/home/test'
LOG_PATH = '/home/test/update.log'
CONNECTED_IMG = 'update.jpg'
UPDATING_IMG = 'updating.png'

#msgs
UPDATEFOUND_MSG = ' Update Found '
UPDATEUNZIPPEDSTART_MSG = ' Update unzipping '
UPDATEUNZIPPED_MSG = ' Update unzipped '
UPDATEFILE_MSG = ' Update file '
UPDATEFILEMSG_CONT = ' moved into path '
REMOVEFILEMSG_CONT = ' removed from update folder '
UPDATECOMPLETE_CONT = ' Update complete'
ROADANGELRESTART_MSG = ' Update restarting app '
DIRCREATED_MSG = ' Directory created at path '

#errors
UPDATEFAILED_MSG = ' Update process failed on '
BADLYFORMED_MSG = ' Badly formed src/dest combination '
UPDATESRCUNAVAILABLE = ' Invalid update file specified '
UPDATEDESTUNAVAILABLE = ' Invalid update destination specified '
INVALIDUPDATEFORMAT = ' Invalid format string '

#on startup create the watchfolder if it doesnt exist

WM = pyinotify.WatchManager() # Watch Manager
WM_MASK = pyinotify.IN_CLOSE_WRITE # watched events

#setup logger
LOGGER = logging.getLogger('Updater')
LOG_HANDLE = logging.FileHandler(LOG_PATH)
FORMATTER = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
LOG_HANDLE.setFormatter(FORMATTER)
LOGGER.addHandler(LOG_HANDLE) 
LOGGER.setLevel(logging.INFO)


#Global values used primarily in the main function loop
HANDLER = None
NOTIFIER = None
WDD = None
UPDATE_UI = None
WINDOW = None
CURR_IMG = None
LAST_CURRIMG = None

class EventHandler(pyinotify.ProcessEvent):
    VERBOSE = False
    """ Main class to monitor file events and process accordingly"""

    def process_IN_CLOSE_WRITE(self, event):
        """ Only executes when a Checklist has finished being written to"""
        path = event.pathname
        print 'evt'
        #look for the update_ready file before processing
        if (os.path.basename(path) == 'Checklist'):
            EventHandler.parse_updates(WATCH_PATH)

            global CURR_IMG
            CURR_IMG = os.path.join(WATCH_PATH, UPDATING_IMG)
            show_window()
            print 'update completed'
            time.sleep(1000)

    @classmethod
    def parse_updates(cls, path):
        """ parses update files """
        #handle errors for opening the file
        file_path = os.path.join(path, CHECKLIST)
        print file_path

        files = open(file_path)
        #handle errors for malformed tuples-done
        #handle errors for unavailable files-done
        #handle errors for unavailable dests-done #created automatically
        #handle permission errors

        for line in files:
            #remove linebreaks etc and ensure its not empty
            if line.strip():
                array = line.split('=')
                length = len(array)
                if length == 3:
                    EventHandler.process_line(path, array)
                else:
                    if length > 0:
                        EventHandler.print_bad_msg(array[0])
                    else:
                        EventHandler.print_bad_msg()
        print 'removing ', file_path
        os.remove(file_path) #remove the checklist file

    @classmethod
    def mkdir(cls, path):
        """ makes a directory from a path"""
        try:
            os.mkdir(path)
            print DIRCREATED_MSG, path
        except OSError, err:
            print err
            if err.errno != errno.EEXIST: #thrown when the dir already exists
                return False

    @classmethod
    def move_file(cls, src, dest):
        """ moves a file from src to dest and remove after
            expects that the dest already exists at this point
            otherwise ignores the move"""
        #print 'moving from', src, 'to ', dest
        if os.path.isfile(dest):
            shutil.copy2(src, dest)
        else:
            print UPDATEDESTUNAVAILABLE
        #remove the src file when done
        os.remove(src)

    @classmethod
    def process_line(cls, path, array):
        """ process a line from the checklist"""
        #remove newlines etc
        update_file = array[0].strip()
        update_src = os.path.join(path, update_file)
        update_dest = array[1].strip()
        update_type = array[2].strip()

        #ensure we have valid values in all three fields
        if update_file and update_dest and update_type:
            #ensure the src file exists
            if os.path.isfile(update_src):
                #check if destination is directory and
                #copy the file into the directory

                if update_type == DIR_UPDATE:
                    EventHandler.mkdir(update_dest)
                    dest = os.path.join(update_dest, update_file)
                    EventHandler.move_file(update_src, dest)
                else:
                    EventHandler.move_file(update_src, update_dest)
            else:
                print UPDATESRCUNAVAILABLE

        else:
            print INVALIDUPDATEFORMAT


    @classmethod
    def print_bad_msg(cls, msg = ''):
        """ print a badly formed message with optional value"""
        if msg:
            print BADLYFORMED_MSG, msg
        else:
            print BADLYFORMED_MSG

class UpdateFrame(wx.Frame):
    """ Displays update images to screen"""
    def __init__(self, path):
        wx.Frame.__init__(self, None, wx.ID_ANY)

        image_file = path
        image = wx.Bitmap(image_file)
        image_size = image.GetSize()
        # set the frame size to fit the screen size
        self.SetClientSize(wx.DisplaySize())

        # bitmap's upper left corner is in frame position (x, y)
        # by default pos=(0, 0)
        wx.StaticBitmap(self, wx.ID_ANY, image, size = image_size)

        # the parent is the frame
        self.SetTitle('Update Mode')

def ping_ip():
    """ ping once to establish connection """
    ret = subprocess.call("ping -c 1 %s" % PING_IP,
            shell=True,
            stdout=open('/dev/null', 'w'),
            stderr=subprocess.STDOUT)
    if ret == 0:
        return True
    else:
        return False

def show_window():
    """ update screen window when currimage changes is set """ 
    global UPDATE_UI
    global WINDOW
    global CURR_IMG
    global LAST_CURRIMG

    if LAST_CURRIMG != CURR_IMG:
        if not UPDATE_UI:
            UPDATE_UI = wx.App()

        if not WINDOW:
            WINDOW = UpdateFrame(CURR_IMG)

        UPDATE_UI.ExitMainLoop()

        while(UPDATE_UI.IsMainLoopRunning()):
            pass

        WINDOW.Destroy()

        WINDOW = UpdateFrame(CURR_IMG)
        WINDOW.Show(True)

        UPDATE_UI.MainLoop()
        LAST_CURRIMG = CURR_IMG
        print 'changed'

def in_updatemode():
    return ping_ip()

while True:
    try:
        if not in_updatemode():
            print 'waiting for connect'
            time.sleep(3)

            if  NOTIFIER:
                NOTIFIER.stop()

        else:
            if not HANDLER:
                HANDLER = EventHandler()

            if not NOTIFIER:
                NOTIFIER = pyinotify.ThreadedNotifier(WM, HANDLER)
                NOTIFIER.start()

            if not WDD:
                WDD = WM.add_watch(WATCH_PATH, WM_MASK, rec=True,quiet=False)

            # ip is active so show the image and start the notifier
            # state = ip active 
            CURR_IMG = os.path.join(WATCH_PATH, CONNECTED_IMG)
            show_window()
            print 'here'
    except KeyboardInterrupt:
        print 'out'
        NOTIFIER.stop()
        break

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

楠木可依 2024-12-11 06:31:12

我基本上发现问题确实是 pyinotify 的线程通知程序和 wxPython 的主循环不能很好地协同工作。

解决方案是创建一个自定义主循环(我一开始不知道您可以这样做)并将 pyinotify 的线程通知程序放置在该循环中。这样它就作为 wxPython 主循环的一部分运行。

我的想法来自
http://www.java2s.com/Open-Source/Python/GUI/wxPython/wxPython-src-2.8.11.0/wxPython/samples/mainloop/mainloop.py.htm

代码下面解释了这个概念

class CustomApp(wx.App):
    def MainLoop(self):
        global HANDLER
        global WM
        global NOTIFIER
        global WDD
        global UPDATE_UI
        global PING_TIMER

        # Create an event loop and make it active.  If you are
        # only going to temporarily have a nested event loop then
        # you should get a reference to the old one and set it as
        # the active event loop when you are done with this one...
        evtloop = wx.EventLoop()
        old = wx.EventLoop.GetActive()
        wx.EventLoop.SetActive(evtloop)

        # This outer loop determines when to exit the application,
        # for this example we let the main frame reset this flag
        # when it closes.
        while self.keepGoing:
            # At this point in the outer loop you could do
            # whatever you implemented your own MainLoop for.  It
            # should be quick and non-blocking, otherwise your GUI
            # will freeze.  

            # call_your_code_here()
            if not HANDLER:
                HANDLER = EventHandler()

            if not WM:
                WM = pyinotify.WatchManager() # Watch Manager

            if not NOTIFIER:
                NOTIFIER = pyinotify.ThreadedNotifier(WM, HANDLER)
                NOTIFIER.start()
                print 'notifier started'

            if not WDD:
                WDD = WM.add_watch(WATCH_PATH, WM_MASK, rec=True,quiet=False)

            # This inner loop will process any GUI events
            # until there are no more waiting.
            while evtloop.Pending():
                evtloop.Dispatch()

            # Send idle events to idle handlers.  You may want to
            # throttle this back a bit somehow so there is not too
            # much CPU time spent in the idle handlers.  For this
            # example, I'll just snooze a little...
            time.sleep(0.10)
            self.ProcessIdle()


        wx.EventLoop.SetActive(old)



    def OnInit(self):
        global UPDATE_UI
        if not UPDATE_UI:
            UPDATE_UI = Updater()
            UPDATE_UI.Show()
            self.SetTopWindow(UPDATE_UI)

        self.keepGoing = True
        return True

"--------------------------------------------------------------------------------------"
Watcher()
app = CustomApp(False)

此外,我想在程序中捕获 SIGINT,我使用此 url 中的配方解决了它
http://code.activestate.com/ Recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/

希望这可以帮助另一个Python新手或老手:)

I basically found out that that the issue was indeed the fact that pyinotify's threaded notifier and wxPython's main loop dont play nice together.

The solution was to create a custom main loop (something I didn't know you could do in the first place) and place the pyinotify's threaded notifier within that loop. That way it ran as part of the wxPython main loop.

I got the idea from
http://www.java2s.com/Open-Source/Python/GUI/wxPython/wxPython-src-2.8.11.0/wxPython/samples/mainloop/mainloop.py.htm

Code below explains the concept

class CustomApp(wx.App):
    def MainLoop(self):
        global HANDLER
        global WM
        global NOTIFIER
        global WDD
        global UPDATE_UI
        global PING_TIMER

        # Create an event loop and make it active.  If you are
        # only going to temporarily have a nested event loop then
        # you should get a reference to the old one and set it as
        # the active event loop when you are done with this one...
        evtloop = wx.EventLoop()
        old = wx.EventLoop.GetActive()
        wx.EventLoop.SetActive(evtloop)

        # This outer loop determines when to exit the application,
        # for this example we let the main frame reset this flag
        # when it closes.
        while self.keepGoing:
            # At this point in the outer loop you could do
            # whatever you implemented your own MainLoop for.  It
            # should be quick and non-blocking, otherwise your GUI
            # will freeze.  

            # call_your_code_here()
            if not HANDLER:
                HANDLER = EventHandler()

            if not WM:
                WM = pyinotify.WatchManager() # Watch Manager

            if not NOTIFIER:
                NOTIFIER = pyinotify.ThreadedNotifier(WM, HANDLER)
                NOTIFIER.start()
                print 'notifier started'

            if not WDD:
                WDD = WM.add_watch(WATCH_PATH, WM_MASK, rec=True,quiet=False)

            # This inner loop will process any GUI events
            # until there are no more waiting.
            while evtloop.Pending():
                evtloop.Dispatch()

            # Send idle events to idle handlers.  You may want to
            # throttle this back a bit somehow so there is not too
            # much CPU time spent in the idle handlers.  For this
            # example, I'll just snooze a little...
            time.sleep(0.10)
            self.ProcessIdle()


        wx.EventLoop.SetActive(old)



    def OnInit(self):
        global UPDATE_UI
        if not UPDATE_UI:
            UPDATE_UI = Updater()
            UPDATE_UI.Show()
            self.SetTopWindow(UPDATE_UI)

        self.keepGoing = True
        return True

"--------------------------------------------------------------------------------------"
Watcher()
app = CustomApp(False)

Additionally, I wanted to catch SIGINT in the program and I solved it using the recipe from this url
http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/

Hope this helps another python newbie or oldie :)

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文