如何以事务方式轮询数据库队列表并进行回滚?
我希望设置一个将用作消息队列的数据库表。该表将插入具有唯一 ID 和时间戳以及“待处理”状态的消息。
假设对该表的插入处理正确,我想知道使用简单的 HSQLDB 2.0 数据库以事务方式处理来自该表的消息的最佳方法是什么(尽管这个问题应该适用于所有事务支持数据库)。
我希望读取状态为“待处理”的下一条消息,并确保没有其他队列处理器也可以处理相同的记录,然后提交或回滚。
我提供了一些代码片段,说明我计划如何使用普通的旧式 JDBC 来实现此目的。
- 这行得通吗?
- 有更好的选择吗?
DDL:
create table message_queue (
qidx integer,
message varchar(120),
status varchar(20),
inserted_date timestamp,
inserted_by varchar(20),
processed_date timestamp,
processed_by varchar(20),
)
insert into message_queue values (1,'Important message here','PENDING','2010-08-10 00:01:00', 'BOB', null,null)
这是我读取 SQL 的队列:
SET AUTOCOMMIT FALSE
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE
START TRANSACTION
DECLARE nextID INTEGER DEFAULT 0
SET nextID = select max(qidx) from message_queue where status = 'PENDING'
update message_queue set status = 'CONSUMED' where QIDX = nextID
select * from message_queue where QIDX = nextID
ROLLBACK
这是我的连接代码片段:
conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
try {
String message = getNextMessage(conn); // uses sql in snippet
processMessage(message);
conn.commit(); // should commit
} catch (Exception e) {
conn.rollback(); // should rollback update
}
I wish to setup a database table that will be used as a message queue. The table will have messages inserted into it with a unique id and a timestamp and a status of 'PENDING'.
Assuming that the insertion into this table is properly handled, I wish to know what is the best way to transactionally process messages from this table using a simple HSQLDB 2.0 database (although this question should apply to all transaction supporting databases).
I wish to read the next message with a status of 'pending' and make sure that no other queue processor can also process the same record then either commit or rollback.
I include some code snippets for how I plan to achieve this using plain old JDBC.
- Will this work?
- Are there better alternatives?
DDL:
create table message_queue (
qidx integer,
message varchar(120),
status varchar(20),
inserted_date timestamp,
inserted_by varchar(20),
processed_date timestamp,
processed_by varchar(20),
)
insert into message_queue values (1,'Important message here','PENDING','2010-08-10 00:01:00', 'BOB', null,null)
Here is my queue reading SQL:
SET AUTOCOMMIT FALSE
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE
START TRANSACTION
DECLARE nextID INTEGER DEFAULT 0
SET nextID = select max(qidx) from message_queue where status = 'PENDING'
update message_queue set status = 'CONSUMED' where QIDX = nextID
select * from message_queue where QIDX = nextID
ROLLBACK
Here is my connection code fragments:
conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
try {
String message = getNextMessage(conn); // uses sql in snippet
processMessage(message);
conn.commit(); // should commit
} catch (Exception e) {
conn.rollback(); // should rollback update
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
在这些情况下使用的一般模式(跨数据库,无锁,无等待 - 准确地说是乐观锁):
general pattern used in these cases (cross-database, no locks, no waiting - optimistic lock to be precise):
当您必须考虑数据库的多用户性质时,排队绝对是一个不平凡的架构。一般来说,您不想自己实现这些事情。我相信您很快就会发现,问题在于,当有多个读取器尝试从同一队列读取数据时,您只希望一个进程获取一条消息。也就是说,您必须确保一条消息仅被处理一次。所以然后你开始思考“好吧,好吧,我会更新该记录,这样其他进程就不会得到它”,但随后你意识到,在你提交之前,其他进程不会看到你的更改。如果您锁定记录,则在完成第一个进程的处理之前,您无法从队列中读取另一个进程,本质上是序列化整个过程。
根据设计,队列事务必须与从队列读取的进程的事务有些正交。不幸的是,在这一领域,每个数据库的实现都将显着不同。
也许更好的方法是使用另一个库(不一定是数据库)来进行消息传递。由于 HSQLDB 没有内置的队列支持,因此队列的不同(java)实现可能是解决这种情况的工具。
Queueing is definitely a non-trivial architecture when you have to consider the multi-user nature of databases. Generally speaking, you don't want to implement these kinds of things yourself. The problem, as I'm sure you quickly discovered, is that you only want one process to get a single message when there are multiple readers attempting to read from the same queue. That is, you have to ensure that a message is processed only once. So then you start thinking "ok, well, I'll update that record so the other processes won't get it", but then you realize that until you commit the other processes won't see your change. If you lock the record, then you can't have another process read from the queue until you have finished processing the first one, essentially serializing the whole thing.
Queue transactions, by design, have to be somewhat orthogonal to the transactions of the processes reading from the queue. Unfortunately, this is one area where the implementation is going to be significantly different for each database.
Perhaps a better approach would be to use another library (not necessarily the database) for doing messaging. Since HSQLDB doesn't have built-in support for queueing, a different (java) implementation of queues may be a tool for this situation.