Tailing files into message queues

Posted 2024-11-02 23:12:43

I launch a process on a Linux machine (specifically an AWS EC2 instance) via Python's subprocess module, which generates a number of files. I need to "tail -f" these files and send each of the resulting jsonified outputs to its respective AWS SQS queue. How would I go about such a task?

Edit

As suggested by this answer, asyncproc, and PEP 3145, I can do this with the following:

from asyncproc import Process
import Queue
import os
import time

# Substitute AWS SQS for Queue
sta_queue = Queue.Queue()
msg_queue = Queue.Queue()
running_procs = {
    'status': (Process(['/usr/bin/tail', '--retry', '-f', 'test.sta']), sta_queue),
    'message': (Process(['/usr/bin/tail', '--retry', '-f', 'test.msg']), msg_queue),
}

def handle_proc(p, q):
    # If nothing new has been written, read() returns an empty string
    latest = p.read()
    if latest:
        q.put(latest)
    retcode = p.wait(flags=os.WNOHANG)
    return retcode

while len(running_procs):
    proc_names = running_procs.keys()
    for proc_name in proc_names:
        proc, q = running_procs[proc_name]
        retcode = handle_proc(proc, q)
        if retcode is not None: # Process finished.
            del running_procs[proc_name]
    time.sleep(1.0)
print("Status queue")
while not sta_queue.empty():
    print(sta_queue.get())
print("Message queue")
while not msg_queue.empty():
    print(msg_queue.get())

This should be sufficient, I think, unless others can provide a better answer.
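
The "# Substitute AWS SQS for Queue" comment above glosses over the actual SQS call. As a rough sketch only (not part of the original post), assuming boto3 is available and that the region and queue URLs below are placeholders to replace with your own, the substitution might look like this:

import boto3

# Sketch: send tail output to SQS instead of a local Queue.Queue
sqs = boto3.client('sqs', region_name='us-east-1')  # region is an assumption

STATUS_QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/status'    # placeholder
MESSAGE_QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/message'  # placeholder

def send_to_sqs(queue_url, text):
    # SQS message bodies are limited to 256 KB, so very large chunks would need splitting
    sqs.send_message(QueueUrl=queue_url, MessageBody=text)

With that in place, the queue objects in running_procs could be replaced by these URLs, and q.put(latest) in handle_proc() by send_to_sqs(q, latest).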

More Edits

I'm overthinking the problem. Although the above works nicely, I think the simplest solution is:

- check for the existence of the files
- if the files exist, copy them to a bucket on AWS S3 and send a message through AWS SQS that the files have been copied; repeat every 60 seconds (a rough sketch of this producer loop follows below)
- the consumer app polls SQS and eventually receives the message that the files have been copied
- the consumer app downloads the files from S3 and replaces the previous contents with the latest contents; repeat until the job completes
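
As a rough sketch only (assuming boto3 and placeholder bucket/queue names that you would replace with your own), the producer side of that loop might look like:

import os
import time
import boto3

s3 = boto3.client('s3')
sqs = boto3.client('sqs')

BUCKET = 'my-output-bucket'                                            # placeholder
QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/files'  # placeholder
FILES = ['test.sta', 'test.msg']

while True:  # in practice, break once the producing job has finished
    uploaded = [p for p in FILES if os.path.exists(p)]
    for path in uploaded:
        # Overwrite the object with the latest contents on every pass
        s3.upload_file(path, BUCKET, os.path.basename(path))
    if uploaded:
        # Tell the consumer which files were refreshed
        sqs.send_message(QueueUrl=QUEUE_URL, MessageBody='copied: ' + ','.join(uploaded))
    time.sleep(60)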

Asynchronous I/O with subprocess is still an open issue, though.

1 Comment

花开浅夏 2024-11-09 23:12:43

You can use the subprocess.Popen class to run tail and read its output.

import subprocess
from subprocess import PIPE

try:
    process = subprocess.Popen(['tail', '-f', filename], stdout=PIPE)
except (OSError, ValueError):
    pass    # TODO: handle errors
# Note: read() blocks until tail exits and the pipe is closed
output = process.stdout.read()

The subprocess.check_output function provides this functionality in a one-liner. It is new in Python version 2.7.

try:
    # check_output() captures stdout itself, so don't pass stdout=PIPE here
    # (doing so raises ValueError). With 'tail -f' this call blocks until tail exits.
    output = subprocess.check_output(['tail', '-f', filename])
except subprocess.CalledProcessError:
    pass    # TODO: handle errors

For non-blocking I/O, see this question.
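
For illustration only (this is a common pattern, not necessarily what the linked question shows): a background thread can drain the pipe into a queue so the main loop never blocks on the pipe itself. The file name and timeout below are arbitrary.

import subprocess
import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

def enqueue_output(pipe, q):
    # Drain the pipe line by line into the queue, then close it
    for line in iter(pipe.readline, b''):
        q.put(line)
    pipe.close()

proc = subprocess.Popen(['tail', '-f', 'test.sta'], stdout=subprocess.PIPE)
q = queue.Queue()
t = threading.Thread(target=enqueue_output, args=(proc.stdout, q))
t.daemon = True  # don't let this thread keep the interpreter alive
t.start()

while True:  # poll the queue instead of blocking on the pipe
    try:
        line = q.get(timeout=1.0)
    except queue.Empty:
        pass  # nothing new from tail this second
    else:
        print(line)  # e.g. jsonify and send to SQS here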
