How to use simple sqlalchemy calls while using thread/multiprocessing
Problem
I am writing a program that reads a set of documents from a corpus (each line is a document). Each document is processed using a function processdocument, assigned a unique ID, and then written to a database. Ideally, we want to do this using several processes. The logic is as follows:
- The main routine creates a new database and sets up some tables.
- The main routine sets up a group of processes/threads that will run a worker function.
- The main routine starts all the processes.
- The main routine reads the corpus, adding documents to a queue.
- Each process's worker function loops, reading a document from the queue, extracting the information from it using processdocument, and writing the information to a new entry in a table in the database.
- The worker loop breaks once the queue is empty and an appropriate flag has been set by the main routine (once there are no more documents to add to the queue).
Question
I'm relatively new to sqlalchemy (and databases in general). I think the code used for setting up the database in the main routine works fine, from what I can tell. Where I'm stuck is that I'm not sure exactly what to put into the worker functions so that each process can write to the database without clashing with the others.
There's nothing particularly complicated going on: each process gets a unique value to assign to an entry from a multiprocessing.Value object, protected by a Lock. I'm just not sure what I should be passing to the worker function (aside from the queue), if anything. Do I pass the sqlalchemy.Engine instance I created in the main routine? The MetaData instance? Do I create a new engine for each process? Is there some other canonical way of doing this? Is there something special I need to keep in mind?
Additional Comments
I'm well aware I could just skip the multiprocessing and do this in a single process, but I will have to write code that has several processes reading from the database later on, so I might as well figure out how to do this now.
Thanks in advance for your help!
Answer
The MetaData and its collection of Table objects should be considered a fixed, immutable structure of your application, not unlike your function and class definitions. As you know, when forking a child process, all of the module-level structures of your application remain present across the process boundary, and table definitions are usually in this category.
The Engine, however, refers to a pool of DBAPI connections, which are usually TCP/IP connections and sometimes file handles. The DBAPI connections themselves are generally not portable across a subprocess boundary, so you would want to either create a new Engine for each subprocess, or use a non-pooled Engine, which means you're using NullPool.
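Concretely, that could look like either of the following (just a sketch; the sqlite URL and the worker signature are examples, not part of the question's actual code):

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

# Option 1: build the Engine inside each worker, after the process has
# started, so no pooled DBAPI connections cross the fork boundary.
def worker(doc_queue):
    engine = create_engine("sqlite:///corpus.db")  # example URL
    with engine.connect() as conn:
        pass  # do the inserts through conn here

# Option 2: a single module-level Engine, but with NullPool, so every
# checkout opens a fresh DBAPI connection and nothing is held in a pool.
engine = create_engine("sqlite:///corpus.db", poolclass=NullPool)
```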
You also should not be doing any kind of association of MetaData with Engine, that is, "bound" metadata. This practice, while prominent in various outdated tutorials and blog posts, is really not a general purpose thing and I try to de-emphasize this way of working as much as possible.
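With an unbound MetaData, the Engine (or a Connection) is passed explicitly whenever something needs to execute; a small sketch using a hypothetical documents table:

```python
from sqlalchemy import MetaData, Table, Column, Integer, Text, create_engine

metadata = MetaData()            # no bind= argument anywhere

documents = Table(
    "documents", metadata,
    Column("id", Integer, primary_key=True),
    Column("body", Text),
)

engine = create_engine("sqlite:///corpus.db")
metadata.create_all(engine)      # the Engine is passed explicitly

with engine.begin() as conn:     # explicit connection and transaction
    conn.execute(documents.insert(), {"id": 1, "body": "some text"})
```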
If you're using the ORM, a similar dichotomy of "program structures/active work" exists, where your mapped classes of course are shared between all subprocesses, but you definitely want Session objects to be local to a particular subprocess - these correspond to an actual DBAPI connection as well as plenty of other mutable state which is best kept local to an operation.
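In other words, with the ORM the mapped classes stay at module level, while the Engine and Session are built inside each worker; a rough sketch (Document stands in for whatever mapped class is involved):

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

def worker(doc_queue):
    # created inside the subprocess, so the underlying DBAPI connections
    # never cross the fork boundary
    engine = create_engine("sqlite:///corpus.db")
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        # pull documents off the queue, build Document objects,
        # session.add() them, then session.commit()
        pass
    finally:
        session.close()
```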