当所有挂起的消息发送完毕后关闭套接字
我有这种方法将消息写入套接字:
public void sendMessage(byte[] msgB) {
try {
synchronized (writeLock) {
log.debug("Sending message (" + msgB.length + "): " + HexBytes.toHex(msgB));
ous.write(HEADER_MSG);
ous.writeInt(msgB.length);
ous.write(msgB);
ous.flush();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
现在一个名为 Bob 的线程想要在某个不确定的时刻 X 关闭套接字,这意味着可能仍然有线程等待 writeLock 发送消息,甚至可能有一个线程正在写入消息。
我可以通过让 Bob 在关闭套接字之前获取 writeLock
来解决后者,但我仍然可能会丢失尚未开始发送的消息,因为据我所知 synchronized
是不公平的,Bob 可以在其他等待更长时间的线程之前获得锁。
我需要的是在 X 之前对 sendMessage
进行的所有调用都能正常完成其工作,而在 X 之后进行的调用会抛出错误。我该怎么做?
- 具体说明:Bob 是从套接字输入流读取的线程,X 是在该流上收到“关闭”消息的时间。
I have this method to write messages to a socket:
public void sendMessage(byte[] msgB) {
try {
synchronized (writeLock) {
log.debug("Sending message (" + msgB.length + "): " + HexBytes.toHex(msgB));
ous.write(HEADER_MSG);
ous.writeInt(msgB.length);
ous.write(msgB);
ous.flush();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
Now a thread called Bob would like to close the socket at some undeterministic moment X, which means that there may still be threads waiting on writeLock
to send their message, and there may even be one thread in the middle of writing it.
I can solve the latter by letting Bob acquire writeLock
before closing the socket, but I could still lose messages that have not yet begun sending, because as far as I know synchronized
is not fair, Bob could get the lock before some other thread that has been waiting longer.
What I need is that all calls made to sendMessage
before X do their job normally, and calls made after X throw an error. How can I do this?
- Specifics: Bob is the thread reading from the socket's input stream and X is when a "close" message is received on that stream.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
您可以在此处使用执行者。由于每个发送消息都是同步的(我假设在一个公共共享对象上),因此您可以使用线程限制。
完成后,另一个线程可以调用 atMomentX();
从 javadoc 中,关闭方法说:
You can use an executor here. Since each send message is synchronized (I am assuming on a commonly shared object) you can use thread confinement.
When finished another thread can invoke atMomentX();
From the javadoc the shutdown method says:
考虑使用单线程
ExecutorService< /code>
执行消息写入。发送线程只是尝试通过调用execute(Runnable) 或submit(Callable) 来“发送”它们的消息。一旦您希望停止发送消息,请关闭
ExecutorService
(shutdown()
) 导致后续调用提交/执行导致RejectedExecutionException
。这种方法的优点是,与有多个线程等待写入消息本身相比,您只有一个 I/O 绑定线程,并且锁争用更少。这也是更好的关注点分离。
下面是一个简单的例子,它使问题更加面向对象:
Consider using a single-threaded
ExecutorService
to perform writing of messages. Sending threads simply attempt to "send" their message by callingexecute(Runnable)
orsubmit(Callable)
. Once you wish to stop sending messages you shut-down theExecutorService
(shutdown()
) causing subsequent calls to submit / execute to result in aRejectedExecutionException
.The advantage of this approach is that you only have one I/O bound thread and less lock-contention than if you have multiple threads waiting to write messages themselves. It is also a better separation of concerns.
Here's a quick example that OO-ifies the problem a bit more:
我想我可以用 ReentrantLock 设置为公平的。
I suppose I could replace the synchronized block with a ReentrantLock set to be fair.