在 Windows 上使用 multiprocessing 时无法加载模型

发布于 2025-01-26 02:43:38 字数 6431 浏览 5 评论 0原文

该程序在UNIX上工作,我正在尝试将其转换为Windows。

它使用了 multiprocessing,我知道这个问题源于 Windows 上只能使用 spawn(派生新进程)方式,而 Linux 上使用的是 fork。

问题与子进程加载我在主线程中已经加载的模型有关。

当调用 Consumer() 时,在 init() 期间它会调用另一个文件中的函数,该函数会加载一些 TensorFlow 模型。

consumer()

import os
import time
import sys
from PIL import Image
import subprocess
from multiprocessing import Process, Queue
import t2identify

# Consumer stores its configuration and identifier objects in __init__ and,
# in start(), runs consumeImage in two child processes fed by one queue.
# NOTE(review): as posted, the `elif` branch in __init__ has no indented body,
# so this snippet does not parse as-is; it is reproduced verbatim.
class Consumer:

    # frameSource is stored as self.source; layer ("screen" or "detail") is
    # stored as self.task and selects what self.layer is set to.
    # Raises KeyError if the "directory" environment variable is not set.
    def __init__(self, frameSource, layer):
        self.directory = os.environ["directory"]
        self.source = frameSource
        self.task = layer
        # t2identify.identifyImage loads a TensorFlow SavedModel in its
        # constructor, so for "screen" the loaded model becomes instance state.
        if layer == "screen":
            self.layer = t2identify.identifyImage("apps")
        elif layer == "detail":
        self.layer = "detail"

        # Counters start at zero for every new consumer.
        self.imagesChecked = 0
        self.errorsFound = 0

        self.previousApp = "none"
        # A second identifyImage instance is built here for every Consumer,
        # whichever layer was requested.
        self.appDetail = {
            "mobile": t2identify.identifyImage("mobile"),
        }

    # Stores state on the instance, then starts two worker processes that both
    # run self.consumeImage on the same queue and waits for both to finish.
    # consumeImage is not shown in this excerpt.
    def start(self, state, queue):
        self.state = state
    
        # The target is a bound method, so the whole instance (including the
        # objects built in __init__) has to be pickled for the child process
        # when the spawn start method is in use.
        consumer1 = Process(target=self.consumeImage, args=(queue,))
        consumer1.start()
    
        consumer2 = Process(target=self.consumeImage, args=(queue,))
        consumer2.start()

        # Block until both workers exit.
        consumer1.join()
        consumer2.join()

t2identify.identifyImage() 涉及加载模型。

t2identify.py

import matplotlib.pyplot as plt
import numpy as np
import os
from PIL import Image

class identifyImage():
    """Holds a TensorFlow SavedModel and its labels for one named layer.

    Only ``__init__`` is visible in this excerpt; the rest of the class
    (including ``loadLabels``) is elided.
    """

    def __init__(self, layer):
        """Load the SavedModel and label file registered under ``layer``.

        Args:
            layer: Key into ``availableLayers`` selecting the model directory.

        Raises:
            KeyError: If ``layer`` is not a key of ``availableLayers``.
        """
        # TensorFlow is imported inside the constructor rather than at module
        # level.
        import tensorflow as tf
        # Maps a layer name to the directory that holds its SavedModel.
        availableLayers = {
            "apps":"C:/Users/PycharmProjects/NNs/tf/appModel",
        }
        self.selectedLayer = availableLayers[layer]
        # The model is loaded eagerly and kept on the instance.
        self.model = tf.saved_model.load(self.selectedLayer)
        # labels.txt is read from the same directory as the model; loadLabels
        # is defined elsewhere in the class (not shown).
        self.label = self.loadLabels(availableLayers[layer]+"/labels.txt")
        # NOTE(review): presumably the input image size expected by the model
        # - confirm against the code that uses these attributes.
        self.img_height = 224
        self.img_width = 224
...

我确信问题在于:当消费者子进程启动时,这里加载的模型会被再次加载;至于为什么提示找不到它们,我不确定。

main.py

import pathos
import multiprocessing
import os
import time
import shutil
from os import path
from producer import Producer
from consumer import Consumer
from controller import Controller
from tqdm import tqdm
import re
import sys
from env import environmentVariables as Environment

# Main.start wires up the producer, two consumers and the controller as
# separate processes, runs them to completion, then zips files and sends logs.
# NOTE(review): as posted, `def start` is not indented under `class Main:`, so
# this snippet does not parse as-is; it is reproduced verbatim.
class Main:

# Returns True after all processes have finished, or False if building the
# controller raised. self.producerEvent, self.producerFrameRate, self.state,
# self.expected, self.iteration, self.testCaseNumber and self.progress are set
# elsewhere (not shown in this excerpt).
def start(self, test):
    self.createDirectories()
    # One queue per consumer; both are handed to the producer below.
    screenQueue = multiprocessing.Queue()
    detailQueue = multiprocessing.Queue()

    self.producer = Producer(["SavedFrames", "ScreenFrames", "DetailFrames", "VisualFrames"])

    self.producerProcess = multiprocessing.Process(target=self.producer.start,
                                            args=(self.producerEvent, self.producerFrameRate, self.state,
                                                  self.expected, self.iteration, self.testCaseNumber,
                                                  [screenQueue, detailQueue]))

    # Consumers are constructed in the parent process, before any child exists.
    self.screenConsumer = Consumer("ScreenFrames", "screen")
    # MODELS ARE LOADED
    self.detailConsumer = Consumer("DetailFrames", "detail")

    # The targets are bound methods of the Consumer instances built above, so
    # those instances travel to the child processes together with the target.
    self.screenConsumerProcess = multiprocessing.Process(target=self.screenConsumer.start,
                                                         args=(self.state, screenQueue))
    self.detailConsumerProcess = multiprocessing.Process(target=self.detailConsumer.start,
                                                         args=(self.state, detailQueue))

    try:
        # Create the process that runs the controller, which performs the test cases
        self.controllerStart = Controller(self.producerEvent, self.producerFrameRate, self.state, self.expected, self.iteration,
                                                self.testCaseNumber, self.progress)
        self.controllerProcess = multiprocessing.Process(target=self.controllerStart.start, args=(test,))

    # NOTE(review): the bare except hides the exception type and message; the
    # caller only sees "ERROR" printed and a False return value.
    except:
        print("ERROR")
        return False

    self.producerProcess.start()
    # FAILS on starting screenConsumerProcess (see error)
    self.screenConsumerProcess.start()
    self.detailConsumerProcess.start()
    self.controllerProcess.start()

    # Wait for every child process before post-processing.
    self.producerProcess.join()
    self.screenConsumerProcess.join()
    self.detailConsumerProcess.join()
    self.controllerProcess.join()

    self.zipFiles()
    self.sendLogs()
    return True
...
# Entry-point guard: the test run only starts when this file is executed as a
# script, not when it is imported.
# NOTE(review): start is declared as start(self, test) but is called here
# without an argument; the traceback in this post shows testing.start(data).
if __name__ == "__main__":
    testing = Main()
    results = testing.start()

错误:

To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
  File "\venv\lib\site-packages\keras\saving\pickle_utils.py", line 48, in deserialize_model_from_bytecode
    model = save_module.load_model(temp_dir)
  File "\venv\lib\site-packages\keras\utils\traceback_utils.py", line 67, in error_handler
Traceback (most recent call last):
  File "C:/Users/PycharmProjects/project/src/main.py", line 353, in <module>
    raise e.with_traceback(filtered_tb) from None
  File "\venv\lib\site-packages\tensorflow\python\saved_model\load.py", line 978, in load_internal
    results = testing.start(data)
  File "/PycharmProjects/project/src/main.py", line 95, in start
    self.screenConsumerProcess.start()
  File "\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 112, in start
    str(err) + "\n You may be trying to load on a different device "
FileNotFoundError: Unsuccessful TensorSliceReader constructor: Failed to find any matching files for ram://4e4e1c18-ece9-4d99-88c3-5c2ee965c92a/variables/variables
 You may be trying to load on a different device from the computational device. Consider setting the `experimental_io_device` option in `tf.saved_model.LoadOptions` to the io_device such as '/job:localhost'.

子进程无法访问已加载的模型。因为 ram://4e4e1c18-ece9-4d99-88c3-5c2ee965c92a/variables/variables 就是 keras/pickle_utils.py 中 model = save_module.load_model(temp_dir) 里的 temp_dir,而这正是错误发生的位置。

是我的内存(RAM)用尽了吗?还是说,既然现在换到了 Windows,我需要修改一些 multiprocessing 代码?

编辑:

我怀疑这很可能与 Windows 在新进程启动(spawn)时重新导入所有内容有关。在这个过程中模型会被重新加载,错误就发生在这里。不过,我仍然不确定该如何解决这个问题,除了在 main 中加载模型,然后把模型作为参数传给子进程……而这似乎不是一个理想的方案。

Edit2:

