JMS 消息驱动 Bean 工作同步

发布于 2024-07-19 15:49:14 字数 627 浏览 10 评论 0原文

我们刚刚开始构建 JMS 架构并具有以下基本设置:

  1. GLassfish v2.1
  2. MDB 通过 TopicConnectionFactory 监听主题(全部在本地服务器上)

现在,当新消息到达时,MDB 会生成一个工作线程,即使我们要按顺序传递消息,我们需要一个同步机制,以便线程在并发处理请求之前检查特定条件。

有没有办法让这些线程共享数据? 或者是否有任何其他机制(除了数据库表/行锁)可以用于同步?

提前致谢。


澄清一下,我不是在创建自己的线程。 正如每个人正确指出的那样,容器为我做到了这一点。 让我用一个例子来帮助解释我的困境。

-消息 A 在 t=0 时到达,“创建”数据 id 1 -

消息 B 在 t=0.1 时到达,“更新”数据 id 1

现在假设容器生成 2 个工作进程来处理 A 和 A。 B 并且“创建”数据比更新数据花费更多的时间,更新会更早处理并且没有效果。

更清楚地说,

-在处理消息 B 时,我会在 t=1 处查找数据 id 1(未找到它,因此无需执行任何操作即可完成)。

-数据 ID 1 将在 t=2 处处理消息 A 时创建。

We are just starting to build our JMS architecture and have the following basic setup:

  1. GLassfish v2.1
  2. MDB listening on a Topic through a TopicConnectionFactory (all on local server)

Now, the MDB spawns a worker thread when a new message arrives and even though we have in order delivery of messages, we need a synchronization mechanism so that threads check for a certain condition before processing the request concurrently.

Is there a way for these threads to share data? Or are there any other mechanisms (except for database table/row locks) that we can use for synchronization?

Thanks in advance.


To clarify, I am not creating my own threads. As everyone rightly pointed out, the container does that for me. Let me help explain my dilemma with an example.

-Message A arrives at t=0 which 'creates' data id 1

-Message B arrives at t=0.1 which 'updates' data id 1

Now assuming the container spawns 2 workers to process A & B and that it takes much more time to 'create' data than update it, the update would process earlier and have no effect.

To be clearer,

-While processing Message B, I would look for data id 1 at t=1 (not find it and thus have finish without doing anything).

-Data id 1 would be created while processing Message A at t=2.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

十级心震 2024-07-26 15:49:14

学究警报! 我是那种会阅读技术实际规范的人。

阅读 EJB 规范 版本 3.0,第 21.1.2 节(编程限制)不允许在您的代码。 这是语言和理由......

企业 Bean 不得尝试
来管理线程。 企业bean
不得尝试启动、停止、
挂起或恢复线程,或
更改线程的优先级或名称。
企业 Bean 不得尝试
管理线程组。

这些函数是为 EJB 容器保留的。 让企业
管理线程的 bean 会减少
容器能够正确地
管理运行时环境。

因此,如果您按照您所说的去做,EJB 警察就会在半夜来敲您的门并把您带走。 或者你的应用程序可能会出现故障,当你抱怨时供应商会笑。 或者根本不会发生任何不好的事情。

但是,正如达菲莫所说,为什么要这样做呢? 如果您想要大量线程提供的可伸缩性,您可以为您的 MDB 进行配置吗? EJB 的目的是为您处理类似的事情。

Pedant alert! I'm the kind of guy that reads the actual specs for technologies.

Reading the EJB spec version 3.0, section 21.1.2 (Programming Restrictions) disallows using threads in your code. Here's the language and the rationale ...

The enterprise bean must not attempt
to manage threads. The enterprise bean
must not attempt to start, stop,
suspend, or resume a thread, or to
change a thread’s priority or name.
The enterprise bean must not attempt
to manage thread groups.

These functions are reserved for the EJB container. Allowing the enterprise
bean to manage threads would decrease
the container’s ability to properly
manage the runtime environment.

So if you do what you're saying, the EJB police will come knocking on your door in the middle of the night and take you away. Or your app might malfunction and the vendor will laugh when you complain. Or nothing bad at all will happen.

But, as duffymo says, why do this? If you want the scalability offered by lots of threads, can you configure that in for your MDB? The point of EJB's is to handle stuff like that for you.

久随 2024-07-26 15:49:14

