使用 Java 在线程之间传输数据

发布于 2024-10-31 16:50:12 字数 450 浏览 0 评论 0 原文

我正在编写一个模仿电影院的多线程应用程序。每个参与的人都是自己的线程,并发必须完全由信号量来完成。我遇到的唯一问题是如何基本上链接线程以便它们可以通信(例如通过管道)。

例如:

Customer[1] 是一个线程,它获取一个信号量,让它走到票房。现在,客户[1] 必须告诉票房代理他们想看电影“X”。然后 BoxOfficeAgent[1] 也是一个线程,必须检查以确保电影未满,然后出售门票或告诉 Customer[1] 选择另一部电影。

如何来回传递数据,同时仍保持与信号量的并发性?

另外,我可以从 java.util.concurrent 使用的唯一类是 信号量 类。

I am writing a multi-threaded application that mimics a movie theater. Each person involved is its own thread and concurrency must be done completely by semaphores. The only issue I am having is how to basically link threads so that they can communicate (via a pipe for instance).

For instance:

Customer[1] which is a thread, acquires a semaphore that lets it walk up to the Box Office. Now Customer[1] must tell the Box Office Agent that they want to see movie "X". Then BoxOfficeAgent[1] also a thread, must check to make sure the movie isn't full and either sell a ticket or tell Customer[1] to pick another movie.

How do I pass that data back and forth while still maintaining concurrency with the semaphores?

Also, the only class I can use from java.util.concurrent is the Semaphore class.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

回心转意 2024-11-07 16:50:12

在线程之间来回传递数据的一种简单方法是使用接口 BlockingQueue,位于包java.util.concurrent

该接口具有以不同行为向集合添加元素的方法:

  • add(E):如果可能,则添加,否则抛出异常
  • boolean Offer(E):如果元素已添加,否则为 false
  • boolean Offer(E, long, TimeUnit):尝试添加元素,等待指定的时间
  • put(E):阻止调用线程直到添加元素

为止它还定义了具有类似行为的元素检索方法:

  • take():阻塞直到有可用元素
  • poll(long, TimeUnit):检索元素或返回 null

我最常用的实现是: ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue

第一个 ArrayBlockingQueue 具有固定大小,由传递给其构造函数的参数定义。

第二个 LinkedBlockingQueue 的大小不受限制。它总是接受任何元素,即 offer 会立即返回 true,add 永远不会抛出异常。

第三个,对我来说最有趣的一个,SynchronousQueue,就是一个管道。您可以将其视为大小为 0 的队列。它永远不会保留元素:只有当有其他线程尝试从中检索元素时,该队列才会接受元素。相反,如果有另一个线程尝试推送某个元素,则检索操作只会返回该元素。

为了满足仅使用信号量完成同步家庭作业要求,您可以从我给您提供的有关 SynchronousQueue 的描述中获得启发,并编写一些非常相似的内容:

class Pipe<E> {
  private E e;

  private final Semaphore read = new Semaphore(0);
  private final Semaphore write = new Semaphore(1);

  public final void put(final E e) {
    write.acquire();
    this.e = e;
    read.release();
  }

  public final E take() {
    read.acquire();
    E e = this.e;
    write.release();
    return e;
  }
}

请注意,此类呈现与我描述的 SynchronousQueue 类似的行为。

一旦调用方法 put(E) ,它就会获取写入信号量,该信号量将保留为空,以便对同一方法的另一个调用将在其第一行阻塞。然后,此方法存储对正在传递的对象的引用,并释放读取的信号量。此版本将使任何调用 take() 方法的线程都可以继续执行。

take() 方法的第一步自然是获取读取信号量,以禁止任何其他线程同时检索该元素。检索元素并将其保存在局部变量中后(练习:如果删除该行 E e = this.e 会发生什么?),该方法释放写入信号量,以便put(E) 方法可以由任何线程再次调用,并返回局部变量中保存的内容。

作为一个重要的注释,请注意对正在传递的对象的引用保存在私有字段中,并且方法take()put(E)< /code> 都是最终。这是极其重要的,但也常常被忽视。如果这些方法不是最终方法(或更糟糕的是,该字段不是私有的),继承类将能够改变 take()put(E) 的行为合同。

最后,您可以通过使用 try {} finally {} 来避免在 take() 方法中声明局部变量,如下所示:

