Using multiprocessing with a dictionary that needs locking
I am trying to use Python's multiprocessing library to speed up some code I have. I have a dictionary whose values need to be updated based on the result of a loop. The current code looks like this:
```python
import tqdm

# RawNews and Topic are the project's Django models.


def get_topic_count():
    topics_to_counts = {}
    for news in tqdm.tqdm(RawNews.objects.all().iterator()):
        for topic in Topic.objects.filter(is_active=True):
            if topic.name not in topics_to_counts.keys():
                topics_to_counts[topic.name] = 0
            if topic.name.lower() in news.content.lower():
                topics_to_counts[topic.name] += 1
    for key, value in topics_to_counts.items():
        print(f"{key}: {value}")
```
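Separately from multiprocessing, this loop re-runs `Topic.objects.filter(is_active=True)` for every article, so the database is queried once per news item, and every topic name is lower-cased again each time. A minimal sketch of the same counting logic pulled into a plain function over strings, assuming the active topic names can be fetched once up front (`count_topics` is a hypothetical helper, not part of the original code):

```python
from collections import Counter
from typing import Iterable, List


def count_topics(contents: Iterable[str], topic_names: List[str]) -> Counter:
    """Count how many texts mention each topic (case-insensitive substring match)."""
    # Lower-case every topic name once, not once per article.
    lowered = [(name, name.lower()) for name in topic_names]
    # Every topic starts at 0, as in the original topics_to_counts dict.
    counts = Counter({name: 0 for name in topic_names})
    for content in contents:
        text = content.lower()
        for name, needle in lowered:
            if needle in text:
                counts[name] += 1
    return counts


# Possible Django call (untested assumption; the topic query now runs only once):
# count_topics(
#     (news.content for news in RawNews.objects.all().iterator()),
#     list(Topic.objects.filter(is_active=True).values_list("name", flat=True)),
# )
```

Because the function only sees strings, it can also be handed to worker processes without pickling model instances.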
I believe the worker function should look like this:
```python
def get_topic_count_worker(news, topics_to_counts, lock):
    for topic in Topic.objects.filter(is_active=True):
        if topic.name not in topics_to_counts.keys():
            lock.acquire()
            topics_to_counts[topic.name] = 0
            lock.release()
        if topic.name.lower() in news.content.lower():
            lock.acquire()
            topics_to_counts[topic.name] += 1
            lock.release()
```
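As written, this worker cannot update the parent's dictionary: `starmap` pickles each argument tuple, so every worker process increments its own copy of `topics_to_counts`, and a plain `multiprocessing.Lock` passed that way raises `RuntimeError: Lock objects should only be shared between processes through inheritance`. If shared state is really wanted, one option is a `multiprocessing.Manager`, whose `dict()` and `Lock()` proxies can be passed to pool workers. A minimal sketch, with plain strings standing in for `RawNews.content` and `Topic.name` (`count_worker` and `count_with_manager` are hypothetical names):

```python
import multiprocessing


def count_worker(content, topic_names, shared_counts, lock):
    """Add 1 to shared_counts[name] for every topic mentioned in one text."""
    text = content.lower()
    for name in topic_names:
        if name.lower() in text:
            # get-then-set on a shared dict is not atomic, so hold the lock around it.
            with lock:
                shared_counts[name] = shared_counts.get(name, 0) + 1


def count_with_manager(contents, topic_names, processes=None):
    """Count topic mentions using a Manager dict shared by all pool workers."""
    with multiprocessing.Manager() as manager:
        # Proxies (unlike a plain dict or multiprocessing.Lock) can be sent to pool workers.
        shared_counts = manager.dict({name: 0 for name in topic_names})
        lock = manager.Lock()
        args = [(content, topic_names, shared_counts, lock) for content in contents]
        with multiprocessing.Pool(processes) as pool:
            pool.starmap(count_worker, args)
        # Copy the results into a plain dict before the manager process shuts down.
        return shared_counts.copy()


if __name__ == "__main__":
    print(count_with_manager(["Python is great", "python and rust"], ["Python", "Rust"], processes=2))
```

Every increment here is a round trip to the manager process, so this is likely to be slow for millions of records; it mainly shows what correctly shared state would require.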
However, I'm having trouble writing the main function. Here's what I have so far, but the process keeps getting killed, which I believe means it's using too much memory.
```python
import multiprocessing

import tqdm


def get_topic_count_master():
    topics_to_counts = {}
    raw_news = RawNews.objects.all().iterator()
    lock = multiprocessing.Lock()
    args = []
    for news in tqdm.tqdm(raw_news):
        args.append((news, topics_to_counts, lock))
    with multiprocessing.Pool() as p:
        p.starmap(get_topic_count_worker, args)
    for key, value in topics_to_counts.items():
        print(f"{key}: {value}")
```
Any guidance here would be appreciated!
Update: There are about 1.6 million records that it needs to go through. How would I chunk this properly?
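The master function above builds `args` with one tuple per record, so all of the roughly 1.6 million model instances sit in memory before the pool starts. One way to avoid both that list and the shared dictionary is to stream the records in fixed-size batches and have each worker return its own partial count, which the parent sums. A minimal sketch, assuming workers only need each article's text and the list of active topic names; `batched`, `count_batch`, `count_in_batches`, and the batch size of 1000 are hypothetical choices:

```python
import multiprocessing
from collections import Counter
from functools import partial
from itertools import islice


def batched(iterable, size):
    """Yield lists of up to `size` items, pulling from the iterable one batch at a time."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def count_batch(contents, topic_names):
    """Return a Counter of how many texts in one batch mention each topic."""
    lowered = [(name, name.lower()) for name in topic_names]
    counts = Counter()
    for content in contents:
        text = content.lower()
        for name, needle in lowered:
            if needle in text:
                counts[name] += 1
    return counts


def count_in_batches(contents, topic_names, batch_size=1000, processes=None):
    """Send batches to a pool and add up the per-batch Counters in the parent."""
    totals = Counter({name: 0 for name in topic_names})
    worker = partial(count_batch, topic_names=topic_names)
    with multiprocessing.Pool(processes) as pool:
        # Batches come from a generator, so no list of every record is built first.
        for partial_counts in pool.imap_unordered(worker, batched(contents, batch_size)):
            totals.update(partial_counts)
    return dict(totals)


if __name__ == "__main__":
    print(count_in_batches(["Python is great", "python and rust"], ["Python", "Rust"], batch_size=1))
```

In Django the texts might be streamed with something like `RawNews.objects.values_list("content", flat=True).iterator()` (untested here). Because workers receive plain strings, they never need to query the database themselves, and no lock is needed since each worker only touches its own `Counter`.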
Update 2: Here's some sample data:
Update 3: Here is the relation in the RawNews table:

```python
topics = models.ManyToManyField('Topic', blank=True)
```
Answer: The problem was related to a restriction on the database. Parallelizing the work does speed it up, but the database allows only 100 simultaneous connections. Either raise that connection limit or cap the number of concurrent workers at fewer than 100.
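With `multiprocessing.Pool`, the number of concurrent workers is set by the `processes` argument, so one way to follow this advice is to derive it from the database limit. A minimal sketch; the limit of 100 comes from the answer, while `RESERVED_CONNECTIONS` and `safe_pool_size` are hypothetical:

```python
import os

# Limit reported in the answer; check the real value on the database server.
DB_CONNECTION_LIMIT = 100
# Hypothetical headroom for the parent process and any other clients.
RESERVED_CONNECTIONS = 10


def safe_pool_size(connection_limit=DB_CONNECTION_LIMIT, reserved=RESERVED_CONNECTIONS):
    """Pick a worker count below both the free connection budget and the CPU count."""
    available = connection_limit - reserved
    cpus = os.cpu_count() or 1
    # Never return fewer than one worker, even if the budget is exhausted.
    return max(1, min(cpus, available))
```

The result would then be passed as `multiprocessing.Pool(processes=safe_pool_size())`. If the workers themselves use the Django ORM, each process opens its own connection; a commonly used precaution is to call `django.db.connections.close_all()` in the parent before creating the pool so forked workers do not reuse the parent's connection.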