我不明白为什么 MDB 必须产生一个工作线程。 JMS 中有一个与消息侦听器关联的线程池。 这就是应该完成这项工作的线程。

EJB 规范规定 Bean 中不会产生线程。 容器处理线程。 这也包括多边开发银行。

侦听器应该处理从队列中取出的消息。 它需要的数据应该在消息中。 有什么必要分享?

我认为您的方法违背了推荐的 EJB 实践。

I don't see why the MDB has to spawn a worker thread. There's a thread pool associated with the message listeners in JMS. That's the thread that's supposed to be doing the work.

The EJB spec says no thread spawning in your beans. The container handles threading. That includes MDBs as well.

The listener should be processing the message it takes off the queue. The data it needs should be in the message. What's the need for sharing?

I think your approach goes against recommended EJB practices.

ま柒月 2024-07-26 15:49:14

真正的问题是,应用程序服务器通常会自行生成上述工作线程。
虽然 JMS 保证消息的消费顺序至少与在一个生产者中生成消息的顺序相同,但 MDB 规范明确指出不会保留该顺序(因为提到了工作线程)。 请参阅 JSR-000220 Enterprise JavaBeans 3.0 最终版本 (ejbcore)

没有可移植且 100% 可靠的方法来规避此问题。 由于工人的性质,引入了无法控制的竞争条件。

幸运的是,大多数应用程序服务器都有配置工作线程数量的方法,尽管是专有的且不兼容。

对于 JBoss 和 ActiveMQ,这是可行的:

@PoolClass(value = org.jboss.ejb3.StrictMaxPool.class, maxSize = 1)
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "maxSessions", propertyValue = "1"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "TEST.FOO")
})
@ResourceAdapter("activemq-ra.rar")
public class NewMessageBean implements MessageListener { ... }

在这种情况下,“maxSessions”是工作人员的数量。 它可能与其他 JMS 提供商不同,但这应该指向正确的方向。

The real problem is, that the Application Server usually spawns the mentioned workers by itself.
While JMS guarantees that the messages are consumed in the same order as they are produced at least within one producer, the MDB spec explicitely states that the order is not preserved (because of the mentioned workers). See section 5.4.11 of JSR-000220 Enterprise JavaBeans 3.0 Final Release (ejbcore).

There is no portable and 100% reliable way to circumvent this. Due to the nature of the workers race conditions are introduced which cannot be controlled.

Fortunately most Application Servers have ways, albeit proprietary and incompatible, to configure the number of workers.

For JBoss and ActiveMQ this works:

@PoolClass(value = org.jboss.ejb3.StrictMaxPool.class, maxSize = 1)
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "maxSessions", propertyValue = "1"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "TEST.FOO")
})
@ResourceAdapter("activemq-ra.rar")
public class NewMessageBean implements MessageListener { ... }

In this case the "maxSessions" are the number of workers. It may be different with other JMS providers, but that should point into the right direction.

旧街凉风 2024-07-26 15:49:14

如上所述,JMS 框架处理调度问题,例如分派线程。 您在此所做的任何操作不仅不如默认行为,而且可能会严重限制 JMS 的功能。

更复杂的 JMS 处理程序被设计为跨多个节点(= 服务器)工作,因此任何共享内存解决方案都会将您限制为单个节点上的单个 JVM,这将是一个遗憾,因为 JMS 的巨大优势是可扩展性。

一个可能的 JMSy 解决方案是使用一个“cookie”队列,将单个虚拟“cookie”消息用于同步活动。
当您的进程执行有争议的活动时,它会“等待”来自“cookie”队列的单个消息,当有争议的工作完成时,它会将 cookie 放回到队列中。 JMS 的魔力将处理几乎所有的阻塞、等待和错误恢复。

As mentioned above the JMS framework handles schedualing issues such as dispatching threads. Anything you do in this are will not only be inferior to the default beheaviour it will probably severly limit the functionality of your JMS.

The more sophisticated JMS handlers are designed to work across several nodes (= servers ) so any shared memory solution would limit you to a single JVM on a single node, which would be a pity as the great advantage of JMS is scalability.

A possible JMSy solution would be to have a "cookie" queue will a single dummy "cookie" message to synchroise activities.
When its time for your process to perform a contentious activity it "gets with wait" the single message from the "cookie" queue, when contentious work is complete it puts the cookie back on the queue. The magic of JMS will handle nearly all the blocking, waiting and error recovery.

