使用 sqlalchemy 检查行是否存在并返回 id 很慢
所有,
我正在读取 csv 文件并使用 sqlalchemy 将数据添加到 MySQL 数据库。其中一张表是地址表,它只应该保存唯一的地址。这些地址和另一个“语句”表之间存在关系,该表具有地址 id 的外键字段。
因此,对于数据文件中的每一行,我创建一个新语句 obj,然后获取关联地址的 id。如果地址已经存在,则返回该 ID。否则,我创建一个新地址 obj 并返回该 id。这是使用下面的代码完成的,改编自 this SO问题。
def get_or_create(self, model, rec):
instance = self.session.query(model).filter_by(**dict(filter(lambda (x,y): x in model.__dict__.keys(), rec.iteritems()))).first()
if instance:
return instance
else:
instance = model(rec)
return instance
我在 id 字段中使用 GUID,它是地址表主键的一部分:
class address(Base):
__tablename__ = 'address'
id = id_column()
name = Column(String(75), primary_key=True)
Address_Line_One = Column(String(50), primary_key=True)
Address_Line_Two = Column(String(50), primary_key=True)
Address_Line_Three = Column(String(50), primary_key=True)
Address_Line_Four = Column(String(50), primary_key=True)
id_column()
来自 此处,尽管它已转换为CHAR(32)
由于其他地方的限制。最后,这里有一个片段:
currStatement = statements(rec, id=currGUID)
currStatement.address = self.get_or_create(address, rec)
这一切都很好,只是速度很慢。对于在一个事务中插入约 65,000 条语句,我发现在干净的测试数据库上插入时间为 1.5 小时。实时观察插入显示它很快达到约 10,000 行,然后插入速度开始下降。
我可以做什么来加快插入时间?
编辑:
经过进一步测试,我发现插入时间缓慢的部分原因是每个对象都是单独插入的。因此,我有大约 65,000 行,每一行都成为几个 sqlalchemy 对象,单独插入。使用 sqlalchemy 0.7,如何批量插入对象?
All,
I'm reading a csv file and adding the data to a MySQL DB with sqlalchemy. One of the tables is the address table, which is only supposed to hold unique addresses. There is a relationship between these addresses and another table of "statements" which have a foreign key field of the address id.
So, for each row in my data file, I create a new statement obj, then get the id for the associated address. If the address already exists, that id is returned. Otherwise, I create a new address obj and return that id. This is done using the code below, adapted from this SO question.
def get_or_create(self, model, rec):
instance = self.session.query(model).filter_by(**dict(filter(lambda (x,y): x in model.__dict__.keys(), rec.iteritems()))).first()
if instance:
return instance
else:
instance = model(rec)
return instance
I'm using GUID's for my id field, and it is part of the primary key for the address table:
class address(Base):
__tablename__ = 'address'
id = id_column()
name = Column(String(75), primary_key=True)
Address_Line_One = Column(String(50), primary_key=True)
Address_Line_Two = Column(String(50), primary_key=True)
Address_Line_Three = Column(String(50), primary_key=True)
Address_Line_Four = Column(String(50), primary_key=True)
The id_column()
comes from here, though it has been converted to CHAR(32)
due to limitations elsewhere. Finally, there is the snippet here:
currStatement = statements(rec, id=currGUID)
currStatement.address = self.get_or_create(address, rec)
This all works fine except it is very slow. For ~65,000 statements inserted in one transaction, I see 1.5 hr insert time on a clean test DB. Watching the insert in realtime shows it quickly get to ~10,000 rows, then the insert speed starts falling off.
What can I do to speed up this insert time?
Edit:
After further testing, I've found that the slow insert time is partially because each object is inserted individually. So, I have ~65,000 rows, each of which becomes several sqlalchemy objects, inserted individually. With sqlalchemy 0.7, how can I bulk insert my objects?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
好吧!
所以答案是我单独插入每一行,并为每个地址检查往返数据库。地址检查是最糟糕的部分,因为它的速度呈指数级下降。我计算出插入原始数据(1.5 小时),然后再次插入相同的数据,大约需要 9 小时!
因此,这个答案将回顾我为转换为批量插入语句所做的事情,以及一些需要注意的事情。
ORM 很棒,但意识到它与批量插入并不能很好地配合。批量插入需要在会话上使用较低级别的
execute
语句。它们不将 ORM 对象作为输入,而是将字典列表和insert
对象作为输入。因此,如果您将充满行的 csv 文件转换为 ORM 对象,则需要不将它们添加到当前会话,而是将它们转换为字典以供以后使用。asdict 方法来自此处。这将为您提供所创建的 ORM 对象中的列的字典。它们永远不会被添加到会话中,并且很快就会从内存中消失。
如果你已经设置了 ORM 关系:
确保 backref 中的级联为空!否则,将关系中的对象插入会话将 级联通过其余对象。当您稍后尝试批量插入值时,它们将被视为重复项而被拒绝......如果您幸运的话。
这很重要,因为部分要求是获取有效地址的 address_id(如果存在),如果不存在则添加该地址。由于查询往返速度非常慢,我将
get_or_create
更改为:使用
get
导致 sqlalchemy 首先检查会话,从而防止跨网络传输。但是,只有在会话中添加新地址时它才有效!还记得关系吗?这被级联到声明的插入中。另外,如果您没有flush()
或有autoflush=True
,则get
无法看到新添加的对象。创建会话时,保留您的对象!
self.session = sessionmaker(autoflush=False, expire_on_commit=False)
如果您不包含
expire_on_commit=False
那么您将丢失地址,并再次开始往返。现在我们已经获得了要插入的 ORM 对象的字典列表。但我们还需要一个插入对象。
埋藏在文档中,它似乎可以使用
classname.__table__
作为 插入。因此,在会话中,使用 ORM 类获取表来获取插入对象,并使用字典列表运行执行。事后别忘了承诺!这将允许您成功地将批量插入和 ORM 与关系以及查询 sqlalchemy 中的唯一条目混合在一起。只要注意内存不足即可。我必须一次批量插入
~30,000
记录,否则py2.7(32bit)
在使用大约2G
时会崩溃。Alright!
So the answer is that I was individually inserting each row, and round tripping to the DB for each address check. The address check was the worst part, since it got exponentially slower. I calculated that inserting the original data (1.5 hrs), and then inserting the same data again, would take ~9 hrs!
So this answer will go over what I did to convert to bulk insert statements, as well as some things to watch out for.
ORM is great, but realize it doesn't exactly mesh well with bulk inserts. Bulk inserts require using the lower level
execute
statements on the session. These don't take ORM objects as inputs, but a list of dictionaries and aninsert
object. So if your converting a csv file full of rows into ORM objects, you need to NOT add them to the current session, but instead convert them to dictionaries for later.The asdict method comes from here. That gets you dictionaries of the columns in the ORM objects created. They never get added to the session, and drop out of memory shortly thereafter.
If you have set up an ORM relationship:
Make sure cascade is blank in the backref! Otherwise, inserting an object in the relationship into the session will cascade through the rest of the objects. When you try to bulk insert your values later, they will be rejected as duplicates...if you're lucky.
This is important because part of the requirements was getting the address_id for a valid address if it existed, and adding the address if it did not. Since the query round tripping was so slow, I changed
get_or_create
to:Using
get
causes sqlalchemy to check the session first, preventing trips across the network. But, it only works if new addresses are added to the session! Remember the relationship? This was cascading into inserts of the statements. Also, if you don'tflush()
or haveautoflush=True
thenget
cannot see the newly added objects.When you create the session, persist your objects!
self.session = sessionmaker(autoflush=False, expire_on_commit=False)
If you don't include
expire_on_commit=False
then you lose your addresses, and start round-tripping again.Now we've got a list of dictionaries for the ORM objects to insert. But we also need an insert object.
Buried in the docs, it seems that one can use
classname.__table__
for the necessary table object, required by insert. So on the session, using the ORM class to get the table to get the insert object, run an execute with the list of dictionaries. Don't forget to commit afterwards!This will allow you to successfully mix bulk inserting and ORM with relationships and querying for unique entries in sqlalchemy. Just watch out for running out of memory. I had to bulk insert
~30,000
records at a time, otherwisepy2.7(32bit)
would crash at around2G
used.