使用 Python 监控文件系统事件,无需其他低级库

发布于 2024-12-11 05:19:13 字数 141 浏览 0 评论 0原文

例如,我需要捕获 Linux 操作系统上某个目录上的删除和添加文件事件。我找到了像 inotify 和 python 包装器这样的库,但是如果我想使用清晰的 python 代码,我应该每秒观察 os.listdir(path) 输出,还是有一些方法可以完成这样的任务?

Ex, I need to catch remove and add files events on some directory on linux os. I found libs like inotify and python wrappers for them, but if I want to use clear python code should I watch for os.listdir(path) output every sec or are there some ways to accomplish such task?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

星光不落少年眉 2024-12-18 05:19:13

来源:http://code.activestate.com/ Recipes/215418-watching-a-directory-tree-on-unix/

watch_directories() 函数采用路径列表和可调用对象,然后重复遍历目录树以这些路径为根,监视被删除或修改时间更改的文件。然后向可调用对象传递两个列表,其中包含已更改的文件和已删除的文件。

from __future__ import nested_scopes

import os, time

def watch_directories (paths, func, delay=1.0):
    """(paths:[str], func:callable, delay:float)
    Continuously monitors the paths and their subdirectories
    for changes.  If any files or directories are modified,
    the callable 'func' is called with a list of the modified paths of both
    files and directories.  'func' can return a Boolean value
    for rescanning; if it returns True, the directory tree will be
    rescanned without calling func() for any found changes.
    (This is so func() can write changes into the tree and prevent itself
    from being immediately called again.)
    """

    # Basic principle: all_files is a dictionary mapping paths to
    # modification times.  We repeatedly crawl through the directory
    # tree rooted at 'path', doing a stat() on each file and comparing
    # the modification time.  

    all_files = {}
    def f (unused, dirname, files):
        # Traversal function for directories
        for filename in files:
            path = os.path.join(dirname, filename)

            try:
                t = os.stat(path)
            except os.error:
                # If a file has been deleted between os.path.walk()
                # scanning the directory and now, we'll get an
                # os.error here.  Just ignore it -- we'll report
                # the deletion on the next pass through the main loop.
                continue

            mtime = remaining_files.get(path)
            if mtime is not None:
                # Record this file as having been seen
                del remaining_files[path]
                # File's mtime has been changed since we last looked at it.
                if t.st_mtime > mtime:
                    changed_list.append(path)
            else:
                # No recorded modification time, so it must be
                # a brand new file.
                changed_list.append(path)

            # Record current mtime of file.
            all_files[path] = t.st_mtime

    # Main loop
    rescan = False
    while True:
        changed_list = []
        remaining_files = all_files.copy()
        all_files = {}
        for path in paths:
            os.path.walk(path, f, None)
        removed_list = remaining_files.keys()
        if rescan:
            rescan = False
        elif changed_list or removed_list:
            rescan = func(changed_list, removed_list)

        time.sleep(delay)

if __name__ == '__main__':
    def f (changed_files, removed_files):
        print changed_files
        print 'Removed', removed_files

    watch_directories(['.'], f, 1)

当您想要某种方式将作业发送到守护程序,但又不想使用某些 IPC 机制(例如套接字或管道)时,此配方很有用。相反,守护进程可以坐下来监视提交目录,并且可以通过将文件或目录放入提交目录来提交作业。

不考虑锁定。 watch_directories() 函数本身并不需要进行锁定;如果它在一次传递中错过了修改,它会在下一次传递时注意到它。但是,如果作业直接写入监视目录,则可调用对象可能会在作业文件仅写入一半时开始运行。为了解决这个问题,你可以使用锁文件;可调用对象在运行时必须获取锁,提交者在希望添加新作业时也必须获取锁。一种更简单的方法是依赖 rename() 系统调用的原子性:将作业写入未监视的临时目录,文件完成后使用 os.rename() 将其移动到提交目录中。

Source: http://code.activestate.com/recipes/215418-watching-a-directory-tree-on-unix/

The watch_directories() function takes a list of paths and a callable object, and then repeatedly traverses the directory trees rooted at those paths, watching for files that get deleted or have their modification time changed. The callable object is then passed two lists containing the files that have changed and the files that have been removed.

from __future__ import nested_scopes

import os, time

def watch_directories (paths, func, delay=1.0):
    """(paths:[str], func:callable, delay:float)
    Continuously monitors the paths and their subdirectories
    for changes.  If any files or directories are modified,
    the callable 'func' is called with a list of the modified paths of both
    files and directories.  'func' can return a Boolean value
    for rescanning; if it returns True, the directory tree will be
    rescanned without calling func() for any found changes.
    (This is so func() can write changes into the tree and prevent itself
    from being immediately called again.)
    """

    # Basic principle: all_files is a dictionary mapping paths to
    # modification times.  We repeatedly crawl through the directory
    # tree rooted at 'path', doing a stat() on each file and comparing
    # the modification time.  

    all_files = {}
    def f (unused, dirname, files):
        # Traversal function for directories
        for filename in files:
            path = os.path.join(dirname, filename)

            try:
                t = os.stat(path)
            except os.error:
                # If a file has been deleted between os.path.walk()
                # scanning the directory and now, we'll get an
                # os.error here.  Just ignore it -- we'll report
                # the deletion on the next pass through the main loop.
                continue

            mtime = remaining_files.get(path)
            if mtime is not None:
                # Record this file as having been seen
                del remaining_files[path]
                # File's mtime has been changed since we last looked at it.
                if t.st_mtime > mtime:
                    changed_list.append(path)
            else:
                # No recorded modification time, so it must be
                # a brand new file.
                changed_list.append(path)

            # Record current mtime of file.
            all_files[path] = t.st_mtime

    # Main loop
    rescan = False
    while True:
        changed_list = []
        remaining_files = all_files.copy()
        all_files = {}
        for path in paths:
            os.path.walk(path, f, None)
        removed_list = remaining_files.keys()
        if rescan:
            rescan = False
        elif changed_list or removed_list:
            rescan = func(changed_list, removed_list)

        time.sleep(delay)

if __name__ == '__main__':
    def f (changed_files, removed_files):
        print changed_files
        print 'Removed', removed_files

    watch_directories(['.'], f, 1)

This recipe is useful where you'd like some way to send jobs to a daemon, but don't want to use some IPC mechanism such as sockets or pipes. Instead, the daemon can sit and watch a submission directory, and jobs can be submitted by dropping a file or directory into the submission directory.

Locking is not taken into account. The watch_directories() function itself doesn't really need to do locking; if it misses a modification on one pass, it'll notice it on the next pass. However, if jobs are written directly into a watched directory, the callable object might start running while a job file is only half-written. To solve this, you can use a lockfile; the callable must acquire the lock when it runs, and submitters must acquire the lock when they wish to add a new job. A simpler approach is to rely on the rename() system call being atomic: write the job into a temporary directory that isn't being watched, and once the file is complete use os.rename() to move it into the submission directory.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文