Efficiently pushing thousands of records to MongoDB
We have a database containing ~20 million records with an indexed field order_id.
Every day, after each hour, we receive an incremental update that starts at ~2K records and grows to ~50K; some of the records are new, while others are updates to previous records.
To process this data, I have created a pipeline using Airflow. Before being pushed to MongoDB, the data is available as a Pandas DataFrame, which I use to process and clean it.
For now, I'm using the code below to upsert the data, but I'm not sure whether it's a correct or efficient solution.
from pymongo import ReplaceOne

col = MongoDB.get_collection("collection", mongo_db="database")

# My processed data
df
df.reset_index()

bulk_data = [
    ReplaceOne({"order_id": row["order_id"]}, dict(row), upsert=True)
    for index, row in df.iterrows()
]
col.bulk_write(bulk_data)
So what might the other options be? Is this way of doing the task logical, or am I doing it wrong?
Comments (1)
You have a mostly efficient technique; your query field is indexed and you are using bulk operations. I would be surprised if this is running slowly, even on 50K records.
If you want to squeeze out the last drop of performance, this approach may be quicker. It deletes all the incoming records and re-inserts them; generally this is more performant than using ReplaceOne() with upsert=True. Also, the to_dict() method in pandas removes a step compared to using iterrows(). Finally, you can turn the bulk ordered option to False, which again is a performance gain, as you presumably don't care about the order of the inserts.
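A minimal sketch of that delete-then-insert pattern, assuming a local MongoDB and a small sample DataFrame; in the real pipeline, col and df would be the collection handle and the processed DataFrame from the question.

import pandas as pd
from pymongo import MongoClient

# Hypothetical connection and sample data; replace with your own collection and DataFrame.
col = MongoClient("mongodb://localhost:27017")["database"]["collection"]
df = pd.DataFrame([{"order_id": 1, "status": "new"}, {"order_id": 2, "status": "shipped"}])

# to_dict("records") yields one plain dict per row, avoiding the iterrows() loop.
records = df.to_dict("records")
order_ids = [r["order_id"] for r in records]

# Delete any existing documents for this batch, then insert fresh copies.
col.delete_many({"order_id": {"$in": order_ids}})
col.insert_many(records, ordered=False)  # ordered=False: inserts need not be applied in order

If you prefer to keep the upsert semantics instead, the same flag can be passed to your existing call, i.e. col.bulk_write(bulk_data, ordered=False).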