使用数据库表作为作业队列(又名批处理队列或消息队列)的最佳方法

发布于 2024-07-09 05:03:00 字数 731 浏览 9 评论 0原文

我有一个数据库表,其中约有 50K 行,每行代表一个需要完成的工作。 我有一个程序,可以从数据库中提取作业,完成该作业并将结果放回数据库中。 (这个系统现在正在运行)

现在我想允许多个处理任务来完成工作,但要确保没有任务被完成两次(作为一种性能问题,而不是这会导致其他问题)。 因为访问是通过存储过程进行的,所以我当前的想法是将所述存储过程替换为看起来像这样的东西顺便说一句

update tbl 
set owner = connection_id() 
where available and owner is null limit 1;

select stuff 
from tbl 
where owner = connection_id();

; 工人的任务可能会断开获得工作和提交结果之间的联系。 另外,我不认为数据库会成为瓶颈,除非我把那部分弄乱了(每分钟约 5 个作业)

这有什么问题吗? 有一个更好的方法吗?

注意:“数据库作为 IPC 反模式” 在这里只是稍微合适一点,因为

  1. 我没有进行 IPC(没有进程生成行,它们现在都已经存在),并且
  2. 针对该反模式描述的主要抱怨是,当进程等待消息时,它会导致数据库上不必要的负载(在我的情况下,如果没有消息,一切都可以关闭,因为一切都完成了)

I have a databases table with ~50K rows in it, each row represents a job that need to be done. I have a program that extracts a job from the DB, does the job and puts the result back in the db. (this system is running right now)

Now I want to allow more than one processing task to do jobs but be sure that no task is done twice (as a performance concern not that this will cause other problems). Because the access is by way of a stored procedure, my current though is to replace said stored procedure with something that looks something like this

update tbl 
set owner = connection_id() 
where available and owner is null limit 1;

select stuff 
from tbl 
where owner = connection_id();

BTW; worker's tasks might drop there connection between getting a job and submitting the results. Also, I don't expect the DB to even come close to being the bottle neck unless I mess that part up (~5 jobs per minute)

Are there any issues with this? Is there a better way to do this?

Note: the "Database as an IPC anti-pattern" is only slightly apropos here because

  1. I'm not doing IPC (there is no process generating the rows, they all already exist right now) and
  2. the primary gripe described for that anti-pattern is that it results in unneeded load on the DB as processes wait for messages (in my case, if there are no messages, everything can shutdown as everything is done)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

眼泪淡了忧伤 2024-07-16 05:03:00

在关系数据库系统中实现作业队列的最佳方法是使用 SKIP已锁定

SKIP LOCKED 是一个锁获取选项,适用于读/共享 (FOR SHARE) 或写/独占 (FOR UPDATE) 锁,并且现在得到广泛支持:

  • Oracle 10g 及更高版本
  • PostgreSQL 9.5 及更高版本
  • SQL Server 2005 及更高版本
  • MySQL 8.0 及更高版本

现在,假设我们有以下 post 表:

post table

< code>status 列用作 Enum,其值为:

  • PENDING (0)、
  • APPROVED (1)、
  • 垃圾邮件 (2)。

如果我们有多个并发用户尝试审核 post 记录,我们需要一种方法来协调他们的工作,以避免两个审核者审核同一 post 行。

所以,SKIP LOCKED正是我们所需要的。 如果两个并发用户 Alice 和 Bob 执行以下 SELECT 查询,这些查询独占锁定帖子记录,同时还添加了 SKIP LOCKED 选项:

[Alice]:
SELECT
    p.id AS id1_0_,1
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM
    post p
WHERE
    p.status = 0
ORDER BY
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED
 
[Bob]:                                                                                                                                                                                                              
SELECT
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM
    post p
WHERE
    p.status = 0
ORDER BY
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED

我们可以看到 Alice 可以选择前两个条目,而 Bob 选择接下来 2 条记录。 如果没有SKIP LOCKED,Bob 锁获取请求将被阻塞,直到 Alice 释放前 2 条记录上的锁。

The best way to implement a job queue in a relational database system is to use SKIP LOCKED.

SKIP LOCKED is a lock acquisition option that applies to both read/share (FOR SHARE) or write/exclusive (FOR UPDATE) locks and is widely supported nowadays:

  • Oracle 10g and later
  • PostgreSQL 9.5 and later
  • SQL Server 2005 and later
  • MySQL 8.0 and later

Now, consider we have the following post table:

post table

The status column is used as an Enum, having the values of:

  • PENDING (0),
  • APPROVED (1),
  • SPAM (2).

If we have multiple concurrent users trying to moderate the post records, we need a way to coordinate their efforts to avoid having two moderators review the same post row.

So, SKIP LOCKED is exactly what we need. If two concurrent users, Alice and Bob, execute the following SELECT queries which lock the post records exclusively while also adding the SKIP LOCKED option:

