What is the best way to implement a message queue table in MySQL

Posted on 2024-07-10 15:29:17

It's probably the tenth time I'm implementing something like this, and I've never been 100% happy with the solutions I came up with.

The reason using a MySQL table instead of a "proper" messaging system is attractive is primarily that most applications already use some relational database for other stuff (which tends to be MySQL for most of the stuff I've been doing), while very few applications use a messaging system. Also, relational databases have very strong ACID properties, while messaging systems often don't.

The first idea is to use:

create table jobs(
  id int unsigned auto_increment not null primary key,
  message text not null,
  process_id varbinary(255) null default null,
  key jobs_key(process_id) 
);

And then enqueue looks like this:

insert into jobs(message) values('blah blah');

And dequeue looks like this:

begin;
select * from jobs where process_id is null order by id asc limit 1;
update jobs set process_id = ? where id = ?; -- whatever i just got
commit;
-- return (id, message) to application, cleanup after done

Table and enqueue look nice, but dequeue kinda bothers me. How likely is it to roll back? Or to get blocked? What keys should I use to make it O(1)-ish?

Or is there any better solution than what I'm doing?

Comments (8)

鹿港巷口少年归 2024-07-17 15:29:17

Your dequeue could be more concise. Rather than relying on the transaction rollback, you could do it in one atomic statement without an explicit transaction:

UPDATE jobs SET process_id = ? WHERE process_id IS NULL ORDER BY ID ASC LIMIT 1;

Then you can pull jobs with (brackets [] mean optional, depending on your particulars):

SELECT * FROM jobs WHERE process_id = ? [ORDER BY ID LIMIT 1];
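
A minimal sketch of the claim-then-read idea above, in Python. It runs against an in-memory SQLite database only so that it can be tried without a server; SQLite standing in for MySQL is an assumption of this sketch, not part of the answer. Typical SQLite builds do not accept UPDATE ... ORDER BY ... LIMIT, so the oldest free row is picked with a subquery instead. The helper names (make_queue, enqueue, claim_job) and the per-claim uuid token stored in process_id are hypothetical.

```python
import sqlite3
import uuid


def make_queue():
    # In-memory SQLite stands in for MySQL (assumption made for portability).
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE jobs ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " message TEXT NOT NULL,"
        " process_id TEXT NULL DEFAULT NULL)"
    )
    conn.execute("CREATE INDEX jobs_key ON jobs (process_id)")
    return conn


def enqueue(conn, message):
    with conn:  # commits on success, rolls back on error
        conn.execute("INSERT INTO jobs (message) VALUES (?)", (message,))


def claim_job(conn):
    """Claim the oldest unclaimed job; return (id, message) or None."""
    token = uuid.uuid4().hex  # hypothetical unique token for this claim
    with conn:
        cur = conn.execute(
            "UPDATE jobs SET process_id = ? "
            "WHERE id = (SELECT id FROM jobs WHERE process_id IS NULL "
            "            ORDER BY id ASC LIMIT 1)",
            (token,),
        )
        if cur.rowcount == 0:
            return None  # nothing left to claim
        # Read back the row that this claim marked.
        return conn.execute(
            "SELECT id, message FROM jobs WHERE process_id = ?", (token,)
        ).fetchone()
```

Using a fresh token per claim means the follow-up SELECT matches exactly one row, so the optional ORDER BY ID LIMIT 1 from the answer is not needed in this variant. If process_id identifies a long-lived worker instead, that optional clause matters again.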

紙鸢 2024-07-17 15:29:17

I've built a few message queuing systems and I'm not certain what type of message you're referring to, but in the case of the dequeuing (is that a word?) I've done the same thing you've done. Your method looks simple, clean and solid. Not that my work is the best, but it's proven very effective for large-scale monitoring across many sites (error logging, mass email marketing campaigns, social networking notices).

My vote: no worries!

滿滿的愛 2024-07-17 15:29:17

Brian Aker talked about a queue engine a while ago. There's been talk about a SELECT table FROM DELETE syntax, too.

If you're not worried about throughput, you can always use SELECT GET_LOCK() as a mutex. For example:

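SELECT GET_LOCK('READQUEUE', 10); -- the timeout in seconds is a required argument; 10 is an arbitrary choice
SELECT * FROM jobs ORDER BY id ASC LIMIT 1;
DELETE FROM jobs WHERE id = ?;
SELECT RELEASE_LOCK('READQUEUE');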

And if you want to get really fancy, wrap it in a stored procedure.

蓬勃野心 2024-07-17 15:29:17

In MySQL 8 you can use the new NOWAIT and SKIP LOCKED keywords to avoid complexity with special locking mechanisms:

START TRANSACTION;
SELECT id, message FROM jobs
 WHERE process_id IS NULL
 ORDER BY id ASC LIMIT 1
 FOR UPDATE SKIP LOCKED;
UPDATE jobs
 SET process_id = ?
 WHERE id = ?;
COMMIT;

Traditionally this was hard to achieve without hacks and unusual special tables or columns, unreliable solutions or losing concurrency.

SKIP LOCKED may cause performance issues with extremely large numbers of consumers.

This still does not however handle automatically marking the job complete on transaction rollback. For this you may need save points. That however might not solve all cases. You would really want to set an action to execute on transaction failure but as part of the transaction!

In future it's possible there may be more features to help optimise with cases such as an update that can also return the matched rows. It's important to keep apprised of new features and capabilities in the change log.

满栀 2024-07-17 15:29:17

Here is a solution I used, working without the process_id of the current thread, or locking the table.

SELECT * from jobs ORDER BY ID ASC LIMIT 0,1;

Get the result in a $row array, and execute:

DELETE from jobs WHERE ID=$row['ID'];

Then get the number of affected rows (mysql_affected_rows). If there are affected rows, process the job in the $row array. If there are 0 affected rows, it means some other process is already processing the selected job. Repeat the above steps until there are no rows.

I've tested this with a 'jobs' table having 100k rows, and spawning 20 concurrent processes that do the above. No race conditions happened. You can modify the above queries to update a row with a processing flag, and delete the row after you actually processed it:

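while(time()-$startTime<$timeout)
{
SELECT * from jobs WHERE processing is NULL ORDER BY ID ASC LIMIT 0,1;
if (count($row)==0) break; // queue is empty
// the "processing is NULL" guard makes the claim fail cleanly if another process got there first
UPDATE jobs set processing=1 WHERE ID=$row['ID'] AND processing is NULL;
if (mysql_affected_rows==0) continue; // lost the race, pick another row
//process your job here
DELETE from jobs WHERE ID=$row['ID'];
}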

Needless to say, you should use a proper message queue (ActiveMQ, RabbitMQ, etc) for this kind of work. We had to resort to this solution though, as our host regularly breaks things when updating software, so the less stuff to break the better.
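
The delete-and-check-affected-rows approach above could be sketched roughly as follows. This is Python against an in-memory SQLite database, chosen only so the example runs without a MySQL server (an assumption of this sketch); open_jobs and take_job are hypothetical names, and cursor.rowcount plays the role of mysql_affected_rows.

```python
import sqlite3


def open_jobs():
    # In-memory SQLite stands in for MySQL (assumption made for portability).
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE jobs ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " message TEXT NOT NULL)"
    )
    return conn


def take_job(conn):
    """Return (id, message) for a job this caller won, or None when empty."""
    while True:
        row = conn.execute(
            "SELECT id, message FROM jobs ORDER BY id ASC LIMIT 1"
        ).fetchone()
        if row is None:
            return None  # queue is empty
        with conn:  # commit the DELETE
            cur = conn.execute("DELETE FROM jobs WHERE id = ?", (row[0],))
        if cur.rowcount == 1:
            return row  # our DELETE removed the row, so the job is ours
        # rowcount == 0: another worker deleted it first, so try the next row
```

Note that in this first variant the job row is gone before it is processed, so a worker that crashes mid-job loses the message; the processing-flag variant in the answer trades that for the need to clean up flagged rows left behind by dead workers.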

昔日梦未散 2024-07-17 15:29:17

I would suggest using Quartz.NET

It has providers for SQL Server, Oracle, MySql, SQLite and Firebird.

债姬 2024-07-17 15:29:17

This thread has design information that should be mappable.

To quote:

Here's what I've used successfully in the past:

MsgQueue table schema

MsgId identity -- NOT NULL
MsgTypeCode varchar(20) -- NOT NULL
SourceCode varchar(20) -- process inserting the message -- NULLable
State char(1) -- 'N'ew if queued, 'A'(ctive) if processing, 'C'ompleted, default 'N' -- NOT NULL
CreateTime datetime -- default GETDATE() -- NOT NULL
Msg varchar(255) -- NULLable

Your message types are what you'd expect - messages that conform to a contract between the process(es) inserting and the process(es) reading, structured with XML or your other choice of representation (JSON would be handy in some cases, for instance).

Then 0-to-n processes can be inserting, and 0-to-n processes can be reading and processing the messages. Each reading process typically handles a single message type. Multiple instances of a process type can be running for load-balancing.

The reader pulls one message and changes the state to "A"ctive while it works on it. When it's done it changes the state to "C"omplete. It can delete the message or not depending on whether you want to keep the audit trail. Messages of State = 'N' are pulled in MsgType/Timestamp order, so there's an index on MsgType + State + CreateTime.

Variations:
State for "E"rror.
Column for Reader process code.
Timestamps for state transitions.

This has provided a nice, scalable, visible, simple mechanism for doing a number of things like you are describing. If you have a basic understanding of databases, it's pretty foolproof and extensible. There's never been an issue with locks, rollbacks, etc., because of the atomic state transition transactions.
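
One way to picture the 'N' to 'A' to 'C' transitions described above is the following Python sketch. It uses an in-memory SQLite database so it can run anywhere (an assumption; the quoted schema uses GETDATE(), so it was presumably written for a different server), and the function names open_msg_queue, pull and complete are hypothetical. Each transition is a single UPDATE guarded by the expected current state, which is what makes it safe when several readers compete.

```python
import sqlite3


def open_msg_queue():
    # In-memory SQLite stands in for the real server (assumption).
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE MsgQueue ("
        " MsgId INTEGER PRIMARY KEY AUTOINCREMENT,"
        " MsgTypeCode TEXT NOT NULL,"
        " State TEXT NOT NULL DEFAULT 'N',"
        " CreateTime TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,"
        " Msg TEXT)"
    )
    # Index on MsgType + State + CreateTime, as the answer suggests.
    conn.execute(
        "CREATE INDEX MsgQueue_pull ON MsgQueue (MsgTypeCode, State, CreateTime)"
    )
    return conn


def pull(conn, msg_type):
    """Move the oldest 'N' message of one type to 'A'; return (MsgId, Msg) or None."""
    while True:
        row = conn.execute(
            "SELECT MsgId, Msg FROM MsgQueue "
            "WHERE MsgTypeCode = ? AND State = 'N' "
            "ORDER BY CreateTime, MsgId LIMIT 1",
            (msg_type,),
        ).fetchone()
        if row is None:
            return None
        with conn:
            # Guarded transition: only succeeds if the row is still 'N'.
            cur = conn.execute(
                "UPDATE MsgQueue SET State = 'A' WHERE MsgId = ? AND State = 'N'",
                (row[0],),
            )
        if cur.rowcount == 1:
            return row
        # Another reader activated it first; look for the next message.


def complete(conn, msg_id):
    """Move an active message to 'C'; return True if the transition happened."""
    with conn:
        cur = conn.execute(
            "UPDATE MsgQueue SET State = 'C' WHERE MsgId = ? AND State = 'A'",
            (msg_id,),
        )
    return cur.rowcount == 1
```

Completed rows are kept here, which corresponds to the audit-trail option in the answer; deleting them in complete would be the other option.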

猥︴琐丶欲为 2024-07-17 15:29:17

You can have an intermediate table to maintain the offset for the queue.

create table scan(
  scan_id int primary key,
  offset_id int
);

You might have multiple scans going on as well, hence one offset per scan. Initialise the offset_id = 0 at the start of the scan.

begin;
select * from jobs where id > (select offset_id from scan where scan_id = ?) order by id asc limit 1;
update scan set offset_id = ? where scan_id = ?; -- whatever i just got
commit;

All you need to do is just to maintain the last offset. This would also save you significant space (process_id per record). Hope this sounds logical.
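
A rough Python sketch of the offset-table idea, again using an in-memory SQLite database as a stand-in for MySQL (an assumption of this sketch). The helper names open_db, start_scan and next_job are hypothetical. Each scan keeps its own offset, so several scans can read the same jobs independently.

```python
import sqlite3


def open_db():
    # In-memory SQLite stands in for MySQL (assumption made for portability).
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE jobs ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " message TEXT NOT NULL)"
    )
    conn.execute(
        "CREATE TABLE scan ("
        " scan_id INTEGER PRIMARY KEY,"
        " offset_id INTEGER NOT NULL)"
    )
    return conn


def start_scan(conn, scan_id):
    # Initialise the offset to 0 at the start of the scan.
    with conn:
        conn.execute(
            "INSERT INTO scan (scan_id, offset_id) VALUES (?, 0)", (scan_id,)
        )


def next_job(conn, scan_id):
    """Return the next (id, message) after this scan's offset, or None."""
    with conn:
        row = conn.execute(
            "SELECT id, message FROM jobs "
            "WHERE id > (SELECT offset_id FROM scan WHERE scan_id = ?) "
            "ORDER BY id ASC LIMIT 1",
            (scan_id,),
        ).fetchone()
        if row is None:
            return None
        # Advance the offset to the id that was just read.
        conn.execute(
            "UPDATE scan SET offset_id = ? WHERE scan_id = ?", (row[0], scan_id)
        )
        return row
```

This sketch assumes one consumer per scan. If several workers were to share a single scan row on MySQL, the offset read would probably need to lock that row (for example with SELECT ... FOR UPDATE) so that two workers cannot read the same offset before either advances it.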
