当所有挂起的消息发送完毕后关闭套接字

发布于 2024-10-01 12:55:46 字数 954 浏览 2 评论 0原文

我有这种方法将消息写入套接字:

public void sendMessage(byte[] msgB) {
    try {
        synchronized (writeLock) {
            log.debug("Sending message (" + msgB.length + "): " + HexBytes.toHex(msgB));
            ous.write(HEADER_MSG);
            ous.writeInt(msgB.length);
            ous.write(msgB);
            ous.flush();
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

现在一个名为 Bob 的线程想要在某个不确定的时刻 X 关闭套接字,这意味着可能仍然有线程等待 writeLock 发送消息,甚至可能有一个线程正在写入消息。

我可以通过让 Bob 在关闭套接字之前获取 writeLock 来解决后者,但我仍然可能会丢失尚未开始发送的消息,因为据我所知 synchronized 是不公平的,Bob 可以在其他等待更长时间的线程之前获得锁。

我需要的是在 X 之前对 sendMessage 进行的所有调用都能正常完成其工作,而在 X 之后进行的调用会抛出错误。我该怎么做?

  • 具体说明:Bob 是从套接字输入流读取的线程,X 是在该流上收到“关闭”消息的时间。

I have this method to write messages to a socket:

public void sendMessage(byte[] msgB) {
    try {
        synchronized (writeLock) {
            log.debug("Sending message (" + msgB.length + "): " + HexBytes.toHex(msgB));
            ous.write(HEADER_MSG);
            ous.writeInt(msgB.length);
            ous.write(msgB);
            ous.flush();
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

Now a thread called Bob would like to close the socket at some undeterministic moment X, which means that there may still be threads waiting on writeLock to send their message, and there may even be one thread in the middle of writing it.

I can solve the latter by letting Bob acquire writeLock before closing the socket, but I could still lose messages that have not yet begun sending, because as far as I know synchronized is not fair, Bob could get the lock before some other thread that has been waiting longer.

What I need is that all calls made to sendMessage before X do their job normally, and calls made after X throw an error. How can I do this?

  • Specifics: Bob is the thread reading from the socket's input stream and X is when a "close" message is received on that stream.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

原野 2024-10-08 12:55:46

您可以在此处使用执行者。由于每个发送消息都是同步的(我假设在一个公共共享对象上),因此您可以使用线程限制。

static final ExecutorService executor = Executors.newSingleThreadExecutor();

public void sendMessage(byte[] msgB) {
    executor.submit(new Runnable() {
        public void run() {
            try {
                log.debug("Sending message (" + msgB.length + "): " + HexBytes.toHex(msgB));
                ous.write(HEADER_MSG);
                ous.writeInt(msgB.length);
                ous.write(msgB);
                ous.flush();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    });
}
public static void atMomentX(){
    executor.shutdown();
}

完成后,另一个线程可以调用 atMomentX();

从 javadoc 中,关闭方法说:

启动有序关闭,其中
之前提交的任务有
已执行,但不会执行新任务
公认。调用没有
如果已经关闭则有额外效果
下来。

You can use an executor here. Since each send message is synchronized (I am assuming on a commonly shared object) you can use thread confinement.

static final ExecutorService executor = Executors.newSingleThreadExecutor();

public void sendMessage(byte[] msgB) {
    executor.submit(new Runnable() {
        public void run() {
            try {
                log.debug("Sending message (" + msgB.length + "): " + HexBytes.toHex(msgB));
                ous.write(HEADER_MSG);
                ous.writeInt(msgB.length);
                ous.write(msgB);
                ous.flush();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    });
}
public static void atMomentX(){
    executor.shutdown();
}

When finished another thread can invoke atMomentX();

From the javadoc the shutdown method says:

Initiates an orderly shutdown in which
previously submitted tasks are
executed, but no new tasks will be
accepted. Invocation has no
additional effect if already shut
down.

画▽骨i 2024-10-08 12:55:46

考虑使用单线程 ExecutorService< /code>执行消息写入。发送线程只是尝试通过调用execute(Runnable) 或submit(Callable) 来“发送”它们的消息。一旦您希望停止发送消息,请关闭 ExecutorService (shutdown()) 导致后续调用提交/执行导致 RejectedExecutionException

这种方法的优点是,与有多个线程等待写入消息本身相比,您只有一个 I/O 绑定线程,并且锁争用更少。这也是更好的关注点分离。

下面是一个简单的例子,它使问题更加面向对象:

public interface Message {
  /**
   * Writes the message to the specified stream.
   */
  void writeTo(OutputStream os);
}

public class Dispatcher {
  private final ExecutorService sender;
  private final OutputStream out;

  public Dispatcher() {
    this.sender = Executors.newSingleThreadExecutor();
    this.out = ...; // Set up output stream.
  }

  /**
   * Enqueue message to be sent.  Return a Future to allow calling thread
   * to perform a blocking get() if they wish to perform a synchronous send.
   */
  public Future<?> sendMessage(final Message msg) {
    return sender.submit(new Callable<Void>() {
      public Void call() throws Exception {
        msg.writeTo(out);
        return null;
      }
    });
  }

  public void shutDown() {
    sender.shutdown(); // Waits for all tasks to finish sending.

    // Close quietly, swallow exception.
    try {
      out.close();
    } catch (IOException ex) {
    }
  }
}

Consider using a single-threaded ExecutorService to perform writing of messages. Sending threads simply attempt to "send" their message by calling execute(Runnable) or submit(Callable). Once you wish to stop sending messages you shut-down the ExecutorService (shutdown()) causing subsequent calls to submit / execute to result in a RejectedExecutionException.

The advantage of this approach is that you only have one I/O bound thread and less lock-contention than if you have multiple threads waiting to write messages themselves. It is also a better separation of concerns.

Here's a quick example that OO-ifies the problem a bit more:

public interface Message {
  /**
   * Writes the message to the specified stream.
   */
  void writeTo(OutputStream os);
}

public class Dispatcher {
  private final ExecutorService sender;
  private final OutputStream out;

  public Dispatcher() {
    this.sender = Executors.newSingleThreadExecutor();
    this.out = ...; // Set up output stream.
  }

  /**
   * Enqueue message to be sent.  Return a Future to allow calling thread
   * to perform a blocking get() if they wish to perform a synchronous send.
   */
  public Future<?> sendMessage(final Message msg) {
    return sender.submit(new Callable<Void>() {
      public Void call() throws Exception {
        msg.writeTo(out);
        return null;
      }
    });
  }

  public void shutDown() {
    sender.shutdown(); // Waits for all tasks to finish sending.

    // Close quietly, swallow exception.
    try {
      out.close();
    } catch (IOException ex) {
    }
  }
}
痞味浪人 2024-10-08 12:55:46

我想我可以用 ReentrantLock 设置为公平的。

I suppose I could replace the synchronized block with a ReentrantLock set to be fair.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文