使用全局时,Python运行threadpoolexecutor crosswise

发布于 2025-02-01 19:55:10 字数 1569 浏览 2 评论 0原文

在我的Python应用程序中,我使用芹菜处理长期运行的任务(其中许多)。在我的一项任务中,我有以下代码将段上传到S3存储,请一般可以看一下,

    if not debug_level['DONT_SEND']:
        count_lock = threading.Lock()
        obj_count = 0

        def __upload(object_path_pair):
            def percentage(part, whole):
                percentage = 100 * int(part) / int(whole)
                return str(percentage) + "%"
            global obj_count
            if obj_count > 0:
                print(f'Uploading segment #{obj_count} to CDN.')
            libera_resource.upload_file(*object_path_pair)
            sleep(random.uniform(1, 5))
            with count_lock:
                if obj_count > 0:
                    print(f' Segment {obj_count} of {len(segment_upload_list)} ({percentage(obj_count, whole=len(segment_upload_list))}) uploaded successfully.')
                obj_count += 1

        def upload_segments(segment_upload_list):
            global obj_count
            obj_count = 0
            with ThreadPoolExecutor(max_workers=50) as executor:
                executor.map(__upload, segment_upload_list)
            print('\n!!! All Segments Uploaded !!!')

        upload_segments(segment_upload_list)

但是一旦任务运行两次,我会从我的打印语句中收到奇怪的消息

“ print(f'segment {obj_count} ({百分比(obj_count,thoter = len(segment_upload_list))})上传 成功。')。

成功的 我会回来(如果它运行的时间更多,则同时运行) “ 100(140%)的第233段成功上传。”

如您所见,打印说明没有多大意义,如果任务在后台运行两次,为什么要像这样打印它? 这是由于我设定的全球人数吗?如果是这样,这里可能有什么解决方法? 我目前唯一可以想象的是,任务调用1和在运行时呼叫2使用了全局计数,然后导致异常计数,这再次导致了这种丑陋的输出。

提前致谢

At my python application I use celery to process long running tasks (many of them). Within one of my tasks I have the following code to upload segments to an S3 Storage, please have a look

    if not debug_level['DONT_SEND']:
        count_lock = threading.Lock()
        obj_count = 0

        def __upload(object_path_pair):
            def percentage(part, whole):
                percentage = 100 * int(part) / int(whole)
                return str(percentage) + "%"
            global obj_count
            if obj_count > 0:
                print(f'Uploading segment #{obj_count} to CDN.')
            libera_resource.upload_file(*object_path_pair)
            sleep(random.uniform(1, 5))
            with count_lock:
                if obj_count > 0:
                    print(f' Segment {obj_count} of {len(segment_upload_list)} ({percentage(obj_count, whole=len(segment_upload_list))}) uploaded successfully.')
                obj_count += 1

        def upload_segments(segment_upload_list):
            global obj_count
            obj_count = 0
            with ThreadPoolExecutor(max_workers=50) as executor:
                executor.map(__upload, segment_upload_list)
            print('\n!!! All Segments Uploaded !!!')

        upload_segments(segment_upload_list)

In general this is working fine but as soon as the task runs twice at the same time I get strange messages from my print statement

"print(f' Segment {obj_count} of {len(segment_upload_list)}
({percentage(obj_count, whole=len(segment_upload_list))}) uploaded
successfully.')".

Instead of printing "Segment 34 of 100 (34%) uploaded successfully."
I'll get back (if it runs more then once at the same time)
"Segment 233 of 100 (140%) uploaded successfully."

As you can see the print statement does not make much sense, buy why does it gets printed like that if the task runs twice in the background?
Is this due to the global count I set? And if so, what could be a possible workaround here?
The only thing I can currently imagin is that the global count is used by task call 1 and call 2 at runtime, this then results in a abnormal count which again leads to this ugly output.

Thanks in advance

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

吹泡泡o 2025-02-08 19:55:10

如果您正在运行多个上传线程,则应该为每个线程提供一个计数器。

一种可能的解决方案是使用 thread noreferrer“> thread-local data 对于计数。

编辑:
线程本地数据的一个重要限制是,只能通过其自己的线程才能访问

因此,如果您需要来自多个线程的数据, 另一个线程可访问,则必须创建一个线程特定数据的全局字典。
有关此词典的一些观点:

  • 使用线程的esident作为字典的键。或者,只要您将其设置为描述性,name
  • 每个线程都应 更新其自己的数据!
  • 每个线程都可以读取来自其他线程的数据。

这应该主要是 阻止字典被拧紧。
但是,在更新字典时,线程会被中断有很小的机会。
那将使数据处于不一致的状态。

因此,您可能应该通过lock序列化对字典的访问。

edit2:

线程的小示例,该线程在锁定锁定时会更新全局数据结构。

def target(args):
    '''Function to run in a thread.'''
    tid = threading.get_ident()
    running = True
    while running:
        # do stuff...
        # Update the global data structure.
        data_modify_lock.acquire()
        tread_data[tid]['obj_count'] = ...
        # et cetera
        data_modify_lock.release()


data_modify_lock = threading.Lock()
thread_data = {
   2: {'obj_count': 6, 'segment_list': [...]},
   3: {'obj_count': 2, 'segment_list': [...]}
}

If you are running more than one upload thread, you should have a counter for each thread.

One possible solution is to use thread-local data for the counts.

Edit:
One important limitation of thread-local data is that it can only be accessed by its own thread.

So if you need data from more than one thread, accessible by another thread you will have to create a global dictionary of thread specific data.
Some points regarding this dictionary:

  • Use a thread's ident as the key for the dictionary. Alternatively the name, provided you have set it to something descriptive.
  • Every thread should only update it's own data!
  • Every thread may read the data from other threads.

That should mostly prevent the dictionary being screwed up.
There is however a small chance of a thread being interrupted while it is updating the dictionary.
That would leave the data in an inconsistent state.

Therefore you should probably serialize access to the dictionary through a Lock, just to be sure.

Edit2:

Small example of a thread that updates a global data structure when it holds a lock.

def target(args):
    '''Function to run in a thread.'''
    tid = threading.get_ident()
    running = True
    while running:
        # do stuff...
        # Update the global data structure.
        data_modify_lock.acquire()
        tread_data[tid]['obj_count'] = ...
        # et cetera
        data_modify_lock.release()


data_modify_lock = threading.Lock()
thread_data = {
   2: {'obj_count': 6, 'segment_list': [...]},
   3: {'obj_count': 2, 'segment_list': [...]}
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文