Python ThreadPoolExecutor counts crosswise when using a global
In my Python application I use Celery to process long-running tasks (many of them). Within one of my tasks I have the following code to upload segments to an S3 storage, please have a look:
import random
import threading
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# debug_level, libera_resource and segment_upload_list are defined
# elsewhere in the task.
if not debug_level['DONT_SEND']:
    count_lock = threading.Lock()
    obj_count = 0

    def __upload(object_path_pair):
        def percentage(part, whole):
            percentage = 100 * int(part) / int(whole)
            return str(percentage) + "%"
        global obj_count
        if obj_count > 0:
            print(f'Uploading segment #{obj_count} to CDN.')
        libera_resource.upload_file(*object_path_pair)
        sleep(random.uniform(1, 5))
        with count_lock:
            if obj_count > 0:
                print(f' Segment {obj_count} of {len(segment_upload_list)} ({percentage(obj_count, whole=len(segment_upload_list))}) uploaded successfully.')
            obj_count += 1

    def upload_segments(segment_upload_list):
        global obj_count
        obj_count = 0
        with ThreadPoolExecutor(max_workers=50) as executor:
            executor.map(__upload, segment_upload_list)
        print('\n!!! All Segments Uploaded !!!')

    upload_segments(segment_upload_list)
In general this works fine, but as soon as the task runs twice at the same time I get strange messages from my print statement
"print(f' Segment {obj_count} of {len(segment_upload_list)} ({percentage(obj_count, whole=len(segment_upload_list))}) uploaded successfully.')".
Instead of printing "Segment 34 of 100 (34%) uploaded successfully."
I'll get back (if it runs more than once at the same time)
"Segment 233 of 100 (140%) uploaded successfully."
As you can see the print statement does not make much sense, but why does it get printed like that if the task runs twice in the background?
Is this due to the global count I set? And if so, what could be a possible workaround here?
The only thing I can currently imagine is that the global count is shared by task call 1 and call 2 at runtime; this then results in an abnormal count, which again leads to this ugly output.
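To illustrate my suspicion, here is a minimal sketch (hypothetical names, not my actual task code) of two overlapping runs sharing one module-level counter:

import threading
from time import sleep

count_lock = threading.Lock()
obj_count = 0

def upload_segments(segment_upload_list):
    global obj_count
    obj_count = 0                       # the second run resets the first run's counter
    for _ in segment_upload_list:
        sleep(0.001)                    # stand-in for the actual upload
        with count_lock:
            obj_count += 1              # both runs increment the same global
            print(f'Segment {obj_count} of {len(segment_upload_list)}')

# two "task calls" running at the same time, as in the Celery worker
runs = [threading.Thread(target=upload_segments, args=(range(100),))
        for _ in range(2)]
for t in runs:
    t.start()
for t in runs:
    t.join()
# Once the runs overlap, lines like "Segment 143 of 100" appear.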
Thanks in advance
1 Answer
If you are running more than one upload thread, you should have a counter for each thread.
One possible solution is to use thread-local data for the counts.
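For example, a minimal sketch of per-thread counters with threading.local() (the function and attribute names here are illustrative, not taken from the question):

import threading
from concurrent.futures import ThreadPoolExecutor

local_data = threading.local()          # each thread sees its own attributes

def upload(item):
    # initialise the counter the first time this particular thread gets here
    if not hasattr(local_data, 'obj_count'):
        local_data.obj_count = 0
    local_data.obj_count += 1           # no lock needed: invisible to other threads
    print(f'{threading.current_thread().name}: '
          f'{local_data.obj_count} segment(s) uploaded by this thread')

with ThreadPoolExecutor(max_workers=4) as executor:
    executor.map(upload, range(20))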
Edit:
One important limitation of thread-local data is that it can only be accessed by its own thread.
So if you need the data from one thread to be accessible by another thread, you will have to create a global dictionary of thread-specific data.
Some points regarding this dictionary:
- You could use the thread's ident as the key for the dictionary. Alternatively the name, provided you have set it to something descriptive. That should mostly prevent the dictionary being screwed up.
- There is however a small chance of a thread being interrupted while it is updating the dictionary. That would leave the data in an inconsistent state. Therefore you should probably serialize access to the dictionary through a Lock, just to be sure.

Edit 2:
Small example of a thread that updates a global data structure when it holds a lock.
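A sketch along those lines (the worker function and the counts dictionary are illustrative names, assumed rather than taken from the original example):

import threading
from concurrent.futures import ThreadPoolExecutor

counts = {}                             # global dictionary: one entry per thread
counts_lock = threading.Lock()

def worker(item):
    key = threading.get_ident()         # or current_thread().name, if descriptive
    # ... do the actual work for `item` here ...
    with counts_lock:                   # serialize updates so the dict stays consistent
        counts[key] = counts.get(key, 0) + 1

with ThreadPoolExecutor(max_workers=4) as executor:
    executor.map(worker, range(20))

with counts_lock:                       # the main thread reads under the same lock
    print(sum(counts.values()), 'items processed across', len(counts), 'threads')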