∞觅青森が 2024-07-26 15:49:14

看来在每个人都回答之后,问题的性质完全改变了。 我将为后代做出一个迟来的回应; 这假设有某种类型的消息 ID 可用于排序。 您说“我们已按顺序传递消息”,但您没有确切说明这是如何实现的。

...由于您已经澄清您没有启动自己的线程,因此基本上在数据@ ID=1 的“创建”与其后续的“更新”之间存在竞争条件。 我假设您在创建和/或更新数据时锁定数据 @ ID=1 。 因此,有两种可能性:

  1. “创建数据”消息首先到达:(1) 锁 ID=1,(2) 创建数据。 (3)释放锁。 (4) 应用待处理的“更新”。
  2. “更新”在“创建”之前到达:(1) 锁 ID=1,(2) 创建丢失的数据(即,执行“更新插入”:插入数据,即使数据不存在)。 (3)释放锁。 (4) 忽略挂起的“create data”消息(“create”消息的序列号比“update”消息的序列号低)
    • 如果消息发送者被允许同时发送更新和插入(没有请求/响应),那么它们确实应该有某种序列号。 如果是这样,当“插入”到达时,其序列号将小于该行中的当前值,因此可以忽略它。 或者,如果“创建”消息类型与“更新”消息类型不同,那么如果数据已存在,则始终可以忽略“创建”。

我认为您遇到的问题是如何同步数据。 基本上,线程共享对象并创建互斥体(互斥)以允许单个线程访问数据,从而导致另一个线程阻塞。 这可以简单地通过 Java 的低级同步工具(“synchronized”关键字)或辅助此操作的内置类来完成(http://java.sun.com/docs/books/tutorial/essential/concurrency/highlevel.html)。

Looks like the question completely changed in nature after everyone already responded to it. I'll chip in a very belated response for posterity; this assumes there is some type of message ID that can be used for ordering. You say "we have in order delivery of messages", but you don't say exactly how this is achieved.

...Since you've clarified that you're not starting your own threads, you basically have a race condition between the "creation" of data @ ID=1 and its subsequent "update". I assume you're locking the data @ ID=1 while it's being created and/or updated. So, there are two possibilities:

  1. the "create data" message arrives first: (1) lock ID=1, (2) create the data. (3) release the lock. (4) apply the pending "update".
  2. the "update" arrives before the "create": (1) lock ID=1, (2) create the missing data (i.e, do an 'upsert': insert the data even though it's not there). (3) release the lock. (4) ignore the pending "create data" message (the 'create' message has a lower sequence number than the 'update'
    • If the message sender is allowed to send updates and inserts concurrently (without request/response), then they really should have some sort of sequence number. If so, when the 'insert' arrives, its sequence number will be less than the current value in that row, so it can be ignored. Or, if the 'create' message type is distinct from an 'update' message type, then 'creates' can always be ignored if the data already exists.

I think the question you have is how to synchronize on data. Basically, threads share objects and create a mutex (mutual exclusion) to allow a single thread access to the data, causing another thread to block. This can be done simply via Java's low-level synchronization facilities (the "synchronized" keyword), or built-in classes that assist with this ( http://java.sun.com/docs/books/tutorial/essential/concurrency/highlevel.html ).

夏花。依旧 2024-07-26 15:49:14

正如一些人提到的,您不应该在 MDB(或任何其他类型的 EJB 或与此相关的 servlet)中创建自己的线程。

许多 EJB 容器实际上不允许您创建和运行线程。 不过,有一种安全的方法可以做到这一点,即使用 commonj 规范中的 WorkManager,尽管在这种特殊情况下我看不出有什么理由这样做,因为 MDB 已经在它自己的“工作线程”中运行。

请参阅有关生成线程的信息了解更多信息有关为什么不应在 Java EE 服务器中生成线程以及如何在需要时安全地执行此操作的信息。

As was mentioned by several people, you should not create your own threads in an MDB (or any other type of EJB or a servlet for that matter).

Many EJB containers will not actually allow you to create and run threads. There is one safe way to do it though, by using a WorkManager from the commonj specification, although I see no reason for it in this particular case, since the MDB is already running in its own 'worker thread'.

See info on spawning threads here for more information on why you shouldn't spawn threads in a Java EE server and how to do it safely when you need to.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文