如何用多个图像文件进行压力测试烧瓶应用程序

发布于 2025-02-07 07:20:28 字数 981 浏览 1 评论 0原文

我有一个执行对象检测的烧瓶应用程序。我可以通过使用此方法通过卷曲来获得所需的输出:

curl -X POST -F images=@images/dog.jpeg http://localhost:8000/detections

我希望使用蝗虫对其进行压力测试。我当前的方法是这样的:

class UserBehavior(TaskSet):
    def _get_image_part(self, file_path, file_content_type='image/jpeg'):
        file_name = os.path.basename(file_path)
        file_content = open(file_path, 'rb')
        return file_name, file_content, file_content_type

    @task
    def detections(self):
        url = '/detections'
        
        files = {
            "images": self._get_image_part("images/dog.jpeg"),
        }
        response = self.client.post(url, files=files, verify=False)
        print(response.text)

class MyLocust(HttpUser):
    tasks = [UserBehavior]
    wait_time = between(0.1, 0.2)

如何为相同的测试图像加载列表(数组),以便在提出的每一个请求中,从目录中上传了一个随机选择的图像?

此外,当触发小负载(2-3个用< 5rps的用户)时,上述片段正常工作,但是在更高的请求或并发用户时,应用程序会引发错误(最常见的是FilenotFoundError)。是合理的吗?

提前致谢

I have a flask app which performs object detection. I'm able to get the desired output via curl by using this:

curl -X POST -F images=@images/dog.jpeg http://localhost:8000/detections

I wish to stress test it using locust. My current approach is something like this:

class UserBehavior(TaskSet):
    def _get_image_part(self, file_path, file_content_type='image/jpeg'):
        file_name = os.path.basename(file_path)
        file_content = open(file_path, 'rb')
        return file_name, file_content, file_content_type

    @task
    def detections(self):
        url = '/detections'
        
        files = {
            "images": self._get_image_part("images/dog.jpeg"),
        }
        response = self.client.post(url, files=files, verify=False)
        print(response.text)

class MyLocust(HttpUser):
    tasks = [UserBehavior]
    wait_time = between(0.1, 0.2)

How do I load a list (array) of test images for the same, so that with every request made, a randomly selected image from a directory is uploaded?

Additionally, the above snippet works fine when a small load is triggered (2-3 users with <5rps) but on higher requests or concurrent users, the application throws error (the most common one being FileNotFoundError). Is it justified?

Thanks in advance

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

寄居者 2025-02-14 07:20:28

将图像加载到列表中(作为全局或类级变量),然后在任务中随机选择一个。

您要避免的是每次都从磁盘中重新加载图像,因为确实没有必要。

Load the images into a list (as a global or class level variable), then just pick one randomly in the task.

What you want to avoid is reloading the image from disk every time, because there is really no need.

佞臣 2025-02-14 07:20:28

基于 cyberwiz的答案,我通过在目录中创建图像列表,然后从此列表中随机选择图像来解决它每个请求。

这是我的实施:

def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.image_list = os.listdir('images')

@task
def detections(self):
    url = '/detections'
    image_name = random.choice(self.image_list)
    
    files = {
        "images": self._get_image_part(os.path.join(os.getcwd(), 'images', image_name))
    }
    response = self.client.post(url, files=files, verify=False)
    print(response.text)

Based on Cyberwiz's answer, I solved it by creating a list of images in a directory and then randomly selecting an image from this list for every request.

Here's my implementation:

def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.image_list = os.listdir('images')

@task
def detections(self):
    url = '/detections'
    image_name = random.choice(self.image_list)
    
    files = {
        "images": self._get_image_part(os.path.join(os.getcwd(), 'images', image_name))
    }
    response = self.client.post(url, files=files, verify=False)
    print(response.text)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文