pymongo 需要超过 24 小时才能循环 200K 条记录

发布于 2025-01-04 10:11:05 字数 4194 浏览 2 评论 0原文

我正在尝试清理数据库 pagepagearchive 中的两个集合。我注意到在 pagearchive 中创建了新文档,而不是按预期向嵌入文档添加值。因此,本质上,该脚本正在执行的操作是遍历 page 中的每个文档,然后在 pagearchive 中查找该文档的所有副本,并将我想要的数据移动到单个文档中并删除额外的。

问题是 pagearchive 中只有 200K 个文档,并且根据我在底部打印的计数变量,迭代 1000 条记录需要 30 分钟到 60 分钟以上的时间。这非常慢。我见过的重复文档最多为 88 个。但大多数情况下,当我在 uu 上的 pageArchive 中查询时,我会看到 1-2 个重复文档。

mongodb 位于具有 16GB RAM 的单实例 64 位机器上。 在 pageArchive 集合上迭代的 uu 键是一个字符串。我确保该字段上有一个索引 db.pagearchive.ensureIndex({uu:1}) 我还做了一个 mongod --repair 为了更好的措施。

我的猜测是问题出在我草率的 python 代码(不太擅长)或者可能是我缺少 mongodb 所必需的东西。为什么速度这么慢或者我可以做些什么来显着加快速度?

我想也许因为 uu 字段是一个字符串,它导致了瓶颈,但这是文档中的唯一属性(或者一旦我清理这个集合)。最重要的是,当我停止该进程并重新启动它时,它会加速到每秒约 1000 条记录。直到它再次开始在集合中查找重复项,然后它再次变慢(每 10-20 分钟删除大约 100 条记录)

from pymongo import Connection
import datetime


def match_dates(old, new):
    if old['coll_at'].month == new['coll_at'].month and old['coll_at'].day == new['coll_at'].day and old['coll_at'].year == new['coll_at'].year:
        return False

    return new

connection = Connection('dashboard.dev')


db = connection['mydb']

pageArchive = db['pagearchive']
pages = db['page']

count = 0
for page in pages.find(timeout=False):

    archive_keep = None
    ids_to_delete = []
    for archive in pageArchive.find({"uu" : page['uu']}):

        if archive_keep == None:
            #this is the first record we found, so we will store data from duplicate records with this one; delete the rest
            archive_keep = archive
        else:
            for attr in archive_keep.keys():
                #make sure we are dealing with an embedded document field
                if isinstance(archive_keep[attr], basestring) or attr == 'updated_at':
                    continue
                else:
                    try:
                        if len(archive_keep[attr]) == 0:
                            continue
                    except TypeError:
                        continue
                    try:
                        #We've got our first embedded doc from a property to compare against
                        for obj in archive_keep[attr]:
                            if archive['_id'] not in ids_to_delete:
                                ids_to_delete.append(archive['_id'])
                            #loop through secondary archive doc (comparing against the archive keep)
                            for attr_old in archive.keys():
                                #make sure we are dealing with an embedded document field
                                if isinstance(archive[attr_old], basestring) or attr_old == 'updated_at':
                                    continue
                                else:
                                    try:
                                        #now we know we're dealing with a list, make sure it has data
                                        if len(archive[attr_old]) == 0:
                                            continue
                                    except TypeError:
                                        continue
                                    if attr == attr_old:
                                        #document prop. match; loop through embedded document array and make sure data wasn't collected on the same day
                                        for obj2 in archive[attr_old]:
                                            new_obj = match_dates(obj, obj2)
                                            if new_obj != False:
                                                archive_keep[attr].append(new_obj)
                    except TypeError, te:
                        'not iterable'
        pageArchive.update({
                            '_id':archive_keep['_id']}, 
                           {"$set": archive_keep}, 
                           upsert=False)
        for mongoId in ids_to_delete:
            pageArchive.remove({'_id':mongoId})
        count += 1
        if count % 100 == 0:
            print str(datetime.datetime.now()) + ' ### ' + str(count) 

I have two collections in a db page and pagearchive I am trying to clean up. I noticed that new documents were being created in the pagearchive instead of adding values to embedded documents as intended. So essentially what this script is doing is going through every document in page and then finding all copies of that document in pagearchive and moving data I want into a single document and deleted the extras.

The problem is there is only 200K documents in pagearchive and based on the count variable I am printing at the bottom, it's taking anywhere from 30min to 60+ min to iterate through 1000 records. This is extremely slow. The largest count in duplicate docs I have seen is 88. But for the most part when I query in pageArchive on uu, I see 1-2 duplicate documents.