现在正在研究使用 pathos,它使用 dill 而不是 pickle。因为我怀疑问题出在启动消费者进程时,目标(target)是一个类,而它无法被 pickle 序列化。

This program works on Unix and I'm trying to transition it to windows.

It uses multiprocessing, and I understand the issue comes from being forced to use spawning on Windows, as opposed to forking on Linux.

It's to do with the subprocess loading the models that I load on the main thread.

When Consumer() is called, during the init() it calls a function from another file that loads some tensorflow models.

Consumer()

import os
import time
import sys
from PIL import Image
import subprocess
from multiprocessing import Process, Queue
import t2identify

# Consumer stores its configuration and identifier objects in __init__ and,
# in start(), runs consumeImage in two child processes fed by one queue.
# NOTE(review): as posted, the `elif` branch in __init__ has no indented body,
# so this snippet does not parse as-is; it is reproduced verbatim.
class Consumer:

    # frameSource is stored as self.source; layer ("screen" or "detail") is
    # stored as self.task and selects what self.layer is set to.
    # Raises KeyError if the "directory" environment variable is not set.
    def __init__(self, frameSource, layer):
        self.directory = os.environ["directory"]
        self.source = frameSource
        self.task = layer
        # t2identify.identifyImage loads a TensorFlow SavedModel in its
        # constructor, so for "screen" the loaded model becomes instance state.
        if layer == "screen":
            self.layer = t2identify.identifyImage("apps")
        elif layer == "detail":
        self.layer = "detail"

        # Counters start at zero for every new consumer.
        self.imagesChecked = 0
        self.errorsFound = 0

        self.previousApp = "none"
        # A second identifyImage instance is built here for every Consumer,
        # whichever layer was requested.
        self.appDetail = {
            "mobile": t2identify.identifyImage("mobile"),
        }

    # Stores state on the instance, then starts two worker processes that both
    # run self.consumeImage on the same queue and waits for both to finish.
    # consumeImage is not shown in this excerpt.
    def start(self, state, queue):
        self.state = state
    
        # The target is a bound method, so the whole instance (including the
        # objects built in __init__) has to be pickled for the child process
        # when the spawn start method is in use.
        consumer1 = Process(target=self.consumeImage, args=(queue,))
        consumer1.start()
    
        consumer2 = Process(target=self.consumeImage, args=(queue,))
        consumer2.start()

        # Block until both workers exit.
        consumer1.join()
        consumer2.join()

t2identify.identifyImage() involves loading models.

t2identify.py

import matplotlib.pyplot as plt
import numpy as np
import os
from PIL import Image

class identifyImage():
    """Holds a TensorFlow SavedModel and its labels for one named layer.

    Only ``__init__`` is visible in this excerpt; the rest of the class
    (including ``loadLabels``) is elided.
    """

    def __init__(self, layer):
        """Load the SavedModel and label file registered under ``layer``.

        Args:
            layer: Key into ``availableLayers`` selecting the model directory.

        Raises:
            KeyError: If ``layer`` is not a key of ``availableLayers``.
        """
        # TensorFlow is imported inside the constructor rather than at module
        # level.
        import tensorflow as tf
        # Maps a layer name to the directory that holds its SavedModel.
        availableLayers = {
            "apps":"C:/Users/PycharmProjects/NNs/tf/appModel",
        }
        self.selectedLayer = availableLayers[layer]
        # The model is loaded eagerly and kept on the instance.
        self.model = tf.saved_model.load(self.selectedLayer)
        # labels.txt is read from the same directory as the model; loadLabels
        # is defined elsewhere in the class (not shown).
        self.label = self.loadLabels(availableLayers[layer]+"/labels.txt")
        # NOTE(review): presumably the input image size expected by the model
        # - confirm against the code that uses these attributes.
        self.img_height = 224
        self.img_width = 224
...

I'm confident the issue is that when the consumer subprocess starts, the models that are loaded here are loaded again; why it says they're not found, I'm not sure.

main.py

import pathos
import multiprocessing
import os
import time
import shutil
from os import path
from producer import Producer
from consumer import Consumer
from controller import Controller
from tqdm import tqdm
import re
import sys
from env import environmentVariables as Environment

# Main.start wires up the producer, two consumers and the controller as
# separate processes, runs them to completion, then zips files and sends logs.
# NOTE(review): as posted, `def start` is not indented under `class Main:`, so
# this snippet does not parse as-is; it is reproduced verbatim.
class Main:

# Returns True after all processes have finished, or False if building the
# controller raised. self.producerEvent, self.producerFrameRate, self.state,
# self.expected, self.iteration, self.testCaseNumber and self.progress are set
# elsewhere (not shown in this excerpt).
def start(self, test):
    self.createDirectories()
    # One queue per consumer; both are handed to the producer below.
    screenQueue = multiprocessing.Queue()
    detailQueue = multiprocessing.Queue()

    self.producer = Producer(["SavedFrames", "ScreenFrames", "DetailFrames", "VisualFrames"])

    self.producerProcess = multiprocessing.Process(target=self.producer.start,
                                            args=(self.producerEvent, self.producerFrameRate, self.state,
                                                  self.expected, self.iteration, self.testCaseNumber,
                                                  [screenQueue, detailQueue]))

    # Consumers are constructed in the parent process, before any child exists.
    self.screenConsumer = Consumer("ScreenFrames", "screen")
    # MODELS ARE LOADED
    self.detailConsumer = Consumer("DetailFrames", "detail")

    # The targets are bound methods of the Consumer instances built above, so
    # those instances travel to the child processes together with the target.
    self.screenConsumerProcess = multiprocessing.Process(target=self.screenConsumer.start,
                                                         args=(self.state, screenQueue))
    self.detailConsumerProcess = multiprocessing.Process(target=self.detailConsumer.start,
                                                         args=(self.state, detailQueue))

    try:
        # Create the process that runs the controller, which performs the test cases
        self.controllerStart = Controller(self.producerEvent, self.producerFrameRate, self.state, self.expected, self.iteration,
                                                self.testCaseNumber, self.progress)
        self.controllerProcess = multiprocessing.Process(target=self.controllerStart.start, args=(test,))

    # NOTE(review): the bare except hides the exception type and message; the
    # caller only sees "ERROR" printed and a False return value.
    except:
        print("ERROR")
        return False

    self.producerProcess.start()
    # FAILS on starting screenConsumerProcess (see error)
    self.screenConsumerProcess.start()
    self.detailConsumerProcess.start()
    self.controllerProcess.start()

    # Wait for every child process before post-processing.
    self.producerProcess.join()
    self.screenConsumerProcess.join()
    self.detailConsumerProcess.join()
    self.controllerProcess.join()

    self.zipFiles()
    self.sendLogs()
    return True
...
# Entry-point guard: the test run only starts when this file is executed as a
# script, not when it is imported.
# NOTE(review): start is declared as start(self, test) but is called here
# without an argument; the traceback in this post shows testing.start(data).
if __name__ == "__main__":
    testing = Main()
    results = testing.start()

The error:

To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
  File "\venv\lib\site-packages\keras\saving\pickle_utils.py", line 48, in deserialize_model_from_bytecode
    model = save_module.load_model(temp_dir)
  File "\venv\lib\site-packages\keras\utils\traceback_utils.py", line 67, in error_handler
Traceback (most recent call last):
  File "C:/Users/PycharmProjects/project/src/main.py", line 353, in <module>
    raise e.with_traceback(filtered_tb) from None
  File "\venv\lib\site-packages\tensorflow\python\saved_model\load.py", line 978, in load_internal
    results = testing.start(data)
  File "/PycharmProjects/project/src/main.py", line 95, in start
    self.screenConsumerProcess.start()
  File "\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 112, in start
    str(err) + "\n You may be trying to load on a different device "
FileNotFoundError: Unsuccessful TensorSliceReader constructor: Failed to find any matching files for ram://4e4e1c18-ece9-4d99-88c3-5c2ee965c92a/variables/variables
 You may be trying to load on a different device from the computational device. Consider setting the `experimental_io_device` option in `tf.saved_model.LoadOptions` to the io_device such as '/job:localhost'.

The models that are loaded aren't accessible by subprocesses, as ram://4e4e1c18-ece9-4d99-88c3-5c2ee965c92a/variables/variables is the temp_dir in keras/pickle_utils.py (model = save_module.load_model(temp_dir)), which is where the error occurred.

Am I running out of ram? Or do I need to change some multiprocessing code now that I'm on windows.

EDIT:

I suspect it's most likely to do with windows reimporting everything once a new process starts (spawning). While doing this, the models are reloaded, and that's where the error occurs. I am still however unsure how to go about resolving this, apart from loading the models in main then passing the models into the subprocesses as parameters... which seems like a subpar solution.

EDIT2:

Now looking into using pathos, which uses dill and not pickle, as I suspect the issue is that when I start the consumer process, the target is a class, which is not picklable.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1)

_失温 2025-02-02 02:43:38

为了避免对 TensorFlow 模型进行 pickle 序列化,请尝试把这些模型的创建移到实际使用它们的进程中:

class Consumer:
    """Frame consumer that builds its identifier objects in the worker.

    The ``t2identify.identifyImage`` objects are created inside
    ``consumeImage`` instead of ``__init__``, so a freshly constructed
    instance holds only plain values when it is handed to a child process as
    part of a bound-method target.
    """

    def __init__(self, frameSource, layer):
        """Record configuration only; no identifier objects are built here.

        Args:
            frameSource: Stored as ``self.source``.
            layer: ``"screen"`` or ``"detail"``; stored as ``self.task`` and
                ``self._layer`` and read later by ``consumeImage``.

        Raises:
            KeyError: If the ``directory`` environment variable is not set.
        """
        self.directory = os.environ["directory"]
        self.source = frameSource
        self.task = layer
        self._layer = layer

        # The following code is moved to the consumeImage method:
        #     if layer == "screen":
        #         self.layer = t2identify.identifyImage("apps")
        #     elif layer == "detail":
        #         self.layer = "detail"

        self.imagesChecked = 0
        self.errorsFound = 0

        self.previousApp = "none"
        # The following code is moved to consumeImage:
        #     self.appDetail = {
        #         "mobile": t2identify.identifyImage("mobile"),
        #     }

    def start(self, state, queue):
        """Run ``consumeImage`` in two child processes and wait for both.

        Args:
            state: Stored as ``self.state`` before the workers are created.
            queue: Passed to both workers as their only argument.
        """
        self.state = state

        consumer1 = Process(target=self.consumeImage, args=(queue,))
        consumer1.start()

        consumer2 = Process(target=self.consumeImage, args=(queue,))
        consumer2.start()

        # Block until both workers exit.
        consumer1.join()
        consumer2.join()

    def consumeImage(self, queue):
        """Build the identifier objects, then process ``queue``.

        Called as the target of the processes created in ``start``, so the
        identifier objects are created by the worker itself. The remainder of
        the method is elided in the original post.

        Args:
            queue: Queue handed over by ``start``.
        """
        self.appDetail = {
            "mobile": t2identify.identifyImage("mobile"),
        }
        if self._layer == "screen":
            self.layer = t2identify.identifyImage("apps")
        elif self._layer == "detail":
            self.layer = "detail"
        ...

To avoid pickling of the tensorflow models, try moving the creation of these models to the process that uses them:

class Consumer:
    """Frame consumer that builds its identifier objects in the worker.

    The ``t2identify.identifyImage`` objects are created inside
    ``consumeImage`` instead of ``__init__``, so a freshly constructed
    instance holds only plain values when it is handed to a child process as
    part of a bound-method target.
    """

    def __init__(self, frameSource, layer):
        """Record configuration only; no identifier objects are built here.

        Args:
            frameSource: Stored as ``self.source``.
            layer: ``"screen"`` or ``"detail"``; stored as ``self.task`` and
                ``self._layer`` and read later by ``consumeImage``.

        Raises:
            KeyError: If the ``directory`` environment variable is not set.
        """
        self.directory = os.environ["directory"]
        self.source = frameSource
        self.task = layer
        self._layer = layer

        # The following code is moved to the consumeImage method:
        #     if layer == "screen":
        #         self.layer = t2identify.identifyImage("apps")
        #     elif layer == "detail":
        #         self.layer = "detail"

        self.imagesChecked = 0
        self.errorsFound = 0

        self.previousApp = "none"
        # The following code is moved to consumeImage:
        #     self.appDetail = {
        #         "mobile": t2identify.identifyImage("mobile"),
        #     }

    def start(self, state, queue):
        """Run ``consumeImage`` in two child processes and wait for both.

        Args:
            state: Stored as ``self.state`` before the workers are created.
            queue: Passed to both workers as their only argument.
        """
        self.state = state

        consumer1 = Process(target=self.consumeImage, args=(queue,))
        consumer1.start()

        consumer2 = Process(target=self.consumeImage, args=(queue,))
        consumer2.start()

        # Block until both workers exit.
        consumer1.join()
        consumer2.join()

    def consumeImage(self, queue):
        """Build the identifier objects, then process ``queue``.

        Called as the target of the processes created in ``start``, so the
        identifier objects are created by the worker itself. The remainder of
        the method is elided in the original post.

        Args:
            queue: Queue handed over by ``start``.
        """
        self.appDetail = {
            "mobile": t2identify.identifyImage("mobile"),
        }
        if self._layer == "screen":
            self.layer = t2identify.identifyImage("apps")
        elif self._layer == "detail":
            self.layer = "detail"
        ...
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文