class Pipe<E> {
  // ...
  public final E take() {
    try {
      read.acquire();
      return e;
    } finally {
      write.release();
    }
  }
}

这里是本示例的要点如果只是为了展示没有经验的开发人员不会注意到的 try/finally 的使用。显然,在这种情况下,没有真正的收益。

哦该死,我已经帮你完成大部分作业了。为了报应——并且为了测试你对信号量的了解——,为什么不实现 BlockingQueue 合约定义的一些其他方法呢?例如,您可以实现一个 offer(E) 方法和一个 take(E, long, TimeUnit)

祝你好运。

One easy way to pass data back and forth between threads is to use the implementations of the interface BlockingQueue<E>, located in the package java.util.concurrent.

This interfaces has methods to add elements to the collection with different behaviors:

  • add(E): adds if possible, otherwise throws exception
  • boolean offer(E): returns true if the element has been added, false otherwise
  • boolean offer(E, long, TimeUnit): tries to add the element, waiting the specified amount of time
  • put(E): blocks the calling thread until the element has been added

It also defines methods for element retrieval with similar behaviors:

  • take(): blocks until there's an element available
  • poll(long, TimeUnit): retrieves an element or returns null

The implementations I use most frequently are: ArrayBlockingQueue, LinkedBlockingQueue and SynchronousQueue.

The first one, ArrayBlockingQueue, has a fixed size, defined by a parameter passed to its constructor.

The second, LinkedBlockingQueue, has illimited size. It will always accept any elements, that is, offer will return true immediately, add will never throw an exception.

The third, and to me the most interesting one, SynchronousQueue, is exactly a pipe. You can think of it as a queue with size 0. It will never keep an element: this queue will only accept elements if there's some other thread trying to retrieve elements from it. Conversely, a retrieval operation will only return an element if there's another thread trying to push it.

To fulfill the homework requirement of synchronization done exclusively with semaphores, you could get inspired by the description I gave you about the SynchronousQueue, and write something quite similar:

class Pipe<E> {
  private E e;

  private final Semaphore read = new Semaphore(0);
  private final Semaphore write = new Semaphore(1);

  public final void put(final E e) {
    write.acquire();
    this.e = e;
    read.release();
  }

  public final E take() {
    read.acquire();
    E e = this.e;
    write.release();
    return e;
  }
}

Notice that this class presents similar behavior to what I described about the SynchronousQueue.

Once the methods put(E) gets called it acquires the write semaphore, which will be left empty, so that another call to the same method would block at its first line. This method then stores a reference to the object being passed, and releases the read semaphore. This release will make it possible for any thread calling the take() method to proceed.

The first step of the take() method is then, naturally, to acquire the read semaphore, in order to disallow any other thread to retrieve the element concurrently. After the element has been retrieved and kept in a local variable (exercise: what would happen if that line, E e = this.e, were removed?), the method releases the write semaphore, so that the method put(E) may be called again by any thread, and returns what has been saved in the local variable.

As an important remark, observe that the reference to the object being passed is kept in a private field, and the methods take() and put(E) are both final. This is of utmost importance, and often missed. If these methods were not final (or worse, the field not private), an inheriting class would be able to alter the behavior of take() and put(E) breaking the contract.

Finally, you could avoid the need to declare a local variable in the take() method by using try {} finally {} as follows:

class Pipe<E> {
  // ...
  public final E take() {
    try {
      read.acquire();
      return e;
    } finally {
      write.release();
    }
  }
}

Here, the point of this example if just to show an use of try/finally that goes unnoticed among inexperienced developers. Obviously, in this case, there's no real gain.

Oh damn, I've mostly finished your homework for you. In retribution -- and for you to test your knowledge about Semaphores --, why don't you implement some of the other methods defined by the BlockingQueue contract? For example, you could implement an offer(E) method and a take(E, long, TimeUnit)!

Good luck.

野鹿林 2024-11-07 16:50:12

将其视为带有读/写锁的共享内存。

  1. 创建一个缓冲区来放置消息。
  2. 应使用锁/信号量来控制对缓冲区的访问。
  3. 使用此缓冲区进行线程间通信。

问候

PKV

Think it in terms of shared memory with read/write lock.

  1. Create a buffer to put the message.
  2. The access to the buffer should be controlled by using a lock/semaphore.
  3. Use this buffer for inter thread communication purpose.

Regards

PKV

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文