[Alice]:
SELECT
    p.id AS id1_0_,1
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM
    post p
WHERE
    p.status = 0
ORDER BY
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED
 
[Bob]:                                                                                                                                                                                                              
SELECT
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM
    post p
WHERE
    p.status = 0
ORDER BY
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED

We can see that Alice can select the first two entries while Bob selects the next 2 records. Without SKIP LOCKED, Bob lock acquisition request would block until Alice releases the lock on the first 2 records.

酷遇一生 2024-07-16 05:03:00

以下是我过去成功使用过的内容:

MsgQueue 表架构

MsgId identity -- NOT NULL
MsgTypeCode varchar(20) -- NOT NULL  
SourceCode varchar(20)  -- process inserting the message -- NULLable  
State char(1) -- 'N'ew if queued, 'A'(ctive) if processing, 'C'ompleted, default 'N' -- NOT NULL 
CreateTime datetime -- default GETDATE() -- NOT NULL  
Msg varchar(255) -- NULLable  

您的消息类型就是您所期望的 - 符合插入进程和读取进程之间契约的消息,使用 XML 或您的结构构建其他表示形式的选择(例如,JSON 在某些情况下会很方便)。

然后0到n个进程可以插入,并且0到n个进程可以读取和处理消息。每个读取进程通常处理单个消息类型。 可以运行某个进程类型的多个实例以实现负载平衡。

读取器拉取一条消息,并在处理该消息时将状态更改为“A”active。 完成后,它将状态更改为“C”完成。 它可以删除该消息或不删除该消息,具体取决于您是否要保留审核跟踪。 State = 'N' 的消息按照 MsgType/Timestamp 顺序拉取,因此在 MsgType + State + CreateTime 上有一个索引。

变体:
“E”错误状态。
Reader 进程代码列。
状态转换的时间戳。

这提供了一个很好的、可扩展的、可见的、简单的机制来完成您所描述的许多事情。 如果您对数据库有基本的了解,那么它是非常简单且可扩展的。


评论中的代码:

CREATE PROCEDURE GetMessage @MsgType VARCHAR(8) ) 
AS 
DECLARE @MsgId INT 

BEGIN TRAN 

SELECT TOP 1 @MsgId = MsgId 
FROM MsgQueue 
WHERE MessageType = @pMessageType AND State = 'N' 
ORDER BY CreateTime


IF @MsgId IS NOT NULL 
BEGIN 

UPDATE MsgQueue 
SET State = 'A' 
WHERE MsgId = @MsgId 

SELECT MsgId, Msg 
FROM MsgQueue 
WHERE MsgId = @MsgId  
END 
ELSE 
BEGIN 
SELECT MsgId = NULL, Msg = NULL 
END 

COMMIT TRAN

Here's what I've used successfully in the past:

MsgQueue table schema

MsgId identity -- NOT NULL
MsgTypeCode varchar(20) -- NOT NULL  
SourceCode varchar(20)  -- process inserting the message -- NULLable  
State char(1) -- 'N'ew if queued, 'A'(ctive) if processing, 'C'ompleted, default 'N' -- NOT NULL 
CreateTime datetime -- default GETDATE() -- NOT NULL  
Msg varchar(255) -- NULLable  

Your message types are what you'd expect - messages that conform to a contract between the process(es) inserting and the process(es) reading, structured with XML or your other choice of representation (JSON would be handy in some cases, for instance).

Then 0-to-n processes can be inserting, and 0-to-n processes can be reading and processing the messages, Each reading process typically handles a single message type. Multiple instances of a process type can be running for load-balancing.

The reader pulls one message and changes the state to "A"ctive while it works on it. When it's done it changes the state to "C"omplete. It can delete the message or not depending on whether you want to keep the audit trail. Messages of State = 'N' are pulled in MsgType/Timestamp order, so there's an index on MsgType + State + CreateTime.

Variations:
State for "E"rror.
Column for Reader process code.
Timestamps for state transitions.

This has provided a nice, scalable, visible, simple mechanism for doing a number of things like you are describing. If you have a basic understanding of databases, it's pretty foolproof and extensible.


Code from comments:

CREATE PROCEDURE GetMessage @MsgType VARCHAR(8) ) 
AS 
DECLARE @MsgId INT 

BEGIN TRAN 

SELECT TOP 1 @MsgId = MsgId 
FROM MsgQueue 
WHERE MessageType = @pMessageType AND State = 'N' 
ORDER BY CreateTime


IF @MsgId IS NOT NULL 
BEGIN 

UPDATE MsgQueue 
SET State = 'A' 
WHERE MsgId = @MsgId 

SELECT MsgId, Msg 
FROM MsgQueue 
WHERE MsgId = @MsgId  
END 
ELSE 
BEGIN 
SELECT MsgId = NULL, Msg = NULL 
END 

