防止消息处理的竞争条件

发布于 2024-12-08 14:36:25 字数 1746 浏览 0 评论 0原文

我有一个通过 Web 服务接收消息(事件)的 J2EE 应用程序。这些消息具有不同的类型(根据类型需要不同的处理)并以特定的顺序发送。它发现了一个问题,即某些消息类型的处理时间比其他消息类型要长。结果是序列中第二个接收到的消息可能会在序列中第一个消息之前被处理。我尝试通过在处理消息的方法周围放置一个同步块来解决这个问题。这似乎可行,但我不确定这是“正确”的方法吗?是否有更合适的替代方案或者这是“可接受的”?我添加了一小段代码来尝试更清楚地解释。 ....任何建议/指导表示赞赏。

public class EventServiceImpl implements EventService {
  public String submit (String msg) {

    if (msg == null)
        return ("NAK");

            EventQueue.getInstance().submit(msg);

    return "ACK";
  }
}


public class EventQueue {
    private static EventQueue instance = null;
    private static int QUEUE_LENGTH = 10000;
    protected boolean done = false;
    BlockingQueue<String> myQueue = new LinkedBlockingQueue<String>(QUEUE_LENGTH);

protected EventQueue() {
    new Thread(new Consumer(myQueue)).start();
}

public static EventQueue getInstance() {
      if(instance == null) {
         instance = new EventQueue();
      }
      return instance;
}

public void submit(String event) {
    try {
        myQueue.put(event);
    } catch (InterruptedException ex) {
    }
}

class Consumer implements Runnable {
    protected BlockingQueue<String> queue;

    Consumer(BlockingQueue<String> theQueue) { this.queue = theQueue; }

    public void run() {
      try {
        while (true) {
          Object obj = queue.take();
          process(obj);
          if (done) {
            return;
          }
        }
      } catch (InterruptedException ex) {
      }
    }

    void process(Object obj) {
        Event event = new Event( (String) obj);
        EventHandler handler = EventHandlerFactory.getInstance(event);
        handler.execute();
    }
}

// Close queue gracefully
public void close() {
    this.done = true;
}

I have a J2EE application that receives messages (events) via a web service. The messages are of varying types (requiring different processing depending on type) and sent in a specific sequence. It have identified a problem where some message types take longer to process than others. The result is that a message received second in a sequence may be processed before the first in the sequence. I have tried to address this problem by placing a synchronized block around the method that processes the messages. This seems to work, but I am not confident that this is the "correct" approach? Is there perhaps an alternative that may be more appropriate or is this "acceptable"? I have included a small snippit of code to try to explain more clearly. .... Any advice / guidance appreciated.

public class EventServiceImpl implements EventService {
  public String submit (String msg) {

    if (msg == null)
        return ("NAK");

            EventQueue.getInstance().submit(msg);

    return "ACK";
  }
}


public class EventQueue {
    private static EventQueue instance = null;
    private static int QUEUE_LENGTH = 10000;
    protected boolean done = false;
    BlockingQueue<String> myQueue = new LinkedBlockingQueue<String>(QUEUE_LENGTH);

protected EventQueue() {
    new Thread(new Consumer(myQueue)).start();
}

public static EventQueue getInstance() {
      if(instance == null) {
         instance = new EventQueue();
      }
      return instance;
}

public void submit(String event) {
    try {
        myQueue.put(event);
    } catch (InterruptedException ex) {
    }
}

class Consumer implements Runnable {
    protected BlockingQueue<String> queue;

    Consumer(BlockingQueue<String> theQueue) { this.queue = theQueue; }

    public void run() {
      try {
        while (true) {
          Object obj = queue.take();
          process(obj);
          if (done) {
            return;
          }
        }
      } catch (InterruptedException ex) {
      }
    }

