如何以事务方式轮询数据库队列表并进行回滚?

发布于 2024-09-13 05:45:17 字数 1405 浏览 2 评论 0原文

我希望设置一个将用作消息队列的数据库表。该表将插入具有唯一 ID 和时间戳以及“待处理”状态的消息。

假设对该表的插入处理正确,我想知道使用简单的 HSQLDB 2.0 数据库以事务方式处理来自该表的消息的最佳方法是什么(尽管这个问题应该适用于所有事务支持数据库)。

我希望读取状态为“待处理”的下一条消息,并确保没有其他队列处理器也可以处理相同的记录,然后提交或回滚。

我提供了一些代码片段,说明我计划如何使用普通的旧式 JDBC 来实现此目的。

  • 这行得通吗?
  • 有更好的选择吗?

DDL:

create table message_queue (
    qidx integer,
    message varchar(120),
    status varchar(20),
    inserted_date timestamp,
    inserted_by varchar(20),
    processed_date timestamp,
    processed_by varchar(20),
)

insert into message_queue values (1,'Important message here','PENDING','2010-08-10 00:01:00', 'BOB', null,null)

这是我读取 SQL 的队列:

SET AUTOCOMMIT FALSE
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE
START TRANSACTION
    DECLARE nextID INTEGER DEFAULT 0
    SET nextID = select max(qidx) from message_queue where status = 'PENDING' 
    update message_queue set status = 'CONSUMED' where QIDX = nextID
    select * from message_queue where QIDX = nextID
ROLLBACK

这是我的连接代码片段:

    conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
    try {
        String message = getNextMessage(conn); // uses sql in snippet
        processMessage(message);
        conn.commit(); // should commit
    } catch (Exception e) {
        conn.rollback(); // should rollback update
    }

I wish to setup a database table that will be used as a message queue. The table will have messages inserted into it with a unique id and a timestamp and a status of 'PENDING'.

Assuming that the insertion into this table is properly handled, I wish to know what is the best way to transactionally process messages from this table using a simple HSQLDB 2.0 database (although this question should apply to all transaction supporting databases).

I wish to read the next message with a status of 'pending' and make sure that no other queue processor can also process the same record then either commit or rollback.

I include some code snippets for how I plan to achieve this using plain old JDBC.

  • Will this work?
  • Are there better alternatives?

DDL:

create table message_queue (
    qidx integer,
    message varchar(120),
    status varchar(20),
    inserted_date timestamp,
    inserted_by varchar(20),
    processed_date timestamp,
    processed_by varchar(20),
)

insert into message_queue values (1,'Important message here','PENDING','2010-08-10 00:01:00', 'BOB', null,null)

Here is my queue reading SQL:

SET AUTOCOMMIT FALSE
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE
START TRANSACTION
    DECLARE nextID INTEGER DEFAULT 0
    SET nextID = select max(qidx) from message_queue where status = 'PENDING' 
    update message_queue set status = 'CONSUMED' where QIDX = nextID
    select * from message_queue where QIDX = nextID
ROLLBACK

Here is my connection code fragments:

    conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
    try {
        String message = getNextMessage(conn); // uses sql in snippet
        processMessage(message);
        conn.commit(); // should commit
    } catch (Exception e) {
        conn.rollback(); // should rollback update
    }

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

丑疤怪 2024-09-20 05:45:17

在这些情况下使用的一般模式(跨数据库,无锁,无等待 - 准确地说是乐观锁):

  1. 多个工作人员之一从状态为“待处理”的队列表中进行选择。表包含 version_number 列(整数)。
  2. 工作人员浏览它选择的项目列表,并尝试将每个单独的项目更新为“正在进行”(或其他)状态。更新语句的 where 子句包括“where version_number = ”,显然还包括该项目的主键。
  3. 如果更新语句返回 1 作为更新的行数,则该工作人员成功保留了该项目,并且可以继续工作。如果更新语句返回 0 行已更新 - 该项目已被其他某个工作人员选择,因此该工作人员应该默默地跳过它。

general pattern used in these cases (cross-database, no locks, no waiting - optimistic lock to be precise):

  1. one of multiple workers selects from queue table where status is "pending". Table includes a version_number column (integer).
  2. worker goes thru the list of items it selected and tries to update each individual item to status "in-progress" (or whatever). The where clause of the update statement includes "where version_number = " and, obviously, the primary key for the item.
  3. if update statement returned 1 as number of rows updated, this worker successfully reserved the item and it can proceed with the work. If update statement returned 0 rows updated - this item was selected by some other worker so this worker should just silently skip it.
不必在意 2024-09-20 05:45:17

当您必须考虑数据库的多用户性质时,排队绝对是一个不平凡的架构。一般来说,您不想自己实现这些事情。我相信您很快就会发现,问题在于,当有多个读取器尝试从同一队列读取数据时,您只希望一个进程获取一条消息。也就是说,您必须确保一条消息仅被处理一次。所以然后你开始思考“好吧,好吧,我会更新该记录,这样其他进程就不会得到它”,但随后你意识到,在你提交之前,其他进程不会看到你的更改。如果您锁定记录,则在完成第一个进程的处理之前,您无法从队列中读取另一个进程,本质上是序列化整个过程。

根据设计,队列事务必须与从队列读取的进程的事务有些正交。不幸的是,在这一领域,每个数据库的实现都将显着不同。

也许更好的方法是使用另一个库(不一定是数据库)来进行消息传递。由于 HSQLDB 没有内置的队列支持,因此队列的不同(java)实现可能是解决这种情况的工具。

Queueing is definitely a non-trivial architecture when you have to consider the multi-user nature of databases. Generally speaking, you don't want to implement these kinds of things yourself. The problem, as I'm sure you quickly discovered, is that you only want one process to get a single message when there are multiple readers attempting to read from the same queue. That is, you have to ensure that a message is processed only once. So then you start thinking "ok, well, I'll update that record so the other processes won't get it", but then you realize that until you commit the other processes won't see your change. If you lock the record, then you can't have another process read from the queue until you have finished processing the first one, essentially serializing the whole thing.

Queue transactions, by design, have to be somewhat orthogonal to the transactions of the processes reading from the queue. Unfortunately, this is one area where the implementation is going to be significantly different for each database.

Perhaps a better approach would be to use another library (not necessarily the database) for doing messaging. Since HSQLDB doesn't have built-in support for queueing, a different (java) implementation of queues may be a tool for this situation.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文