COMMIT TRAN
远昼 2024-07-16 05:03:00

您应该将其设置为假的无人记录,而不是在不拥有该记录时将其设置为owner = null。 搜索 null 不会限制索引,您可能最终会进行表扫描。 (这是针对oracle的,SQL Server可能有所不同)

Instead of having owner = null when it isn't owned, you should set it to a fake nobody record instead. Searching for null doesn't limit the index, you might end up with a table scan. (this is for oracle, SQL server might be different)

与他有关 2024-07-16 05:03:00

正如可能的技术变革一样,您可能会考虑使用 MSMQ 或类似的东西。

您的每个作业/线程都可以查询消息队列以查看是否有新作业可用。 由于读取消息的行为会将其从堆栈中删除,因此可以确保只有一个作业/线程会获取该消息。

当然,这是假设您使用的是 Microsoft 平台。

Just as a possible technology change, you might consider using MSMQ or something similar.

Each of your jobs / threads could query the messaging queue to see if a new job was available. Because the act of reading a message removes it from the stack, you are ensured that only one job / thread would get the message.

Of course, this is assuming you are working with a Microsoft platform.

千秋岁 2024-07-16 05:03:00

请参阅 Vlad 的答案了解上下文,我只是在 Oracle 中添加等效内容,因为有一些“陷阱”需要注意。

不会

SELECT * FROM t order by x limit 2 FOR UPDATE OF t SKIP LOCKED

按照您期望的方式直接转换为 Oracle。 如果我们考虑几个翻译选项,我们可能会尝试以下任何一种:

SQL> create table t as
  2   select rownum x
  3   from dual
  4   connect by level <= 100;

Table created.

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from t order by x for update skip locked fetch first 2 rows only;
  5  end;
  6  /
  open rc for select * from t order by x for update skip locked fetch first 2 rows only;
                                                                *
ERROR at line 4:
ORA-06550: line 4, column 65:
PL/SQL: ORA-00933: SQL command not properly ended
ORA-06550: line 4, column 15:
PL/SQL: SQL Statement ignored

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from t order by x fetch first 2 rows only for update skip locked ;
  5  end;
  6  /
declare
*
ERROR at line 1:
ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc.
ORA-06512: at line 4

或者尝试回退到 ROWNUM 选项

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from ( select * from t order by x ) where rownum <= 10 for update skip locked;
  5  end;
  6  /
declare
*
ERROR at line 1:
ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc.
ORA-06512: at line 4

,但您不会得到任何乐趣。 因此,您需要自己控制“n”行的获取。 因此你可以编写如下代码:

SQL> declare
  2    rc sys_refcursor;
  3    res1 sys.odcinumberlist := sys.odcinumberlist();
  4  begin
  5    open rc for select * from t order by x for update skip locked;
  6    fetch rc bulk collect into res1 limit 10;
  7  end;
  8  /

PL/SQL procedure successfully completed.

See Vlad's answer for context, I'm just adding the equivalent in Oracle because there's a few "gotchas" to be aware of.

The

SELECT * FROM t order by x limit 2 FOR UPDATE OF t SKIP LOCKED

will not translate directly to Oracle in the way you might expect. If we look at a few options of translation, we might try any of the following:

SQL> create table t as
  2   select rownum x
  3   from dual
  4   connect by level <= 100;

Table created.

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from t order by x for update skip locked fetch first 2 rows only;
  5  end;
  6  /
  open rc for select * from t order by x for update skip locked fetch first 2 rows only;
                                                                *
ERROR at line 4:
ORA-06550: line 4, column 65:
PL/SQL: ORA-00933: SQL command not properly ended
ORA-06550: line 4, column 15:
PL/SQL: SQL Statement ignored

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from t order by x fetch first 2 rows only for update skip locked ;
  5  end;
  6  /
declare
*
ERROR at line 1:
ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc.
ORA-06512: at line 4

or perhaps try falling back to the ROWNUM option

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from ( select * from t order by x ) where rownum <= 10 for update skip locked;
  5  end;
  6  /
declare
*
ERROR at line 1:
ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc.
ORA-06512: at line 4

And you won't get any joy. You thus need to control the fetching of the "n" rows yourself. Thus you can code up something like:

SQL> declare
  2    rc sys_refcursor;
  3    res1 sys.odcinumberlist := sys.odcinumberlist();
  4  begin
  5    open rc for select * from t order by x for update skip locked;
  6    fetch rc bulk collect into res1 limit 10;
  7  end;
  8  /

PL/SQL procedure successfully completed.
轻拂→两袖风尘 2024-07-16 05:03:00

您正在尝试实现“Database as IPC”反模式。 查阅它以了解为什么您应该考虑正确地重新设计您的软件。

You are trying to implement de "Database as IPC" antipattern. Look it up to understand why you should consider redesigning your software properly.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文