单一生产者单一消费者现在我需要多个消费者

发布于 2024-12-12 02:02:02 字数 1022 浏览 0 评论 0原文

我遇到过这样的情况:我编写了一个简单的生产者消费者模型,用于从蓝牙读取数据块,然后每 10k 字节将其写入文件。我使用标准 PC 模型,使用 Vector 作为消息持有者。那么我该如何改变这一点,以便多个线程消费者可以读取相同的消息,我认为这个术语应该是多播?我实际上是在 Android 手机上使用它,因此 JMS 可能不是一个选择。

static final int MAXQUEUE = 50000; 
private Vector<byte[]> messages = new Vector<byte[]>(); 

/**
 * Put the message in the queue for the Consumer Thread
 */
private synchronized void putMessage(byte[] send) throws InterruptedException { 

    while ( messages.size() == MAXQUEUE ) 
        wait(); 
    messages.addElement( send ); 
    notify(); 
} 


/**
 * This method is called by the consumer to see if any messages in the queue
 */
public synchronized byte[] getMessage()throws InterruptedException { 
    notify(); 
    while ( messages.size() == 0 && !Thread.interrupted()) {
        wait(1); 
    }
    byte[] message = messages.firstElement(); 
    messages.removeElement( message ); 
    return message; 
} 

我正在引用 Oreilly 书中 消息解析器 部分的代码

I have a Situation where I wrote a simple Producer Consumer model for reading in chunks of data from Bluetooth then every 10k bytes I write that to file. I used a standard P-C Model using a Vector as my message holder. So how do I change this so that multiple Thread consumers can read the same messages, I think the term would be Multicaster? I am actually using this on an Android phone so JMS is probably not an option.

static final int MAXQUEUE = 50000; 
private Vector<byte[]> messages = new Vector<byte[]>(); 

/**
 * Put the message in the queue for the Consumer Thread
 */
private synchronized void putMessage(byte[] send) throws InterruptedException { 

    while ( messages.size() == MAXQUEUE ) 
        wait(); 
    messages.addElement( send ); 
    notify(); 
} 


/**
 * This method is called by the consumer to see if any messages in the queue
 */
public synchronized byte[] getMessage()throws InterruptedException { 
    notify(); 
    while ( messages.size() == 0 && !Thread.interrupted()) {
        wait(1); 
    }
    byte[] message = messages.firstElement(); 
    messages.removeElement( message ); 
    return message; 
} 

I am referencing code from an Oreilly book Message Parser section

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

忆依然 2024-12-19 02:02:02

pub-sub机制绝对是实现你想要的方式。我不确定为什么 Android 开发会限制您使用 JMS,因为 JMS 是一个非常简单的规范。查看
此线程

Pub-sub mechanism is definitely the way to achieve what you want. I am not sure why developing for Android will restrict you from using JMS, which is as simple a spec as it gets. Check out
this thread on SO.

通知家属抬走 2024-12-19 02:02:02

您绝对应该使用 队列 而不是 Vector!
为每个线程提供自己的队列,当收到新消息时,add() 将新消息添加到每个线程的队列中。为了灵活性,侦听器模式也可能很有用。

编辑:

好的,我觉得我也应该添加一个例子:
(经典观察者模式)

这是接口,所有消费者都必须实现:

public interface MessageListener {
  public void newMessage( byte[] message );
}

生产者可能看起来像这样:

public class Producer {
  Collection<MessageListener> listeners = new ArrayList<MessageListener>();


  // Allow interested parties to register for new messages
  public void addListener( MessageListener listener ) {
    this.listeners.add( listener );
  }

  public void removeListener( Object listener ) {
    this.listeners.remove( listener );
  }

  protected void produceMessages() {
    byte[] msg = new byte[10];

    // Create message and put into msg

    // Tell all registered listeners about the new message:
    for ( MessageListener l : this.listeners ) {
      l.newMessage( msg );
    }

  }
}

消费者类可以是(使用阻塞队列来执行所有 wait()ing 和 <为我们代码>notify()):

public class Consumer implements MessageListener {

  BlockingQueue< byte[] > queue = new LinkedBlockingQueue< byte[] >();

  // This implements the MessageListener interface:
  @Override
  public void newMessage( byte[] message ) {
    try {
      queue.put( message );
    } catch (InterruptedException e) {
        // won't happen.
    }
  }

    // Execute in another thread:       
    protected void handleMessages() throws InterruptedException {
        while ( true ) {
            byte[] newMessage = queue.take();

            // handle the new message.
        }
    }


}

You should definitely use a queue instead of the Vector!
Give every thread its own queue and, when a new message is received, add() the new message to every thread's queue. For flexibility, a listener pattern may be useful, too.

Edit:

Ok, I feel I should add an example, too:
(Classical observer pattern)

This is the interface, all consumers must implement:

public interface MessageListener {
  public void newMessage( byte[] message );
}

A producer might look like this:

public class Producer {
  Collection<MessageListener> listeners = new ArrayList<MessageListener>();


