Why is my multithreaded Python script using Queue, threading.Thread and subprocess so erratic?

Posted 2024-10-09 23:25:33

I have three shell scripts P1, P2 and P3 which I am trying to chain. The three scripts need to run in series on each file, but at any given time multiple P1s, P2s and P3s can be running.

I need to run these on tens of files, and quickly, hence the desire to use threads and do things in parallel.

I am using the Python Thread, Queue and subprocess modules to achieve this.

My problem is that when I have a thread count greater than one, the program behaves erratically and the threads don't hand off to each other in a reproducible manner. Sometimes all five threads work perfectly and run to completion.

This is my first attempt at doing something with threads, and I am certain this is due to the usual threading issues involving race conditions. But I want to know how I can go about cleaning up my code.

The actual code is at https://github.com/harijay/xtaltools/blob/master/process_multi.py; pseudocode is given below. Sorry if the code is messy.

My question is: why do I get erratic behavior using this design? The threads are all accessing different files at any given time, and subprocess.call returns only when the shell script has finished and the file it produces has been written to disk.

What can I do differently?
I have tried to explain my design here as succinctly as possible.

My basic design:

import os
import subprocess
from Queue import Queue          # Python 3: from queue import Queue
from threading import Thread

P1_Queue = Queue()
P2_Queue = Queue()
P3_Queue = Queue()

class P1_Thread(Thread):
    def __init__(self, P1_Queue, P2_Queue):
        Thread.__init__(self)
        self.in_queue = P1_Queue
        self.out_queue = P2_Queue

    def run(self):
        while True:
            my_file_to_process = self.in_queue.get()
            if my_file_to_process is None:   # None is the poison pill
                break
            p1_runner = P1_runner(my_file_to_process)
            p1_runner.run_p1_using_subprocess()
            self.out_queue.put(my_file_to_process)  # hand the file to the P2 stage

The P1_runner class takes the input file name, writes a custom shell script that consumes that input, and then calls subprocess.call() in its run_p1_using_subprocess method to run the script and produce a new output file.

class P1_runner(object):

    def __init__(self, inputfile):
        self.inputfile = inputfile   # substituted into the here-document in the real code
        self.my_shell_script = """#!/usr/bin/sh
prog_name <<eof
input 1
...
eof"""
        # Write the generated script to disk and make it executable.
        self.script_path = "some_unique_p1_file_name.sh"
        with open(self.script_path, "w") as f:
            f.write(self.my_shell_script)
        os.chmod(self.script_path, 0o755)

    def run_p1_using_subprocess(self):
        # Blocks until the shell script (and the file it writes) is done.
        subprocess.call([self.script_path])

I have essentially similar classes for P2 and P3, all of which call a custom-generated shell script.

The chaining is achieved using a series of thread pools:

p1_worker_list = []
p2_worker_list = []
p3_worker_list = []

for i in range(THREAD_COUNT):
    p1_worker = P1_Thread(P1_Queue,P2_Queue)
    p1_worker.start()
    p1_worker_list.append(p1_worker)

for worker in p1_worker_list:
    worker.join()

And then the same code block again for P2 and P3:

for i in range(THREAD_COUNT):
    p2_worker = P2_Thread(P2_Queue,P3_Queue)
    p2_worker.start()
    p2_worker_list.append(p2_worker)

for worker in p2_worker_list:
    worker.join()

Thanks a tonne for your help/advice

Comments (2)

时常饿 2024-10-16 23:25:33

Well this is really bad:

runner.run()

You shouldn't ever call a thread's run method manually. You start a thread with .start(). Your code is a HUGE mess and no one here is going to wade through it to find your error.

淑女气质 2024-10-16 23:25:33

The threads' exit condition makes them commit suicide when another thread empties their input queue:

    my_file_to_process = self.in_queue.get()
    if my_file_to_process is None:  # my sister ate faster than I did, so...
        break                       # ... I kill myself!

Threads are dying just because they didn't find work to do when they were ready for more.

You should instead make the threads go to sleep (wait) until an event on their input queue is signaled, and die only when the orchestrator (main program) signals that processing is done (set the suicide flag, and signal all queues).
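
A minimal sketch of that pattern (illustrative only: `stage_worker`, `process` and the `STOP` sentinel are assumed names, not taken from the question's code). Queue.get() already blocks, i.e. the thread sleeps until something arrives; the orchestrator broadcasts one sentinel per worker once all work has been queued:

    from queue import Queue        # Python 2: from Queue import Queue
    from threading import Thread

    STOP = object()  # unique sentinel; can never collide with a real work item

    def process(item):
        pass  # hypothetical per-item work, e.g. a subprocess.call

    def stage_worker(in_queue, out_queue):
        while True:
            item = in_queue.get()     # blocks (sleeps) until work or a sentinel arrives
            if item is STOP:          # die only when the orchestrator says so
                break
            process(item)
            if out_queue is not None:
                out_queue.put(item)   # hand the item to the next stage

    def shutdown_stage(queue, worker_count):
        # Broadcast one sentinel per worker so each thread wakes exactly once
        # with a STOP and exits cleanly; no thread steals a sibling's pill.
        for _ in range(worker_count):
            queue.put(STOP)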

(I see you already changed the code).

What @Falmarri probably means in his note elsewhere is that your question is not about a specific problem (something others can answer) because the overall use of the threading library in your code is wrong, and your use of the programming language in general is awkward. For example:

  • The call to worker.join() makes the main program wait for the termination of all P1 threads, in order, before launching the P2 threads, thus defeating any attempt at concurrency.
  • You should either override Thread.run() or provide a callable to the constructor. There's no need for the Pn_runner classes.
  • All the thread classes do the same thing. You don't need a different class per process stage.
  • If you are already using Python, then it makes no sense to call an external program (much less a shell script) unless you absolutely cannot do the work easily in pure Python.
  • Because of the above, having your program write shell scripts to the file system is very odd, and almost certainly unnecessary (see the sketch after this list).
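
A hedged sketch tying those points together, assuming the external programs really are unavoidable: one generic worker serves every stage, the here-document input goes straight to each program's stdin (so nothing like some_unique_p1_file_name.sh is ever written or chmodded), and every stage is started before anything is joined. `p1_prog`, `p2_prog`, `p3_prog`, `THREAD_COUNT` and the input list are placeholders, not names from the original script:

    import subprocess
    from queue import Queue           # Python 2: from Queue import Queue
    from threading import Thread

    THREAD_COUNT = 5
    STOP = object()

    def run_stage(command, path):
        # Feed the here-document text straight to the program's stdin:
        # no generated .sh file, no chmod, no cleanup.
        p = subprocess.Popen(command, stdin=subprocess.PIPE)
        p.communicate(("input %s\n" % path).encode())
        return p.returncode

    def worker(command, in_queue, out_queue):
        while True:
            path = in_queue.get()
            if path is STOP:
                break
            run_stage(command, path)
            out_queue.put(path)       # hand the file to the next stage

    commands = [["p1_prog"], ["p2_prog"], ["p3_prog"]]    # placeholder programs
    queues = [Queue() for _ in range(len(commands) + 1)]  # one queue between stages
    threads = []
    for cmd, q_in, q_out in zip(commands, queues, queues[1:]):
        for _ in range(THREAD_COUNT):
            t = Thread(target=worker, args=(cmd, q_in, q_out))
            t.start()             # all three stages run before anything is joined
            threads.append(t)

    for path in ["file1.dat", "file2.dat"]:               # placeholder inputs
        queues[0].put(path)

    # Shut stages down front to back: once every stage-N worker has joined,
    # nothing new can arrive on queue N+1, so its sentinels are safe to send.
    for i in range(len(commands)):
        for _ in range(THREAD_COUNT):
            queues[i].put(STOP)
        for t in threads[i * THREAD_COUNT:(i + 1) * THREAD_COUNT]:
            t.join()

Joining here happens only during shutdown, after all stages are live, so it no longer serializes the pipeline the way the original join-between-stages loops do.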

What I suggest doing to solve this particular problem of yours is:

  1. Try to stick to 100% Python. If you can't, or it seems too difficult, you'll at least have found the specific functionality that has to be accessed externally.
  2. Build a solution that doesn't use concurrency (a sequential baseline is sketched after this list).
  3. Measure the program's performance and try to improve it algorithmically.
  4. Avoid threading if you can. A program that is CPU-bound will eat all the available cycles without threading. A program that is too disk-bound (or bound to any external/remote resource) will end up waiting for the disk if it hasn't anything else to do. To benefit from threading, a program must have the right balance between calculation and external resource use (or must be able to service requests as they arrive even when otherwise busy).
  5. Do it the Pythonic way: start simple, and gradually increase functionality and complexity while, at all times, avoiding anything that seems complicated.
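
For step 2, a sequential baseline can be this small (again a sketch; the program names and file list are placeholders). If this already finishes fast enough on tens of files, threads buy you nothing:

    import subprocess

    def process_file(path):
        # P1, P2, P3 in series on one file; each call blocks until done.
        for prog in ("p1_prog", "p2_prog", "p3_prog"):    # placeholder names
            subprocess.call([prog, path])

    for path in ["file1.dat", "file2.dat"]:               # placeholder inputs
        process_file(path)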

If your intention is to teach yourself about threading in Python, then by all means seek a simple problem to experiment with. And if all you wanted was to run several shell scripts in parallel, then bash and the other shells already have provisions for that, and you don't need to use Python.