mongodb is on a single instance 64 bit machine with 16GB of RAM.
The uu key that is being iterating on the pageArchive collection is a string. I made sure there was an index on that field db.pagearchive.ensureIndex({uu:1}) I also did a mongod --repair for good measure.

My guess is the problem is with my sloppy python code (not very good at it) or perhaps something I am missing that is necessary for mongodb. Why is it going so slow or what can I do to speed it up dramatically?

I thought maybe because the uu field is a string it's causing a bottleneck, but that's the unique property in the document (or will be once I clean up this collection). On top of that, when I stop the process and restart it, it speeds up to about 1000 records a second. Until it starts finding duplicates again in the collection, then it goes dog slow again (deleting about 100 records every 10-20 minutes)

from pymongo import Connection
import datetime


def match_dates(old, new):
    if old['coll_at'].month == new['coll_at'].month and old['coll_at'].day == new['coll_at'].day and old['coll_at'].year == new['coll_at'].year:
        return False

    return new

connection = Connection('dashboard.dev')


db = connection['mydb']

pageArchive = db['pagearchive']
pages = db['page']

count = 0
for page in pages.find(timeout=False):

    archive_keep = None
    ids_to_delete = []
    for archive in pageArchive.find({"uu" : page['uu']}):

        if archive_keep == None:
            #this is the first record we found, so we will store data from duplicate records with this one; delete the rest
            archive_keep = archive
        else:
            for attr in archive_keep.keys():
                #make sure we are dealing with an embedded document field
                if isinstance(archive_keep[attr], basestring) or attr == 'updated_at':
                    continue
                else:
                    try:
                        if len(archive_keep[attr]) == 0:
                            continue
                    except TypeError:
                        continue
                    try:
                        #We've got our first embedded doc from a property to compare against
                        for obj in archive_keep[attr]:
                            if archive['_id'] not in ids_to_delete:
                                ids_to_delete.append(archive['_id'])
                            #loop through secondary archive doc (comparing against the archive keep)
                            for attr_old in archive.keys():
                                #make sure we are dealing with an embedded document field
                                if isinstance(archive[attr_old], basestring) or attr_old == 'updated_at':
                                    continue
                                else:
                                    try:
                                        #now we know we're dealing with a list, make sure it has data
                                        if len(archive[attr_old]) == 0:
                                            continue
                                    except TypeError:
                                        continue
                                    if attr == attr_old:
                                        #document prop. match; loop through embedded document array and make sure data wasn't collected on the same day
                                        for obj2 in archive[attr_old]:
                                            new_obj = match_dates(obj, obj2)
                                            if new_obj != False:
                                                archive_keep[attr].append(new_obj)
                    except TypeError, te:
                        'not iterable'
        pageArchive.update({
                            '_id':archive_keep['_id']}, 
                           {"$set": archive_keep}, 
                           upsert=False)
        for mongoId in ids_to_delete:
            pageArchive.remove({'_id':mongoId})
        count += 1
        if count % 100 == 0:
            print str(datetime.datetime.now()) + ' ### ' + str(count) 

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

自此以后,行同陌路 2025-01-11 10:11:05

我将对代码进行以下更改:

  • in match_dates return None 而不是 False 并执行 if new_obj is not None: 它将检查引用,而不调用对象 __ne____nonzero__

  • for page in pages.find(timeout=False): 如果仅使用 uu 键并且页面很大,fields=['uu'] find 参数应该可以加快查询速度。

  • archive_keep == Nonearchive_keep is None

  • archive_keep[attr] 被调用 4 次。保存 keep_obj = archive_keep[attr] 然后使用 keep_obj 会快一点。

  • ids_to_delete = [] 更改为 ids_to_delete = set()。然后 if archive['_id'] not in ids_to_delete: 将为 O(1)

I'd make following changes to code:

  • in match_dates return None instead False and do if new_obj is not None: it will check reference, without calling object __ne__ or __nonzero__.

  • for page in pages.find(timeout=False): If only uu key is used and pages are big, fields=['uu'] parameter to find should speedup queries.

  • archive_keep == None to archive_keep is None

  • archive_keep[attr] is called 4 times. It will be little faster to save keep_obj = archive_keep[attr] and then use keep_obj.

  • change ids_to_delete = [] to ids_to_delete = set(). Then if archive['_id'] not in ids_to_delete: will be O(1)

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文