  // Allow interested parties to register for new messages
  public void addListener( MessageListener listener ) {
    this.listeners.add( listener );
  }

  public void removeListener( Object listener ) {
    this.listeners.remove( listener );
  }

  protected void produceMessages() {
    byte[] msg = new byte[10];

    // Create message and put into msg

    // Tell all registered listeners about the new message:
    for ( MessageListener l : this.listeners ) {
      l.newMessage( msg );
    }

  }
}

And a consumer class could be (using a blocking queue which does all that wait()ing and notify()ing for us):

public class Consumer implements MessageListener {

  BlockingQueue< byte[] > queue = new LinkedBlockingQueue< byte[] >();

  // This implements the MessageListener interface:
  @Override
  public void newMessage( byte[] message ) {
    try {
      queue.put( message );
    } catch (InterruptedException e) {
        // won't happen.
    }
  }

    // Execute in another thread:       
    protected void handleMessages() throws InterruptedException {
        while ( true ) {
            byte[] newMessage = queue.take();

            // handle the new message.
        }
    }


}
新人笑 2024-12-19 02:02:02

这是我在深入研究一些代码并修改一些现有示例时想到的示例。

package test.messaging;

import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;

public class TestProducerConsumers {

    static Broker broker;

    public TestProducerConsumers(int maxSize) {
        broker = new Broker(maxSize);
        Producer p = new Producer();
        Consumer c1 = new Consumer("One");
        broker.consumers.add(c1);
        c1.start();

        Consumer c2 = new Consumer("Two");
        broker.consumers.add(c2);
        c2.start();

        p.start();
    }

    // Test Producer, use your own message producer on a thread to call up
    // broker.insert() possibly passing it the message instead.
    class Producer extends Thread {

        @Override
        public void run() {
            while (true) {
                try {
                    broker.insert();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Consumer extends Thread {
        String myName;
        LinkedBlockingQueue<String> queue;

        Consumer(String m) {
            this.myName = m;
            queue = new LinkedBlockingQueue<String>();
        }

        @Override
        public void run() {
            while(!Thread.interrupted()) {
                try {
                    while (queue.size() == 0 && !Thread.interrupted()) {
                        ;
                    }
                    while (queue.peek() == null && !Thread.interrupted()) {
                        ;
                    }
                    System.out.println("" + myName + " Consumer: " + queue.poll());
                } catch (Exception e) { }
            }
        }
    }

    class Broker {
        public ArrayList<Consumer> consumers = new ArrayList<Consumer>();

        int n;
        int maxSize;

        public Broker(int maxSize) {
            n = 0;
            this.maxSize = maxSize;
        }

        synchronized void insert() throws InterruptedException {
                    // only here for testing don't want it to runaway and 
                    //memory leak, only testing first 100 samples.
            if (n == maxSize)
                wait();
            System.out.println("Producer: " + n++);
            for (Consumer c : consumers) {
                c.queue.add("Message " + n);
            }
        }

    }

    public static void main(String[] args) {
        TestProducerConsumers pc = new TestProducerConsumers(100);

    }
}

This is what I came up with as an example when digging through some code and modifiying some existing examples.

package test.messaging;

import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;

public class TestProducerConsumers {

    static Broker broker;

    public TestProducerConsumers(int maxSize) {
        broker = new Broker(maxSize);
        Producer p = new Producer();
        Consumer c1 = new Consumer("One");
        broker.consumers.add(c1);
        c1.start();

        Consumer c2 = new Consumer("Two");
        broker.consumers.add(c2);
        c2.start();

        p.start();
    }

    // Test Producer, use your own message producer on a thread to call up
    // broker.insert() possibly passing it the message instead.
    class Producer extends Thread {

        @Override
        public void run() {
            while (true) {
                try {
                    broker.insert();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Consumer extends Thread {
        String myName;
        LinkedBlockingQueue<String> queue;

        Consumer(String m) {
            this.myName = m;
            queue = new LinkedBlockingQueue<String>();
        }

        @Override
        public void run() {
            while(!Thread.interrupted()) {
                try {
                    while (queue.size() == 0 && !Thread.interrupted()) {
                        ;
                    }
                    while (queue.peek() == null && !Thread.interrupted()) {
                        ;
                    }
                    System.out.println("" + myName + " Consumer: " + queue.poll());
                } catch (Exception e) { }
            }
        }
    }

    class Broker {
        public ArrayList<Consumer> consumers = new ArrayList<Consumer>();

        int n;
        int maxSize;

        public Broker(int maxSize) {
            n = 0;
            this.maxSize = maxSize;
        }

        synchronized void insert() throws InterruptedException {
                    // only here for testing don't want it to runaway and 
                    //memory leak, only testing first 100 samples.
            if (n == maxSize)
                wait();
            System.out.println("Producer: " + n++);
            for (Consumer c : consumers) {
                c.queue.add("Message " + n);
            }
        }

    }

    public static void main(String[] args) {
        TestProducerConsumers pc = new TestProducerConsumers(100);

    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文