我正在编写一个模仿电影院的多线程应用程序。每个参与的人都是自己的线程,并发必须完全由信号量来完成。我遇到的唯一问题是如何基本上链接线程以便它们可以通信(例如通过管道)。
例如:
Customer[1] 是一个线程,它获取一个信号量,让它走到票房。现在,客户[1] 必须告诉票房代理他们想看电影“X”。然后 BoxOfficeAgent[1] 也是一个线程,必须检查以确保电影未满,然后出售门票或告诉 Customer[1] 选择另一部电影。
如何来回传递数据,同时仍保持与信号量的并发性?
另外,我可以从 java.util.concurrent 使用的唯一类是 信号量 类。
I am writing a multi-threaded application that mimics a movie theater. Each person involved is its own thread and concurrency must be done completely by semaphores. The only issue I am having is how to basically link threads so that they can communicate (via a pipe for instance).
For instance:
Customer[1] which is a thread, acquires a semaphore that lets it walk up to the Box Office. Now Customer[1] must tell the Box Office Agent that they want to see movie "X". Then BoxOfficeAgent[1] also a thread, must check to make sure the movie isn't full and either sell a ticket or tell Customer[1] to pick another movie.
How do I pass that data back and forth while still maintaining concurrency with the semaphores?
Also, the only class I can use from java.util.concurrent is the Semaphore class.
发布评论
评论(2)
在线程之间来回传递数据的一种简单方法是使用接口
BlockingQueue
,位于包java.util.concurrent
。该接口具有以不同行为向集合添加元素的方法:
add(E)
:如果可能,则添加,否则抛出异常boolean Offer(E)
:如果元素已添加,否则为 falseboolean Offer(E, long, TimeUnit)
:尝试添加元素,等待指定的时间put(E)
:阻止调用线程直到添加元素为止它还定义了具有类似行为的元素检索方法:
take()
:阻塞直到有可用元素poll(long, TimeUnit)
:检索元素或返回 null我最常用的实现是:
ArrayBlockingQueue
,LinkedBlockingQueue
和SynchronousQueue
。第一个 ArrayBlockingQueue 具有固定大小,由传递给其构造函数的参数定义。
第二个
LinkedBlockingQueue
的大小不受限制。它总是接受任何元素,即offer
会立即返回 true,add
永远不会抛出异常。第三个,对我来说最有趣的一个,
SynchronousQueue
,就是一个管道。您可以将其视为大小为 0 的队列。它永远不会保留元素:只有当有其他线程尝试从中检索元素时,该队列才会接受元素。相反,如果有另一个线程尝试推送某个元素,则检索操作只会返回该元素。为了满足仅使用信号量完成同步的家庭作业要求,您可以从我给您提供的有关 SynchronousQueue 的描述中获得启发,并编写一些非常相似的内容:
请注意,此类呈现与我描述的 SynchronousQueue 类似的行为。
一旦调用方法
put(E)
,它就会获取写入信号量,该信号量将保留为空,以便对同一方法的另一个调用将在其第一行阻塞。然后,此方法存储对正在传递的对象的引用,并释放读取的信号量。此版本将使任何调用take()
方法的线程都可以继续执行。take() 方法的第一步自然是获取读取信号量,以禁止任何其他线程同时检索该元素。检索元素并将其保存在局部变量中后(练习:如果删除该行 E e = this.e 会发生什么?),该方法释放写入信号量,以便put(E) 方法可以由任何线程再次调用,并返回局部变量中保存的内容。
作为一个重要的注释,请注意对正在传递的对象的引用保存在私有字段中,并且方法
take()
和put(E)< /code> 都是最终。这是极其重要的,但也常常被忽视。如果这些方法不是最终方法(或更糟糕的是,该字段不是私有的),继承类将能够改变
take()
和put(E)
的行为合同。最后,您可以通过使用
try {} finally {}
来避免在take()
方法中声明局部变量,如下所示:这里是本示例的要点如果只是为了展示没有经验的开发人员不会注意到的
try/finally
的使用。显然,在这种情况下,没有真正的收益。哦该死,我已经帮你完成大部分作业了。为了报应——并且为了测试你对信号量的了解——,为什么不实现 BlockingQueue 合约定义的一些其他方法呢?例如,您可以实现一个
offer(E)
方法和一个take(E, long, TimeUnit)
!祝你好运。
One easy way to pass data back and forth between threads is to use the implementations of the interface
BlockingQueue<E>
, located in the packagejava.util.concurrent
.This interfaces has methods to add elements to the collection with different behaviors:
add(E)
: adds if possible, otherwise throws exceptionboolean offer(E)
: returns true if the element has been added, false otherwiseboolean offer(E, long, TimeUnit)
: tries to add the element, waiting the specified amount of timeput(E)
: blocks the calling thread until the element has been addedIt also defines methods for element retrieval with similar behaviors:
take()
: blocks until there's an element availablepoll(long, TimeUnit)
: retrieves an element or returns nullThe implementations I use most frequently are:
ArrayBlockingQueue
,LinkedBlockingQueue
andSynchronousQueue
.The first one,
ArrayBlockingQueue
, has a fixed size, defined by a parameter passed to its constructor.The second,
LinkedBlockingQueue
, has illimited size. It will always accept any elements, that is,offer
will return true immediately,add
will never throw an exception.The third, and to me the most interesting one,
SynchronousQueue
, is exactly a pipe. You can think of it as a queue with size 0. It will never keep an element: this queue will only accept elements if there's some other thread trying to retrieve elements from it. Conversely, a retrieval operation will only return an element if there's another thread trying to push it.To fulfill the homework requirement of synchronization done exclusively with semaphores, you could get inspired by the description I gave you about the SynchronousQueue, and write something quite similar:
Notice that this class presents similar behavior to what I described about the SynchronousQueue.
Once the methods
put(E)
gets called it acquires the write semaphore, which will be left empty, so that another call to the same method would block at its first line. This method then stores a reference to the object being passed, and releases the read semaphore. This release will make it possible for any thread calling thetake()
method to proceed.The first step of the
take()
method is then, naturally, to acquire the read semaphore, in order to disallow any other thread to retrieve the element concurrently. After the element has been retrieved and kept in a local variable (exercise: what would happen if that line, E e = this.e, were removed?), the method releases the write semaphore, so that the methodput(E)
may be called again by any thread, and returns what has been saved in the local variable.As an important remark, observe that the reference to the object being passed is kept in a private field, and the methods
take()
andput(E)
are both final. This is of utmost importance, and often missed. If these methods were not final (or worse, the field not private), an inheriting class would be able to alter the behavior oftake()
andput(E)
breaking the contract.Finally, you could avoid the need to declare a local variable in the
take()
method by usingtry {} finally {}
as follows:Here, the point of this example if just to show an use of
try/finally
that goes unnoticed among inexperienced developers. Obviously, in this case, there's no real gain.Oh damn, I've mostly finished your homework for you. In retribution -- and for you to test your knowledge about Semaphores --, why don't you implement some of the other methods defined by the BlockingQueue contract? For example, you could implement an
offer(E)
method and atake(E, long, TimeUnit)
!Good luck.
将其视为带有读/写锁的共享内存。
问候
PKV
Think it in terms of shared memory with read/write lock.
Regards
PKV