    void process(Object obj) {
        Event event = new Event( (String) obj);
        EventHandler handler = EventHandlerFactory.getInstance(event);
        handler.execute();
    }
}

// Close queue gracefully
public void close() {
    this.done = true;
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

西瓜 2024-12-15 14:36:25

我不确定您正在使用的框架(EJB(MDB)/JMS)是什么。通常应避免在诸如 EJB/JMS 之类的托管环境中使用同步(这不是一个好的做法)。一种解决方法是

  • 客户端在发送下一条消息之前应等待服务器的确认。
  • 这样您的客户端本身就可以控制事件的顺序。

请注意,如果有多个客户端提交消息,这将不起作用。

编辑:

您遇到的情况是,Web 服务的客户端按顺序发送消息,而不考虑消息处理时间。它只是一个接一个地转储消息。这是基于队列(先进先出) 的解决方案的一个很好的案例。我建议采用以下两种方法来实现此目的

  1. 使用 JMS 。这将产生添加 JMS 提供程序 和编写一些管道代码的额外开销。< /p>

  2. 使用一些多线程模式,例如 生产者-消费者,其中您的 Web 服务处理程序将是将传入消息转储到队列中,单线程消费者一次将消费一条消息。请参阅使用 java.util.concurrent 包的此示例

  3. 使用数据库。将传入消息转储到数据库中。使用不同的基于调度程序的程序来扫描数据库(基于序列号)并相应地处理消息。

    第一个和第三个解决方案对于此类问题来说是非常标准的。第二种方法速度很快,并且不需要在代码中添加任何其他库。

I am not sure what is the framework (EJB(MDB)/JMS) you are working with. Generally using synchronization inside a Managed Environment like that of EJB/JMS should be avoided(its not a good practice). One way to get around is

  • the client should wait for the acknowledgement from the server before it sends the next message.
  • this way you client itself will control the sequence of events.

Please note this won't work if there are multiple client submitting the messages.

EDIT:

You have a situation wherein the client of the web service sends message in sequence without taking into account the message processing time. It simply dumps the message one after another. This is a good case for Queue ( First In First Out ) based solution. I suggest following two ways to accomplish this

  1. Use JMS . This will have an additional overhead of adding a JMS providers and writing some plumbing code.

  2. Use some multitheading pattern like Producer-Consumer wherein your web service handler will be dumping the incoming message in a Queue and a single threaded consumer will consume one message at a time. See this example using java.util.concurrent package.

  3. Use database. Dump the incoming messages into a database. Use a different scheduler based program to scan the datbase (based on sequence number) and process the messages accordingly.

    First and third solution is very standard for these type of problems. The second approach would be quick and won't need any additional libraries in your code.

不喜欢何必死缠烂打 2024-12-15 14:36:25

如果要按特定顺序处理事件,那么为什么不尝试在消息中添加“eventID”和“orderID”字段呢?这样,您的 EventServiceImpl 类就可以排序、排序,然后以正确的顺序执行(无论它们创建和/或传递给处理程序的顺序如何)。

我预计,同步 handler.execute() 块不会获得所需的结果。 synchronized 关键字的作用就是防止多个线程同时执行该块。它在正确排序下一个线程的领域没有做任何事情。

如果 synchronized 块似乎确实使事情正常工作,那么我断言您非常幸运,因为消息正在被创建、传递,然后按正确的顺序执行。在多线程环境下,这个就不能保证了!我会采取措施确保你能够控制这一切,而不是依靠好运。

示例:

  1. 消息按照“client01-A”、“client01-C”的顺序创建,
    'client01-B', 'client01-D'
  2. 消息按 'client01-D' 的顺序到达处理程序,
    'client01-B', 'client01-A', 'client01-C'
  3. EventHandler 可以区分从一个客户端到另一个客户端的消息,并开始缓存 'client01' 的消息。
  4. EventHandler recv 的“client01-A”消息并知道它可以处理该消息并执行此操作。
  5. EventHandler 在缓存中查找消息“client01-B”,找到它并处理它。
  6. EventHandler 找不到“client01-C”,因为它尚未到达。
  7. EventHandler接收'client01-C'并处理它。
  8. EventHandler 在缓存中查找“client01-D”,找到它、处理它,并认为“client01”交互已完成。

沿着这些思路的一些东西将确保正确的处理并促进多线程的良好使用。

If the events are to be processed in a specific sequence, then why not try adding "eventID" and 'orderID' fields to the messages? This way your EventServiceImpl class can sort, order and then execute in the proper order (regardless of the order they are created and/or delivered to the handler).

Synchronizing the handler.execute() block will not get the desired results, I expect. All the synchronized keyword does is prevent multiple threads from executing that block at the same time. It does nothing in the realm of properly ordering which thread goes next.

If the synchronized block does seem to make things work, then I assert you are getting very lucky in that the messages are being created, delivered and then acted upon in the proper order. In a multithread environment, this is not assured! I'd take steps to assure you are controlling this, rather than relying on good fortune.

Example:

  1. Messages are created in the order 'client01-A', 'client01-C',
    'client01-B', 'client01-D'
  2. Messages arrive at the handler in the order 'client01-D',
    'client01-B', 'client01-A', 'client01-C'
  3. EventHandler can distinquish messages from one client to another and starts to cache 'client01' 's messages.
  4. EventHandler recv's 'client01-A' message and knows it can process this and does so.
  5. EventHandler looks in cache for message 'client01-B', finds it and processes it.
  6. EventHandler cannot find 'client01-C' because it hasn't arrived yet.
  7. EventHandler recv's 'client01-C' and processes it.
  8. EventHandler looks in cache for 'client01-D' finds it, processes it, and considers the 'client01' interaction complete.

Something along these lines would assure proper processing and would promote good use of multiple threads.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文