可复位倒计时锁存器

发布于 2024-11-18 11:42:45 字数 12366 浏览 3 评论 0原文

我需要一些直接相当于 CountDownLatch 的东西,但可以重置(保持线程安全!)。我无法使用经典的同步结构,因为它们在这种情况下根本不起作用(复杂的锁定问题)。目前,我正在创建许多 CountDownLatch 对象,每个对象都会替换前一个对象。我认为这给 GC 的年轻代带来了很大压力(由于对象数量巨大)。您可以看到下面使用锁存器的代码(它是 ns-3 网络模拟器接口的 java.net 模拟的一部分)。

一些想法可能是尝试 CyclicBarrier (JDK5+) 或 Phaser (JDK7)

我可以测试代码并回复任何找到此问题解决方案的人,因为我是唯一一个可以将其插入正在运行的系统中看看会发生什么的人:)

/**
 *
 */
package kokunet;

import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kokuks.IConnectionSocket;
import kokuks.KKSAddress;
import kokuks.KKSSocket;
import kokuks.KKSSocketListener;

/**
 * KSelector
 * @version 1.0
 * @author Chris Dennett
 */
public class KSelector extends SelectorImpl {
    // True if this Selector has been closed
    private volatile boolean closed = false;

    // Lock for close and cleanup
    final class CloseLock {}
    private final Object closeLock = new CloseLock();

    private volatile boolean selecting = false;
    private volatile boolean wakeup = false;

    /**
     * Socket listener that {@code doSelect} attaches to every registered
     * socket while it is blocked. Readiness callbacks re-evaluate the socket's
     * selection key and release the current latch when the key became
     * selected; close callbacks wake the selector.
     */
    class SocketListener implements KKSSocketListener {
        /** Latch the selecting thread waits on; replaced by {@link #newLatch()}. */
        protected volatile CountDownLatch latch = null;

        /**
         * Creates the listener with an initial latch of count one.
         */
        public SocketListener() {
            newLatch();
        }

        /**
         * Installs a fresh latch with a count of one.
         *
         * @return the newly installed latch
         */
        protected synchronized CountDownLatch newLatch() {
            return this.latch = new CountDownLatch(1);
        }

        /**
         * Re-evaluates the selection key belonging to {@code socket} and counts
         * the latch down when {@code handleSelect} reports the key as newly
         * selected. Does nothing unless a select is in progress.
         *
         * @param socket the socket whose state changed
         */
        protected synchronized void refreshReady(KKSSocket socket) {
            if (!selecting) return;

            // Lock order is channelToKey -> socketToChannel, the same order
            // implRegister/implDereg use. Nesting them the other way round
            // here could deadlock against a concurrent (de)registration.
            synchronized (channelToKey) {
                synchronized (socketToChannel) {
                    SelChImpl ch = socketToChannel.get(socket);
                    if (ch == null) {
                        System.out.println("ks sendCB: channel not found for socket: " + socket);
                        return;
                    }
                    SelectionKeyImpl sk = channelToKey.get(ch);
                    if (sk != null) {
                        if (handleSelect(sk)) {
                            latch.countDown();
                        }
                    }
                }
            }
        }

        @Override
        public void connectionSucceeded(KKSSocket socket) {
            refreshReady(socket);
        }

        @Override
        public void connectionFailed(KKSSocket socket) {
            refreshReady(socket);
        }

        @Override
        public void dataSent(KKSSocket socket, long bytesSent) {
            refreshReady(socket);
        }

        @Override
        public void sendCB(KKSSocket socket, long bytesAvailable) {
            refreshReady(socket);
        }

        @Override
        public void onRecv(KKSSocket socket) {
            refreshReady(socket);
        }

        @Override
        public void newConnectionCreated(KKSSocket socket, KKSSocket newSocket, KKSAddress remoteaddress) {
            refreshReady(socket);
        }

        // A closing socket cannot become ready any more, so just wake the selector.
        @Override
        public void normalClose(KKSSocket socket) {
            wakeup();
        }

        @Override
        public void errorClose(KKSSocket socket) {
            wakeup();
        }
    }

    protected final Map<KKSSocket, SelChImpl>        socketToChannel = new HashMap<KKSSocket, SelChImpl>();
    protected final Map<SelChImpl, SelectionKeyImpl> channelToKey    = new HashMap<SelChImpl, SelectionKeyImpl>();
    protected final SocketListener currListener = new SocketListener();
    protected Thread selectingThread = null;

    /**
     * Looks up the channel registered for a socket.
     *
     * @param s the socket to look up
     * @return the mapped channel, or {@code null} if the socket is not registered
     */
    SelChImpl getChannelForSocket(KKSSocket s) {
        final SelChImpl channel;
        synchronized (socketToChannel) {
            channel = socketToChannel.get(s);
        }
        return channel;
    }

    /**
     * Finds the selection key of the channel that owns the given socket.
     *
     * <p>{@code channelToKey} is keyed by channel, so the socket is first
     * resolved to its channel; querying the map with the socket itself could
     * never match. The two map locks are taken one after the other (never
     * nested), so this method cannot take part in a lock-order cycle.
     *
     * @param s the socket whose key is wanted
     * @return the selection key, or {@code null} if the socket or its channel
     *         is not registered
     */
    SelectionKeyImpl getSelKeyForChannel(KKSSocket s) {
        final SelChImpl ch;
        synchronized (socketToChannel) {
            ch = socketToChannel.get(s);
        }
        if (ch == null) {
            return null;
        }
        synchronized (channelToKey) {
            return channelToKey.get(ch);
        }
    }

    /**
     * Adds {@code op} to the key's ready set and puts the key into
     * {@code selectedKeys}. Shared implementation of the {@code mark*} methods.
     *
     * @param impl the key to update
     * @param op   the operation bit to add to the ready set
     * @return {@code false} if the key is no longer valid, otherwise the
     *         result of {@code selectedKeys.add(impl)}
     */
    private boolean addReadyOp(SelectionKeyImpl impl, int op) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | op);
            return selectedKeys.add(impl);
        }
    }

    /** Marks the key read-ready; see {@link #addReadyOp} for the return value. */
    protected boolean markRead(SelectionKeyImpl impl) {
        return addReadyOp(impl, SelectionKeyImpl.OP_READ);
    }

    /** Marks the key write-ready; see {@link #addReadyOp} for the return value. */
    protected boolean markWrite(SelectionKeyImpl impl) {
        return addReadyOp(impl, SelectionKeyImpl.OP_WRITE);
    }

    /** Marks the key accept-ready; see {@link #addReadyOp} for the return value. */
    protected boolean markAccept(SelectionKeyImpl impl) {
        return addReadyOp(impl, SelectionKeyImpl.OP_ACCEPT);
    }

    /** Marks the key connect-ready; see {@link #addReadyOp} for the return value. */
    protected boolean markConnect(SelectionKeyImpl impl) {
        return addReadyOp(impl, SelectionKeyImpl.OP_CONNECT);
    }

    /**
     * Creates a selector for the given provider.
     *
     * @param provider the provider handed on to the {@code SelectorImpl} constructor
     */
    protected KSelector(SelectorProvider provider) {
        super(provider);
    }

    /**
     * Closes the selector once: deregisters every key, kills channels that are
     * neither open nor registered afterwards, then wakes any blocked select.
     * Repeated calls only print the trace message.
     *
     * NOTE(review): {@code keys} is iterated while {@code deregister} runs;
     * confirm that deregistering does not modify {@code keys} synchronously.
     *
     * @see kokunet.SelectorImpl#implClose()
     */
    @Override
    protected void implClose() throws IOException {
        provider().getApp().printMessage("implClose: closed: " + closed);
        synchronized (closeLock) {
            if (!closed) {
                closed = true;
                for (SelectionKey key : keys) {
                    provider().getApp().printMessage("dereg1");
                    deregister((AbstractSelectionKey)key);
                    provider().getApp().printMessage("dereg2");
                    SelectableChannel channel = key.channel();
                    if (!(channel.isOpen() || channel.isRegistered())) {
                        ((SelChImpl)channel).kill();
                    }
                }
                implCloseInterrupt();
            }
        }
    }

    /**
     * Called at the end of {@code implClose}; delegates to {@link #wakeup()}
     * so that a thread blocked in {@code doSelect} is released.
     */
    protected void implCloseInterrupt() {
        wakeup();
    }

    /**
     * Checks one key against the current state of its socket and marks every
     * operation that is both of interest and ready. An invalid key is
     * cancelled and the listener is removed from its socket.
     *
     * @param k the key to evaluate
     * @return {@code true} if at least one {@code mark*} call returned
     *         {@code true}; always {@code false} for an invalid key
     */
    private boolean handleSelect(SelectionKey k) {
        synchronized (k) {
            if (!k.isValid()) {
                k.cancel();
                ((SelectionKeyImpl)k).channel.socket().removeListener(currListener);
                return false;
            }

            final SelectionKeyImpl ski = (SelectionKeyImpl)k;
            boolean becameReady = false;

            // Readable: bytes are waiting in the receive buffer.
            if ((ski.interestOps() & SelectionKeyImpl.OP_READ) != 0
                    && ski.channel.socket().getRxAvailable() > 0) {
                if (markRead(ski)) {
                    becameReady = true;
                }
            }

            // Writable: there is room in the transmit buffer.
            if ((ski.interestOps() & SelectionKeyImpl.OP_WRITE) != 0
                    && ski.channel.socket().getTxAvailable() > 0) {
                if (markWrite(ski)) {
                    becameReady = true;
                }
            }

            // Connect: only connection-oriented sockets are considered.
            // NOTE(review): fires only when the socket is neither accepting,
            // connecting nor connected -- confirm this is the intended condition.
            if ((ski.interestOps() & SelectionKeyImpl.OP_CONNECT) != 0
                    && !ski.channel.socket().isConnectionless()) {
                IConnectionSocket connSocket = (IConnectionSocket)ski.channel.socket();
                if (!(ski.channel.socket().isAccepting() || connSocket.isConnecting() || connSocket.isConnected())) {
                    if (markConnect(ski)) {
                        becameReady = true;
                    }
                }
            }

            // Accept: a listening, connection-oriented socket with pending connections.
            if ((ski.interestOps() & SelectionKeyImpl.OP_ACCEPT) != 0
                    && !ski.channel.socket().isConnectionless()
                    && ski.channel.socket().isListening()) {
                IConnectionSocket listenSocket = (IConnectionSocket)ski.channel.socket();
                if (listenSocket.hasPendingConnections()) {
                    if (markAccept(ski)) {
                        becameReady = true;
                    }
                }
            }

            return becameReady;
        }
    }

    /**
     * Evaluates every registered key once to pick up readiness that already
     * exists before any listener callback fires.
     *
     * @return {@code true} if any key was reported as newly selected
     */
    private boolean handleSelect() {
        boolean anyReady = false;

        for (SelectionKey key : keys) {
            // Every key is evaluated, even after one has already reported ready.
            if (handleSelect(key)) {
                anyReady = true;
            }
        }

        return anyReady;
    }

    /**
     * Performs one selection operation.
     *
     * <p>After an initial scan of all keys the method returns immediately when
     * something is already selected or {@code timeout == 0}. Otherwise the
     * listener is attached to every key's socket and the thread waits on a
     * fresh latch until a callback selects a key, {@link #wakeup()} is called,
     * the selector is closed, or a positive timeout has elapsed. A timeout
     * that is neither zero nor positive waits without a time limit.
     *
     * @param timeout {@code 0} to poll, a positive number of milliseconds to
     *                wait at most that long, any other value to wait indefinitely
     * @return the size of {@code selectedKeys} when the method returns
     * @see kokunet.SelectorImpl#doSelect(long)
     */
    @Override
    protected int doSelect(long timeout) throws IOException {
        processDeregisterQueue();

        // Reference point for computing the remaining wait on each loop pass.
        long timestartedms = System.currentTimeMillis();

        synchronized (selectedKeys) {
            // Publish "a select is in progress" under the listener lock so
            // that wakeup() and refreshReady() see a consistent state.
            synchronized (currListener) {
                wakeup = false;
                selectingThread = Thread.currentThread();
                selecting = true;
            }
            try {
                // Initial scan: pick up readiness that already exists.
                handleSelect();

                if (!selectedKeys.isEmpty() || timeout == 0) {
                    return selectedKeys.size();
                }

                //TODO: useless op if we have keys available
                for (SelectionKey key : keys) {
                    ((SelectionKeyImpl)key).channel.socket().addListener(currListener);
                }
                try {
                    while (!wakeup && isOpen() && selectedKeys.isEmpty()) {
                        CountDownLatch latch = null;
                        // Re-check the exit conditions and swap in a fresh latch
                        // under the listener lock, which refreshReady() also holds
                        // (it is synchronized on the listener) while counting down.
                        synchronized (currListener) {
                            if (wakeup || !isOpen() || !selectedKeys.isEmpty()) {
                                break;
                            }
                            latch = currListener.newLatch();
                        }
                        try {
                            if (timeout > 0) {
                                long currtimems = System.currentTimeMillis();
                                long remainingMS = (timestartedms + timeout) - currtimems;

                                if (remainingMS > 0) {
                                    latch.await(remainingMS, TimeUnit.MILLISECONDS);
                                } else {
                                    // Timeout elapsed.
                                    break;
                                }
                            } else {
                                latch.await();
                            }
                        } catch (InterruptedException e) {
                            // wakeup() interrupts the selecting thread; the loop
                            // condition decides whether to leave. The interrupt
                            // status is not restored here.
                        }
                    }
                    return selectedKeys.size();
                } finally {
                    // Always detach the listener again, however the wait ended.
                    for (SelectionKey key : keys) {
                        ((SelectionKeyImpl)key).channel.socket().removeListener(currListener);
                    }
                }
            } finally {
                // Clear the "select in progress" state for the next call.
                synchronized (currListener) {
                    selecting = false;
                    selectingThread = null;
                    wakeup = false;
                }
            }
        }
    }

    /**
     * Registers a key: adds it to {@code keys} and records the
     * socket-to-channel and channel-to-key mappings.
     *
     * <p>Locks are taken in the order {@code closeLock}, {@code channelToKey},
     * {@code socketToChannel}. NOTE(review): any other code that nests the two
     * map locks must use the same order, otherwise a deadlock is possible.
     *
     * @param ski the key to register
     * @throws ClosedSelectorException if the selector has been closed
     * @see kokunet.SelectorImpl#implRegister(kokunet.SelectionKeyImpl)
     */
    @Override
    protected void implRegister(SelectionKeyImpl ski) {
        synchronized (closeLock) {
            if (closed) throw new ClosedSelectorException();
            synchronized (channelToKey) {
                synchronized (socketToChannel) {
                    keys.add(ski);
                    socketToChannel.put(ski.channel.socket(), ski.channel);
                    channelToKey.put(ski.channel, ski);
                }
            }
        }

    }

    /**
     * Deregisters a key: removes it from {@code keys} and from both lookup
     * maps, then kills the channel if it is neither open nor registered.
     *
     * <p>Locks are taken in the order {@code channelToKey},
     * {@code socketToChannel}; {@code kill()} runs while both are held.
     *
     * @param ski the key to deregister
     * @see kokunet.SelectorImpl#implDereg(kokunet.SelectionKeyImpl)
     */
    @Override
    protected void implDereg(SelectionKeyImpl ski) throws IOException {
        synchronized (channelToKey) {
            synchronized (socketToChannel) {
                keys.remove(ski);
                socketToChannel.remove(ski.channel.socket());
                channelToKey.remove(ski.channel);

                SelectableChannel selch = ski.channel();

                // Nobody else references the channel any more: release it.
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
            }
        }
    }

    /**
     * Releases a thread that is currently blocked in {@code doSelect} by
     * setting the wakeup flag and interrupting it. Does nothing when no select
     * is in progress.
     *
     * @return this selector
     * @see kokunet.SelectorImpl#wakeup()
     */
    @Override
    public Selector wakeup() {
        synchronized (currListener) {
            if (!selecting) {
                return this;
            }
            final Thread blocked = selectingThread;
            wakeup = true;
            selecting = false;
            blocked.interrupt();
            selectingThread = null;
        }
        return this;
    }
}

干杯,
克里斯

I need something which is directly equivalent to CountDownLatch, but is resettable (remaining thread-safe!). I can't use classic synchronisation constructs as they simply don't work in this situation (complex locking issues). At the moment, I'm creating many CountDownLatch objects, each replacing the previous one. I believe this is doing in the young generation in the GC (due to the sheer number of objects). You can see the code which uses the latches below (it's part of the java.net mock for a ns-3 network simulator interface).

Some ideas might be to try CyclicBarrier (JDK5+) or Phaser (JDK7)

I can test code and get back to anyone that finds a solution to this problem, since I'm the only one who can insert it into the running system to see what happens :)

/**
 *
 */
package kokunet;

import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kokuks.IConnectionSocket;
import kokuks.KKSAddress;
import kokuks.KKSSocket;
import kokuks.KKSSocketListener;

/**
 * KSelector
 * @version 1.0
 * @author Chris Dennett
 */
public class KSelector extends SelectorImpl {
    // True if this Selector has been closed
    private volatile boolean closed = false;

    // Lock for close and cleanup
    final class CloseLock {}
    private final Object closeLock = new CloseLock();

    private volatile boolean selecting = false;
    private volatile boolean wakeup = false;

    /**
     * Socket listener that {@code doSelect} attaches to every registered
     * socket while it is blocked. Readiness callbacks re-evaluate the socket's
     * selection key and release the current latch when the key became
     * selected; close callbacks wake the selector.
     */
    class SocketListener implements KKSSocketListener {
        /** Latch the selecting thread waits on; replaced by {@link #newLatch()}. */
        protected volatile CountDownLatch latch = null;

        /**
         * Creates the listener with an initial latch of count one.
         */
        public SocketListener() {
            newLatch();
        }

        /**
         * Installs a fresh latch with a count of one.
         *
         * @return the newly installed latch
         */
        protected synchronized CountDownLatch newLatch() {
            return this.latch = new CountDownLatch(1);
        }

        /**
         * Re-evaluates the selection key belonging to {@code socket} and counts
         * the latch down when {@code handleSelect} reports the key as newly
         * selected. Does nothing unless a select is in progress.
         *
         * @param socket the socket whose state changed
         */
        protected synchronized void refreshReady(KKSSocket socket) {
            if (!selecting) return;

            // Lock order is channelToKey -> socketToChannel, the same order
            // implRegister/implDereg use. Nesting them the other way round
            // here could deadlock against a concurrent (de)registration.
            synchronized (channelToKey) {
                synchronized (socketToChannel) {
                    SelChImpl ch = socketToChannel.get(socket);
                    if (ch == null) {
                        System.out.println("ks sendCB: channel not found for socket: " + socket);
                        return;
                    }
                    SelectionKeyImpl sk = channelToKey.get(ch);
                    if (sk != null) {
                        if (handleSelect(sk)) {
                            latch.countDown();
                        }
                    }
                }
            }
        }

        @Override
        public void connectionSucceeded(KKSSocket socket) {
            refreshReady(socket);
        }

        @Override
        public void connectionFailed(KKSSocket socket) {
            refreshReady(socket);
        }

        @Override
        public void dataSent(KKSSocket socket, long bytesSent) {
            refreshReady(socket);
        }

        @Override
        public void sendCB(KKSSocket socket, long bytesAvailable) {
            refreshReady(socket);
        }

        @Override
        public void onRecv(KKSSocket socket) {
            refreshReady(socket);
        }

        @Override
        public void newConnectionCreated(KKSSocket socket, KKSSocket newSocket, KKSAddress remoteaddress) {
            refreshReady(socket);
        }

        // A closing socket cannot become ready any more, so just wake the selector.
        @Override
        public void normalClose(KKSSocket socket) {
            wakeup();
        }

        @Override
        public void errorClose(KKSSocket socket) {
            wakeup();
        }
    }

    protected final Map<KKSSocket, SelChImpl>        socketToChannel = new HashMap<KKSSocket, SelChImpl>();
    protected final Map<SelChImpl, SelectionKeyImpl> channelToKey    = new HashMap<SelChImpl, SelectionKeyImpl>();
    protected final SocketListener currListener = new SocketListener();
    protected Thread selectingThread = null;

    /**
     * Looks up the channel registered for a socket.
     *
     * @param s the socket to look up
     * @return the mapped channel, or {@code null} if the socket is not registered
     */
    SelChImpl getChannelForSocket(KKSSocket s) {
        final SelChImpl channel;
        synchronized (socketToChannel) {
            channel = socketToChannel.get(s);
        }
        return channel;
    }

    /**
     * Finds the selection key of the channel that owns the given socket.
     *
     * <p>{@code channelToKey} is keyed by channel, so the socket is first
     * resolved to its channel; querying the map with the socket itself could
     * never match. The two map locks are taken one after the other (never
     * nested), so this method cannot take part in a lock-order cycle.
     *
     * @param s the socket whose key is wanted
     * @return the selection key, or {@code null} if the socket or its channel
     *         is not registered
     */
    SelectionKeyImpl getSelKeyForChannel(KKSSocket s) {
        final SelChImpl ch;
        synchronized (socketToChannel) {
            ch = socketToChannel.get(s);
        }
        if (ch == null) {
            return null;
        }
        synchronized (channelToKey) {
            return channelToKey.get(ch);
        }
    }

    /**
     * Adds {@code op} to the key's ready set and puts the key into
     * {@code selectedKeys}. Shared implementation of the {@code mark*} methods.
     *
     * @param impl the key to update
     * @param op   the operation bit to add to the ready set
     * @return {@code false} if the key is no longer valid, otherwise the
     *         result of {@code selectedKeys.add(impl)}
     */
    private boolean addReadyOp(SelectionKeyImpl impl, int op) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | op);
            return selectedKeys.add(impl);
        }
    }

    /** Marks the key read-ready; see {@link #addReadyOp} for the return value. */
    protected boolean markRead(SelectionKeyImpl impl) {
        return addReadyOp(impl, SelectionKeyImpl.OP_READ);
    }

    /** Marks the key write-ready; see {@link #addReadyOp} for the return value. */
    protected boolean markWrite(SelectionKeyImpl impl) {
        return addReadyOp(impl, SelectionKeyImpl.OP_WRITE);
    }

    /** Marks the key accept-ready; see {@link #addReadyOp} for the return value. */
    protected boolean markAccept(SelectionKeyImpl impl) {
        return addReadyOp(impl, SelectionKeyImpl.OP_ACCEPT);
    }

    /** Marks the key connect-ready; see {@link #addReadyOp} for the return value. */
    protected boolean markConnect(SelectionKeyImpl impl) {
        return addReadyOp(impl, SelectionKeyImpl.OP_CONNECT);
    }

    /**
     * Creates a selector for the given provider.
     *
     * @param provider the provider handed on to the {@code SelectorImpl} constructor
     */
    protected KSelector(SelectorProvider provider) {
        super(provider);
    }

    /**
     * Closes the selector once: deregisters every key, kills channels that are
     * neither open nor registered afterwards, then wakes any blocked select.
     * Repeated calls only print the trace message.
     *
     * NOTE(review): {@code keys} is iterated while {@code deregister} runs;
     * confirm that deregistering does not modify {@code keys} synchronously.
     *
     * @see kokunet.SelectorImpl#implClose()
     */
    @Override
    protected void implClose() throws IOException {
        provider().getApp().printMessage("implClose: closed: " + closed);
        synchronized (closeLock) {
            if (!closed) {
                closed = true;
                for (SelectionKey key : keys) {
                    provider().getApp().printMessage("dereg1");
                    deregister((AbstractSelectionKey)key);
                    provider().getApp().printMessage("dereg2");
                    SelectableChannel channel = key.channel();
                    if (!(channel.isOpen() || channel.isRegistered())) {
                        ((SelChImpl)channel).kill();
                    }
                }
                implCloseInterrupt();
            }
        }
    }

    /**
     * Called at the end of {@code implClose}; delegates to {@link #wakeup()}
     * so that a thread blocked in {@code doSelect} is released.
     */
    protected void implCloseInterrupt() {
        wakeup();
    }

    /**
     * Checks one key against the current state of its socket and marks every
     * operation that is both of interest and ready. An invalid key is
     * cancelled and the listener is removed from its socket.
     *
     * @param k the key to evaluate
     * @return {@code true} if at least one {@code mark*} call returned
     *         {@code true}; always {@code false} for an invalid key
     */
    private boolean handleSelect(SelectionKey k) {
        synchronized (k) {
            if (!k.isValid()) {
                k.cancel();
                ((SelectionKeyImpl)k).channel.socket().removeListener(currListener);
                return false;
            }

            final SelectionKeyImpl ski = (SelectionKeyImpl)k;
            boolean becameReady = false;

            // Readable: bytes are waiting in the receive buffer.
            if ((ski.interestOps() & SelectionKeyImpl.OP_READ) != 0
                    && ski.channel.socket().getRxAvailable() > 0) {
                if (markRead(ski)) {
                    becameReady = true;
                }
            }

            // Writable: there is room in the transmit buffer.
            if ((ski.interestOps() & SelectionKeyImpl.OP_WRITE) != 0
                    && ski.channel.socket().getTxAvailable() > 0) {
                if (markWrite(ski)) {
                    becameReady = true;
                }
            }

            // Connect: only connection-oriented sockets are considered.
            // NOTE(review): fires only when the socket is neither accepting,
            // connecting nor connected -- confirm this is the intended condition.
            if ((ski.interestOps() & SelectionKeyImpl.OP_CONNECT) != 0
                    && !ski.channel.socket().isConnectionless()) {
                IConnectionSocket connSocket = (IConnectionSocket)ski.channel.socket();
                if (!(ski.channel.socket().isAccepting() || connSocket.isConnecting() || connSocket.isConnected())) {
                    if (markConnect(ski)) {
                        becameReady = true;
                    }
                }
            }

            // Accept: a listening, connection-oriented socket with pending connections.
            if ((ski.interestOps() & SelectionKeyImpl.OP_ACCEPT) != 0
                    && !ski.channel.socket().isConnectionless()
                    && ski.channel.socket().isListening()) {
                IConnectionSocket listenSocket = (IConnectionSocket)ski.channel.socket();
                if (listenSocket.hasPendingConnections()) {
                    if (markAccept(ski)) {
                        becameReady = true;
                    }
                }
            }

            return becameReady;
        }
    }

    /**
     * Evaluates every registered key once to pick up readiness that already
     * exists before any listener callback fires.
     *
     * @return {@code true} if any key was reported as newly selected
     */
    private boolean handleSelect() {
        boolean anyReady = false;

        for (SelectionKey key : keys) {
            // Every key is evaluated, even after one has already reported ready.
            if (handleSelect(key)) {
                anyReady = true;
            }
        }

        return anyReady;
    }

    /**
     * Performs one selection operation.
     *
     * <p>After an initial scan of all keys the method returns immediately when
     * something is already selected or {@code timeout == 0}. Otherwise the
     * listener is attached to every key's socket and the thread waits on a
     * fresh latch until a callback selects a key, {@link #wakeup()} is called,
     * the selector is closed, or a positive timeout has elapsed. A timeout
     * that is neither zero nor positive waits without a time limit.
     *
     * @param timeout {@code 0} to poll, a positive number of milliseconds to
     *                wait at most that long, any other value to wait indefinitely
     * @return the size of {@code selectedKeys} when the method returns
     * @see kokunet.SelectorImpl#doSelect(long)
     */
    @Override
    protected int doSelect(long timeout) throws IOException {
        processDeregisterQueue();

        // Reference point for computing the remaining wait on each loop pass.
        long timestartedms = System.currentTimeMillis();

        synchronized (selectedKeys) {
            // Publish "a select is in progress" under the listener lock so
            // that wakeup() and refreshReady() see a consistent state.
            synchronized (currListener) {
                wakeup = false;
                selectingThread = Thread.currentThread();
                selecting = true;
            }
            try {
                // Initial scan: pick up readiness that already exists.
                handleSelect();

                if (!selectedKeys.isEmpty() || timeout == 0) {
                    return selectedKeys.size();
                }

                //TODO: useless op if we have keys available
                for (SelectionKey key : keys) {
                    ((SelectionKeyImpl)key).channel.socket().addListener(currListener);
                }
                try {
                    while (!wakeup && isOpen() && selectedKeys.isEmpty()) {
                        CountDownLatch latch = null;
                        // Re-check the exit conditions and swap in a fresh latch
                        // under the listener lock, which refreshReady() also holds
                        // (it is synchronized on the listener) while counting down.
                        synchronized (currListener) {
                            if (wakeup || !isOpen() || !selectedKeys.isEmpty()) {
                                break;
                            }
                            latch = currListener.newLatch();
                        }
                        try {
                            if (timeout > 0) {
                                long currtimems = System.currentTimeMillis();
                                long remainingMS = (timestartedms + timeout) - currtimems;

                                if (remainingMS > 0) {
                                    latch.await(remainingMS, TimeUnit.MILLISECONDS);
                                } else {
                                    // Timeout elapsed.
                                    break;
                                }
                            } else {
                                latch.await();
                            }
                        } catch (InterruptedException e) {
                            // wakeup() interrupts the selecting thread; the loop
                            // condition decides whether to leave. The interrupt
                            // status is not restored here.
                        }
                    }
                    return selectedKeys.size();
                } finally {
                    // Always detach the listener again, however the wait ended.
                    for (SelectionKey key : keys) {
                        ((SelectionKeyImpl)key).channel.socket().removeListener(currListener);
                    }
                }
            } finally {
                // Clear the "select in progress" state for the next call.
                synchronized (currListener) {
                    selecting = false;
                    selectingThread = null;
                    wakeup = false;
                }
            }
        }
    }

    /**
     * Registers a key: adds it to {@code keys} and records the
     * socket-to-channel and channel-to-key mappings.
     *
     * <p>Locks are taken in the order {@code closeLock}, {@code channelToKey},
     * {@code socketToChannel}. NOTE(review): any other code that nests the two
     * map locks must use the same order, otherwise a deadlock is possible.
     *
     * @param ski the key to register
     * @throws ClosedSelectorException if the selector has been closed
     * @see kokunet.SelectorImpl#implRegister(kokunet.SelectionKeyImpl)
     */
    @Override
    protected void implRegister(SelectionKeyImpl ski) {
        synchronized (closeLock) {
            if (closed) throw new ClosedSelectorException();
            synchronized (channelToKey) {
                synchronized (socketToChannel) {
                    keys.add(ski);
                    socketToChannel.put(ski.channel.socket(), ski.channel);
                    channelToKey.put(ski.channel, ski);
                }
            }
        }

    }

    /**
     * Deregisters a key: removes it from {@code keys} and from both lookup
     * maps, then kills the channel if it is neither open nor registered.
     *
     * <p>Locks are taken in the order {@code channelToKey},
     * {@code socketToChannel}; {@code kill()} runs while both are held.
     *
     * @param ski the key to deregister
     * @see kokunet.SelectorImpl#implDereg(kokunet.SelectionKeyImpl)
     */
    @Override
    protected void implDereg(SelectionKeyImpl ski) throws IOException {
        synchronized (channelToKey) {
            synchronized (socketToChannel) {
                keys.remove(ski);
                socketToChannel.remove(ski.channel.socket());
                channelToKey.remove(ski.channel);

                SelectableChannel selch = ski.channel();

                // Nobody else references the channel any more: release it.
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
            }
        }
    }

    /**
     * Releases a thread that is currently blocked in {@code doSelect} by
     * setting the wakeup flag and interrupting it. Does nothing when no select
     * is in progress.
     *
     * @return this selector
     * @see kokunet.SelectorImpl#wakeup()
     */
    @Override
    public Selector wakeup() {
        synchronized (currListener) {
            if (!selecting) {
                return this;
            }
            final Thread blocked = selectingThread;
            wakeup = true;
            selecting = false;
            blocked.interrupt();
            selectingThread = null;
        }
        return this;
    }
}

Cheers,
Chris

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(9)

々眼睛长脚气 2024-11-25 11:42:45

我复制了 CountDownLatch 的源码并实现了一个 reset() 方法,该方法将内部的 Sync 类重置为其初始状态(起始计数):) 看起来运行良好。不再有不必要的对象创建 \o/ 无法通过子类化实现,因为 sync 是私有的。唉。

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * A synchronization aid that allows one or more threads to wait until
 * a set of operations being performed in other threads completes.
 *
 * <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
 * The {@link #await await} methods block until the current count reaches
 * zero due to invocations of the {@link #countDown} method, after which
 * all waiting threads are released and any subsequent invocations of
 * {@link #await await} return immediately.  Unlike
 * {@link java.util.concurrent.CountDownLatch}, this latch is not one-shot:
 * resetting it restores the count to its initial value.  A
 * {@link CyclicBarrier} is an alternative when threads must repeatedly
 * wait for one another.
 *
 * <p>A {@code CountDownLatch} is a versatile synchronization tool
 * and can be used for a number of purposes.  A
 * {@code CountDownLatch} initialized with a count of one serves as a
 * simple on/off latch, or gate: all threads invoking {@link #await await}
 * wait at the gate until it is opened by a thread invoking {@link
 * #countDown}.  A {@code CountDownLatch} initialized to <em>N</em>
 * can be used to make one thread wait until <em>N</em> threads have
 * completed some action, or some action has been completed N times.
 *
 * <p>A useful property of a {@code CountDownLatch} is that it
 * doesn't require that threads calling {@code countDown} wait for
 * the count to reach zero before proceeding, it simply prevents any
 * thread from proceeding past an {@link #await await} until all
 * threads could pass.
 *
 * <p><b>Sample usage:</b> Here is a pair of classes in which a group
 * of worker threads use two countdown latches:
 * <ul>
 * <li>The first is a start signal that prevents any worker from proceeding
 * until the driver is ready for them to proceed;
 * <li>The second is a completion signal that allows the driver to wait
 * until all workers have completed.
 * </ul>
 *
 * <pre>
 * class Driver { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch startSignal = new CountDownLatch(1);
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       new Thread(new Worker(startSignal, doneSignal)).start();
 *
 *     doSomethingElse();            // don't let run yet
 *     startSignal.countDown();      // let all threads proceed
 *     doSomethingElse();
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class Worker implements Runnable {
 *   private final CountDownLatch startSignal;
 *   private final CountDownLatch doneSignal;
 *   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
 *      this.startSignal = startSignal;
 *      this.doneSignal = doneSignal;
 *   }
 *   public void run() {
 *      try {
 *        startSignal.await();
 *        doWork();
 *        doneSignal.countDown();
 *      } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }
 *
 * </pre>
 *
 * <p>Another typical usage would be to divide a problem into N parts,
 * describe each part with a Runnable that executes that portion and
 * counts down on the latch, and queue all the Runnables to an
 * Executor.  When all sub-parts are complete, the coordinating thread
 * will be able to pass through await. (When threads must repeatedly
 * count down in this way, instead use a {@link CyclicBarrier}.)
 *
 * <pre>
 * class Driver2 { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *     Executor e = ...
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       e.execute(new WorkerRunnable(doneSignal, i));
 *
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class WorkerRunnable implements Runnable {
 *   private final CountDownLatch doneSignal;
 *   private final int i;
 *   WorkerRunnable(CountDownLatch doneSignal, int i) {
 *      this.doneSignal = doneSignal;
 *      this.i = i;
 *   }
 *   public void run() {
 *      try {
 *        doWork(i);
 *        doneSignal.countDown();
 *      } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }
 *
 * </pre>
 *
 * <p>Memory consistency effects: Actions in a thread prior to calling
 * {@code countDown()}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions following a successful return from a corresponding
 * {@code await()} in another thread.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ResettableCountDownLatch {
    /**
     * Synchronizer behind the latch. The AQS state is the remaining count,
     * and a shared acquire succeeds only while that state is zero.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        /** Count given at construction; {@link #reset()} writes it back. */
        public final int startCount;

        Sync(int count) {
            startCount = count;
            setState(count);
        }

        int getCount() {
            return getState();
        }

        /** Returns 1 (acquired) when the count is zero and -1 (not acquired) otherwise. */
        public int tryAcquireShared(int acquires) {
            if (getState() == 0) {
                return 1;
            }
            return -1;
        }

        /**
         * Lowers the count by one, retrying the CAS until it succeeds.
         *
         * @return {@code true} only for the call that moves the count to
         *         zero; {@code false} when the count was already zero or is
         *         still positive afterwards
         */
        public boolean tryReleaseShared(int releases) {
            while (true) {
                final int current = getState();
                if (current == 0) {
                    return false;
                }
                final int remaining = current - 1;
                if (compareAndSetState(current, remaining)) {
                    return remaining == 0;
                }
            }
        }

        /** Writes the construction-time count back into the state. */
        public void reset() {
            setState(startCount);
        }
    }

    private final Sync sync;

    /**
     * Creates a latch that opens after {@code count} calls to
     * {@link #countDown}.
     *
     * @param count number of {@link #countDown} invocations required before
     *        threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public ResettableCountDownLatch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        sync = new Sync(count);
    }

    /**
     * Blocks the calling thread until the count is zero, unless the thread
     * is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>Returns immediately when the count is already zero. Otherwise the
     * thread lies dormant until the count reaches zero through
     * {@link #countDown} calls or until another thread interrupts it.
     *
     * @throws InterruptedException if the current thread has its interrupted
     *         status set on entry or is interrupted while waiting; the
     *         interrupted status is cleared
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Restores the count to the value that was passed to the constructor.
     *
     * <p>This is a plain write of the synchronizer state; the method itself
     * performs no signalling of waiting threads.
     */
    public void reset() {
        sync.reset();
    }

    /**
     * Blocks the calling thread until the count is zero, the thread is
     * {@linkplain Thread#interrupt interrupted}, or the given waiting time
     * elapses, whichever happens first.
     *
     * <p>Returns {@code true} immediately when the count is already zero.
     * If the time is less than or equal to zero, the method does not wait
     * at all.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return {@code true} if the count reached zero and {@code false}
     *         if the waiting time elapsed before the count reached zero
     * @throws InterruptedException if the current thread has its interrupted
     *         status set on entry or is interrupted while waiting; the
     *         interrupted status is cleared
     */
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        final long nanos = unit.toNanos(timeout);
        return sync.tryAcquireSharedNanos(1, nanos);
    }

    /**
     * Decrements the count and releases all waiting threads when the new
     * count is zero.
     *
     * <p>Nothing happens when the count is already zero.
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    /**
     * Returns the current count.
     *
     * <p>This method is typically used for debugging and testing purposes.
     *
     * @return the current count
     */
    public long getCount() {
        return sync.getCount();
    }

    /**
     * Returns a string identifying this latch followed by its state in
     * brackets: the text {@code "Count ="} and the current count.
     *
     * @return a string identifying this latch, as well as its state
     */
    public String toString() {
        final String identity = super.toString();
        return identity + "[Count = " + sync.getCount() + "]";
    }
}

I copied CountDownLatch and implemented a reset() method that resets the internal Sync class to its initial state (starting count) :) Appears to work fine. No more unnecessary object creation \o/ It was not possible to subclass because sync was private. Boo.

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * A synchronization aid that allows one or more threads to wait until
 * a set of operations being performed in other threads completes.
 *
 * <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
 * The {@link #await await} methods block until the current count reaches
 * zero due to invocations of the {@link #countDown} method, after which
 * all waiting threads are released and any subsequent invocations of
 * {@link #await await} return immediately.  Unlike the JDK's
 * {@code CountDownLatch}, this class is not one-shot: {@link #reset}
 * restores the count to the value given at construction.
 *
 * <p>A {@code CountDownLatch} is a versatile synchronization tool
 * and can be used for a number of purposes.  A
 * {@code CountDownLatch} initialized with a count of one serves as a
 * simple on/off latch, or gate: all threads invoking {@link #await await}
 * wait at the gate until it is opened by a thread invoking {@link
 * #countDown}.  A {@code CountDownLatch} initialized to <em>N</em>
 * can be used to make one thread wait until <em>N</em> threads have
 * completed some action, or some action has been completed N times.
 *
 * <p>A useful property of a {@code CountDownLatch} is that it
 * doesn't require that threads calling {@code countDown} wait for
 * the count to reach zero before proceeding, it simply prevents any
 * thread from proceeding past an {@link #await await} until all
 * threads could pass.
 *
 * <p><b>Sample usage:</b> Here is a pair of classes in which a group
 * of worker threads use two countdown latches:
 * <ul>
 * <li>The first is a start signal that prevents any worker from proceeding
 * until the driver is ready for them to proceed;
 * <li>The second is a completion signal that allows the driver to wait
 * until all workers have completed.
 * </ul>
 *
 * <pre>
 * class Driver { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch startSignal = new CountDownLatch(1);
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       new Thread(new Worker(startSignal, doneSignal)).start();
 *
 *     doSomethingElse();            // don't let run yet
 *     startSignal.countDown();      // let all threads proceed
 *     doSomethingElse();
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class Worker implements Runnable {
 *   private final CountDownLatch startSignal;
 *   private final CountDownLatch doneSignal;
 *   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
 *      this.startSignal = startSignal;
 *      this.doneSignal = doneSignal;
 *   }
 *   public void run() {
 *      try {
 *        startSignal.await();
 *        doWork();
 *        doneSignal.countDown();
 *      } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }
 *
 * </pre>
 *
 * <p>Another typical usage would be to divide a problem into N parts,
 * describe each part with a Runnable that executes that portion and
 * counts down on the latch, and queue all the Runnables to an
 * Executor.  When all sub-parts are complete, the coordinating thread
 * will be able to pass through await. (When threads must repeatedly
 * count down in this way, instead use a {@link CyclicBarrier}.)
 *
 * <pre>
 * class Driver2 { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *     Executor e = ...
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       e.execute(new WorkerRunnable(doneSignal, i));
 *
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class WorkerRunnable implements Runnable {
 *   private final CountDownLatch doneSignal;
 *   private final int i;
 *   WorkerRunnable(CountDownLatch doneSignal, int i) {
 *      this.doneSignal = doneSignal;
 *      this.i = i;
 *   }
 *   public void run() {
 *      try {
 *        doWork(i);
 *        doneSignal.countDown();
 *      } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }
 *
 * </pre>
 *
 * <p>Memory consistency effects: Actions in a thread prior to calling
 * {@code countDown()}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions following a successful return from a corresponding
 * {@code await()} in another thread.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ResettableCountDownLatch {
    /**
     * Synchronizer behind the latch. The AQS state is the remaining count,
     * and a shared acquire succeeds only while that state is zero.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        /** Count given at construction; {@link #reset()} writes it back. */
        public final int startCount;

        Sync(int count) {
            startCount = count;
            setState(count);
        }

        int getCount() {
            return getState();
        }

        /** Returns 1 (acquired) when the count is zero and -1 (not acquired) otherwise. */
        public int tryAcquireShared(int acquires) {
            if (getState() == 0) {
                return 1;
            }
            return -1;
        }

        /**
         * Lowers the count by one, retrying the CAS until it succeeds.
         *
         * @return {@code true} only for the call that moves the count to
         *         zero; {@code false} when the count was already zero or is
         *         still positive afterwards
         */
        public boolean tryReleaseShared(int releases) {
            while (true) {
                final int current = getState();
                if (current == 0) {
                    return false;
                }
                final int remaining = current - 1;
                if (compareAndSetState(current, remaining)) {
                    return remaining == 0;
                }
            }
        }

        /** Writes the construction-time count back into the state. */
        public void reset() {
            setState(startCount);
        }
    }

    private final Sync sync;

    /**
     * Creates a latch that opens after {@code count} calls to
     * {@link #countDown}.
     *
     * @param count number of {@link #countDown} invocations required before
     *        threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public ResettableCountDownLatch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        sync = new Sync(count);
    }

    /**
     * Blocks the calling thread until the count is zero, unless the thread
     * is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>Returns immediately when the count is already zero. Otherwise the
     * thread lies dormant until the count reaches zero through
     * {@link #countDown} calls or until another thread interrupts it.
     *
     * @throws InterruptedException if the current thread has its interrupted
     *         status set on entry or is interrupted while waiting; the
     *         interrupted status is cleared
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Restores the count to the value that was passed to the constructor.
     *
     * <p>This is a plain write of the synchronizer state; the method itself
     * performs no signalling of waiting threads.
     */
    public void reset() {
        sync.reset();
    }

    /**
     * Blocks the calling thread until the count is zero, the thread is
     * {@linkplain Thread#interrupt interrupted}, or the given waiting time
     * elapses, whichever happens first.
     *
     * <p>Returns {@code true} immediately when the count is already zero.
     * If the time is less than or equal to zero, the method does not wait
     * at all.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return {@code true} if the count reached zero and {@code false}
     *         if the waiting time elapsed before the count reached zero
     * @throws InterruptedException if the current thread has its interrupted
     *         status set on entry or is interrupted while waiting; the
     *         interrupted status is cleared
     */
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        final long nanos = unit.toNanos(timeout);
        return sync.tryAcquireSharedNanos(1, nanos);
    }

    /**
     * Decrements the count and releases all waiting threads when the new
     * count is zero.
     *
     * <p>Nothing happens when the count is already zero.
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    /**
     * Returns the current count.
     *
     * <p>This method is typically used for debugging and testing purposes.
     *
     * @return the current count
     */
    public long getCount() {
        return sync.getCount();
    }

    /**
     * Returns a string identifying this latch followed by its state in
     * brackets: the text {@code "Count ="} and the current count.
     *
     * @return a string identifying this latch, as well as its state
     */
    public String toString() {
        final String identity = super.toString();
        return identity + "[Count = " + sync.getCount() + "]";
    }
}
残龙傲雪 2024-11-25 11:42:45

Phaser 有更多选项,我们可以使用它来实现可重置的 countdownLatch。

请从以下站点阅读以下基本概念

https://examples.javacodegeeks.com/core-java/util/concurrent/phaser/java-util-concurrent-phaser-example/

http://netjs.blogspot.in/2016/01/phaser-in-java-concurrency.html

import java.util.concurrent.Phaser;
/**
 * Resettable countdownLatch using phaser
 */
public class PhaserExample {
    /**
     * Runs two consecutive phases on one shared {@link Phaser}. In each phase
     * three worker threads are started and the main thread waits with them
     * until all four registered parties have arrived.
     *
     * @param args unused
     * @throws InterruptedException declared for callers; kept from the
     *         original signature
     */
    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(3); // three parties registered via the
                                        // constructor; register() could be
                                        // used instead, or a mix of both
        // register the main thread too, so 4 (3+1) parties are registered
        phaser.register();
        // registration is done once and carries over to every later phase,
        // so there is no need to register again per phase

        int phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);
        new PhaserExample().testPhaser(phaser, 2000);
        new PhaserExample().testPhaser(phaser, 4000);
        new PhaserExample().testPhaser(phaser, 6000);

        // similar to await() in CyclicBarrier: main arrives and blocks until
        // the other three parties have arrived as well. Arriving does NOT
        // deregister anyone; the number of registered parties stays at 4.
        phaser.arriveAndAwaitAdvance();
        // once all parties have arrived, the phaser advances to the next phase
        System.out.println("Barrier has broken.");
        phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);

        // second phase: the same phaser is reused without re-registering
        new PhaserExample().testPhaser(phaser, 2000);
        new PhaserExample().testPhaser(phaser, 4000);
        new PhaserExample().testPhaser(phaser, 6000);
        phaser.arriveAndAwaitAdvance();
        // once all parties have arrived, the phaser advances to the next phase
        System.out.println("Barrier has broken.");
        phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);

    }

    /**
     * Starts a thread that sleeps for {@code sleepTime} milliseconds, then
     * arrives at {@code phaser} and waits for the phase to advance.
     *
     * @param phaser the shared phaser; the thread counts as one of the
     *        parties already registered through the constructor
     * @param sleepTime how long the thread sleeps before arriving, in
     *        milliseconds
     */
    private void testPhaser(final Phaser phaser, final int sleepTime) {
        // no phaser.register() here: the constructor argument already
        // accounts for this party
        new Thread() {
            @Override
            public void run() {
                try {
                    Thread.sleep(sleepTime);
                    System.out.println(Thread.currentThread().getName() + " arrived");
                    // phaser.arrive() would be the CountDownLatch#countDown()
                    // analogue (arrive without waiting); this call also
                    // waits, like CyclicBarrier#await()
                    phaser.arriveAndAwaitAdvance();
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                    // sleep() cleared the interrupt status when it threw;
                    // restore it. Note that this thread then never arrives
                    // for the current phase.
                    Thread.currentThread().interrupt();
                }
                System.out.println(Thread.currentThread().getName() + " after passing barrier");
            }
        }.start();
    }
}

Phaser has more options, we can implement resettable countdownLatch using that.

Please read below basic concepts from the following sites

https://examples.javacodegeeks.com/core-java/util/concurrent/phaser/java-util-concurrent-phaser-example/

http://netjs.blogspot.in/2016/01/phaser-in-java-concurrency.html

import java.util.concurrent.Phaser;
/**
 * Resettable countdownLatch using phaser
 */
public class PhaserExample {
    /**
     * Runs two consecutive phases on one shared {@link Phaser}. In each phase
     * three worker threads are started and the main thread waits with them
     * until all four registered parties have arrived.
     *
     * @param args unused
     * @throws InterruptedException declared for callers; kept from the
     *         original signature
     */
    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(3); // three parties registered via the
                                        // constructor; register() could be
                                        // used instead, or a mix of both
        // register the main thread too, so 4 (3+1) parties are registered
        phaser.register();
        // registration is done once and carries over to every later phase,
        // so there is no need to register again per phase

        int phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);
        new PhaserExample().testPhaser(phaser, 2000);
        new PhaserExample().testPhaser(phaser, 4000);
        new PhaserExample().testPhaser(phaser, 6000);

        // similar to await() in CyclicBarrier: main arrives and blocks until
        // the other three parties have arrived as well. Arriving does NOT
        // deregister anyone; the number of registered parties stays at 4.
        phaser.arriveAndAwaitAdvance();
        // once all parties have arrived, the phaser advances to the next phase
        System.out.println("Barrier has broken.");
        phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);

        // second phase: the same phaser is reused without re-registering
        new PhaserExample().testPhaser(phaser, 2000);
        new PhaserExample().testPhaser(phaser, 4000);
        new PhaserExample().testPhaser(phaser, 6000);
        phaser.arriveAndAwaitAdvance();
        // once all parties have arrived, the phaser advances to the next phase
        System.out.println("Barrier has broken.");
        phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);

    }

    /**
     * Starts a thread that sleeps for {@code sleepTime} milliseconds, then
     * arrives at {@code phaser} and waits for the phase to advance.
     *
     * @param phaser the shared phaser; the thread counts as one of the
     *        parties already registered through the constructor
     * @param sleepTime how long the thread sleeps before arriving, in
     *        milliseconds
     */
    private void testPhaser(final Phaser phaser, final int sleepTime) {
        // no phaser.register() here: the constructor argument already
        // accounts for this party
        new Thread() {
            @Override
            public void run() {
                try {
                    Thread.sleep(sleepTime);
                    System.out.println(Thread.currentThread().getName() + " arrived");
                    // phaser.arrive() would be the CountDownLatch#countDown()
                    // analogue (arrive without waiting); this call also
                    // waits, like CyclicBarrier#await()
                    phaser.arriveAndAwaitAdvance();
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                    // sleep() cleared the interrupt status when it threw;
                    // restore it. Note that this thread then never arrives
                    // for the current phase.
                    Thread.currentThread().interrupt();
                }
                System.out.println(Thread.currentThread().getName() + " after passing barrier");
            }
        }.start();
    }
}
时光暖心i 2024-11-25 11:42:45

根据@Fidel -s 的回答,我对 ResettableCountDownLatch 进行了直接替换。我所做的更改

  • mLatch 是 private volatile
  • mInitialCount 是 private final
  • 简单的 await() 的返回类型已更改为 void。

否则,原始代码也很酷。因此,这是完整的增强代码:

/**
 * Resettable latch that delegates to a replaceable {@link CountDownLatch}.
 *
 * <p>{@link #reset()} installs a brand-new delegate. Every other method
 * reads the delegate that is current at the moment of the call, so a thread
 * already blocked in {@code await} keeps waiting on the delegate it read,
 * while later {@link #countDown()} calls act on the new one.
 */
public class ResettableCountDownLatch {

    /** Count used for the first delegate and for every delegate created by reset(). */
    private final int initialCount;
    /** Current delegate; volatile so a replacement is visible to other threads. */
    private volatile CountDownLatch latch;

    /**
     * @param count the count each delegate latch starts with
     */
    public ResettableCountDownLatch(int  count) {
        this.initialCount = count;
        this.latch = new CountDownLatch(count);
    }

    /** Replaces the delegate with a fresh latch holding the initial count. */
    public void reset() {
        final CountDownLatch fresh = new CountDownLatch(this.initialCount);
        this.latch = fresh;
    }

    /** Counts down the delegate that is current at the time of the call. */
    public void countDown() {
        final CountDownLatch current = this.latch;
        current.countDown();
    }

    /**
     * Waits on the delegate that is current at the time of the call.
     *
     * @throws InterruptedException if the delegate's {@code await()} throws it
     */
    public void await() throws InterruptedException {
        final CountDownLatch current = this.latch;
        current.await();
    }

    /**
     * Timed wait on the delegate that is current at the time of the call.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of {@code timeout}
     * @return whatever the delegate's timed {@code await} returns
     * @throws InterruptedException if the delegate's {@code await} throws it
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        final CountDownLatch current = this.latch;
        return current.await(timeout, unit);
    }
}

更新

基于 @Systemplanet-s 的评论,这里是 reset() 的更安全版本:

    // An atomic reference is required because reset() is not that atomic anymore, not even with `volatile`.
    private final AtomicReference<CountDownLatch> latchHolder = new AtomicReference<>();

    /**
     * Detaches the latch currently held by {@code latchHolder} (leaving
     * {@code null} in the holder) and counts the detached latch down to zero.
     * This method does not store a replacement latch.
     */
    public void reset() {
        // swap out the latch atomically so only one caller gets to drain it
        final CountDownLatch detached = latchHolder.getAndSet(null);
        if (detached == null) {
            return;
        }
        // re-read the count on every pass: other threads may be counting
        // down in parallel, so a precomputed number of iterations could overshoot
        while (detached.getCount() > 0L) {
            detached.countDown();
        }
    }

基本上,这是简单性和安全性之间的选择。也就是说,如果您愿意将责任转移给代码的客户端,那么在 reset() 中设置引用 null 就足够了。

另一方面,如果你想让这段代码的用户变得容易,那么你需要使用更多的技巧。

Based on @Fidel -s answer, I made a drop-in replacement for ResettableCountDownLatch. The changes I made

  • mLatch is private volatile
  • mInitialCount is private final
  • the return type of the simple await() has changed to void.

Otherwise, the original code is cool too. So, this is the full, enhanced code:

/**
 * Resettable latch that delegates to a replaceable {@link CountDownLatch}.
 *
 * <p>{@link #reset()} installs a brand-new delegate. Every other method
 * reads the delegate that is current at the moment of the call, so a thread
 * already blocked in {@code await} keeps waiting on the delegate it read,
 * while later {@link #countDown()} calls act on the new one.
 */
public class ResettableCountDownLatch {

    /** Count used for the first delegate and for every delegate created by reset(). */
    private final int initialCount;
    /** Current delegate; volatile so a replacement is visible to other threads. */
    private volatile CountDownLatch latch;

    /**
     * @param count the count each delegate latch starts with
     */
    public ResettableCountDownLatch(int  count) {
        this.initialCount = count;
        this.latch = new CountDownLatch(count);
    }

    /** Replaces the delegate with a fresh latch holding the initial count. */
    public void reset() {
        final CountDownLatch fresh = new CountDownLatch(this.initialCount);
        this.latch = fresh;
    }

    /** Counts down the delegate that is current at the time of the call. */
    public void countDown() {
        final CountDownLatch current = this.latch;
        current.countDown();
    }

    /**
     * Waits on the delegate that is current at the time of the call.
     *
     * @throws InterruptedException if the delegate's {@code await()} throws it
     */
    public void await() throws InterruptedException {
        final CountDownLatch current = this.latch;
        current.await();
    }

    /**
     * Timed wait on the delegate that is current at the time of the call.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of {@code timeout}
     * @return whatever the delegate's timed {@code await} returns
     * @throws InterruptedException if the delegate's {@code await} throws it
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        final CountDownLatch current = this.latch;
        return current.await(timeout, unit);
    }
}

Update

Based on @Systemplanet-s comment, here is a safer version of reset():

    // An atomic reference is required because reset() is not that atomic anymore, not even with `volatile`.
    private final AtomicReference<CountDownLatch> latchHolder = new AtomicReference<>();

    /**
     * Detaches the latch currently held by {@code latchHolder} (leaving
     * {@code null} in the holder) and counts the detached latch down to zero.
     * This method does not store a replacement latch.
     */
    public void reset() {
        // swap out the latch atomically so only one caller gets to drain it
        final CountDownLatch detached = latchHolder.getAndSet(null);
        if (detached == null) {
            return;
        }
        // re-read the count on every pass: other threads may be counting
        // down in parallel, so a precomputed number of iterations could overshoot
        while (detached.getCount() > 0L) {
            detached.countDown();
        }
    }

Basically, it's a choice between simplicity and safety. I.e. if you are willing to move the responsibility to the client of your code, then it's enough to set the reference null in reset().

On the other hand, if you want to make it easy for the users of this code, then you need to use a little more tricks.

可是我不能没有你 2024-11-25 11:42:45

我不确定这是否存在致命缺陷,但我最近遇到了同样的问题,并通过在每次想要重置时简单地实例化一个新的 CountDownLatch 对象来解决它。像这样的东西:

Waiter:

bla();
latch.await();
//now the latch has counted down to 0
blabla();

CountDowner

foo();
latch.countDown();
//now the latch has counted down to 0
latch = new CountDownLatch(1);
Waiter.receiveReferenceToNewLatch(latch);
bar();

显然这是一个沉重的抽象,但到目前为止它对我有用,并且不需要您修改任何类定义。

I'm not sure if this is fatally flawed but I recently had the same problem and solved it by simply instantiating a new CountDownLatch object each time I wanted to reset. Something like this:

Waiter:

bla();
latch.await();
//now the latch has counted down to 0
blabla();

CountDowner

foo();
latch.countDown();
//now the latch has counted down to 0
latch = new CountDownLatch(1);
Waiter.receiveReferenceToNewLatch(latch);
bar();

Obviously this is a heavy abstraction but thus far it has worked for me and doesn't require you to tinker with any class definitions.

﹂绝世的画 2024-11-25 11:42:45

使用移相器。

如果只有一个线程应该做工作。你可以加入 AtomicBoolean 和 Phaser

 // Flag claimed by the single thread that does the work; the constructor
 // takes a boolean, not a String.
 AtomicBoolean someConditionInProgress = new AtomicBoolean(false);
 // One registered party: the worker's arrive() alone advances the phase.
 Phaser onConditionalPhaser = new Phaser(1);

 // (some function)
 // The thread whose CAS flips the flag from false to true does the work;
 // every other caller falls through to the else branch and waits.
 if (someConditionInProgress.compareAndSet(false, true)) {
     try {
         //do something
     } finally {
         someConditionInProgress.set(false);
         //release barrier
         onConditionalPhaser.arrive();
     }
 } else {
     // NOTE(review): the phase is read at call time, so a waiter that gets
     // here after the worker has already arrived waits for the NEXT advance
     // - confirm this is acceptable for the caller.
     onConditionalPhaser.awaitAdvance(onConditionalPhaser.getPhase());
 }

Use Phaser.

If only one thread should do the work, you can combine AtomicBoolean and Phaser:

 // Flag claimed by the single thread that does the work; the constructor
 // takes a boolean, not a String.
 AtomicBoolean someConditionInProgress = new AtomicBoolean(false);
 // One registered party: the worker's arrive() alone advances the phase.
 Phaser onConditionalPhaser = new Phaser(1);

 // (some function)
 // The thread whose CAS flips the flag from false to true does the work;
 // every other caller falls through to the else branch and waits.
 if (someConditionInProgress.compareAndSet(false, true)) {
     try {
         //do something
     } finally {
         someConditionInProgress.set(false);
         //release barrier
         onConditionalPhaser.arrive();
     }
 } else {
     // NOTE(review): the phase is read at call time, so a waiter that gets
     // here after the worker has already arrived waits for the NEXT advance
     // - confirm this is acceptable for the caller.
     onConditionalPhaser.awaitAdvance(onConditionalPhaser.getPhase());
 }
后eg是否自 2024-11-25 11:42:45

看起来您想将异步 I/O 转换为同步。使用异步 I/O 的整体思想是避免线程,但 CountDownLatch 需要使用线程。这在你的问题中是一个明显的矛盾。因此,您可以:

  • 继续使用线程并使用同步 I/O,而不是选择器等。 这会更加简单和可靠
  • 继续使用异步 I/O 并放弃 CountDownLatch。那么您需要一个异步库 - 查看 RxJava、Akka 或 df4j。
  • 继续开发您的项目以获得乐趣。那么你可以尝试使用java.util.Semaphore代替CountDownLatch,或者使用synchronized/wait/notify编写自己的同步类。

Looks like you want to turn asynchronous I/O into synchronous I/O. The whole idea of using asynchronous I/O is to avoid threads, but CountDownLatch requires using threads. This is an obvious contradiction in your question. So, you can:

  • keep using threads and employ synchronous I/O instead of Selectors and the like. This will be much simpler and more reliable
  • keep using asynchronous I/O and give up CountDownLatch. Then you need an asynchronous library - look at RxJava, Akka, or df4j.
  • continue to develop your project for fun. Then you can try to use java.util.Semaphore instead of CountDownLatch, or program your own synchronization class using synchronized/wait/notify.
箜明 2024-11-25 11:42:45
/**
 * A count-down latch whose count can be re-armed with {@link #reset(int)}.
 *
 * <p>Threads calling {@code await} pass once the count is zero;
 * {@link #countDown()} lowers the count by one until it reaches zero.
 */
public class ResettableLatch {

    /**
     * Synchronizer behind the latch. The AQS state is the remaining count,
     * and a shared acquire succeeds only while that state is zero.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            return getState() == 0 ? 1 : -1;
        }

        /** Overwrites the remaining count; callers validate the value. */
        public void reset(int count) {
            setState(count);
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            // Decrement with a CAS retry loop; report true only for the
            // call that moves the count to zero.
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    /**
     * @param count number of {@link #countDown()} calls needed to open the latch
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public ResettableLatch(int count) {
        if (count < 0)
            throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * Waits until the count is zero.
     *
     * @throws InterruptedException if the current thread is interrupted
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Waits until the count is zero or the timeout elapses.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of {@code timeout}
     * @return {@code true} if the count reached zero, {@code false} if the
     *         waiting time elapsed first
     * @throws InterruptedException if the current thread is interrupted
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /** Lowers the count by one; does nothing when the count is already zero. */
    public void countDown() {
        sync.releaseShared(1);
    }

    /** @return the current count */
    public long getCount() {
        return sync.getCount();
    }

    /**
     * Sets the count to {@code count}.
     *
     * <p>Negative values are rejected, as in the constructor: the count only
     * ever moves downwards one step at a time and stops at zero, so a
     * negative count could never reach zero and {@code await} would block
     * forever.
     *
     * @param count the new count
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public void reset(int count) {
        if (count < 0)
            throw new IllegalArgumentException("count < 0");
        sync.reset(count);
    }
}

这对我有用。

/**
 * A count-down latch whose count can be re-armed with {@link #reset(int)}.
 *
 * <p>Threads calling {@code await} pass once the count is zero;
 * {@link #countDown()} lowers the count by one until it reaches zero.
 */
public class ResettableLatch {

    /**
     * Synchronizer behind the latch. The AQS state is the remaining count,
     * and a shared acquire succeeds only while that state is zero.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            return getState() == 0 ? 1 : -1;
        }

        /** Overwrites the remaining count; callers validate the value. */
        public void reset(int count) {
            setState(count);
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            // Decrement with a CAS retry loop; report true only for the
            // call that moves the count to zero.
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    /**
     * @param count number of {@link #countDown()} calls needed to open the latch
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public ResettableLatch(int count) {
        if (count < 0)
            throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * Waits until the count is zero.
     *
     * @throws InterruptedException if the current thread is interrupted
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Waits until the count is zero or the timeout elapses.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of {@code timeout}
     * @return {@code true} if the count reached zero, {@code false} if the
     *         waiting time elapsed first
     * @throws InterruptedException if the current thread is interrupted
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /** Lowers the count by one; does nothing when the count is already zero. */
    public void countDown() {
        sync.releaseShared(1);
    }

    /** @return the current count */
    public long getCount() {
        return sync.getCount();
    }

    /**
     * Sets the count to {@code count}.
     *
     * <p>Negative values are rejected, as in the constructor: the count only
     * ever moves downwards one step at a time and stops at zero, so a
     * negative count could never reach zero and {@code await} would block
     * forever.
     *
     * @param count the new count
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public void reset(int count) {
        if (count < 0)
            throw new IllegalArgumentException("count < 0");
        sync.reset(count);
    }
}

This worked for me.

黒涩兲箜 2024-11-25 11:42:45

从我从OP解释和源代码中了解到的,可重置的CountDownLatch对于他要解决的问题来说并不是一个足够的概念。 CountDownLatch 本身的文档提到OP的用例是用计数1初始化的简单门

CountDownLatch 用计数 1 初始化,作为一个简单的
开/关锁存器或门:所有调用 await 的线程都在门处等待
直到它被调用 countDown 的线程打开。

,但是 CountDownLatch 实现并没有朝这个方向走得更远。

因此,我自己遇到了与 OP 类似的问题,我决定引入一个 SimpleGate 类,其具有以下属性:

  • 许可数量为 1,这意味着它只能处于开(On)或关(Off)两种状态之一;

  • 有一个称为 Gate Keeper 的专用线程,只有它被允许关闭或打开 Gate;

  • 看门权可以转让;

  • 打开门会立即允许尝试通过门的线程执行此操作(其他答案中忽略了这个非常合乎逻辑的功能);

  • 由于线程争用预计会很高,因此支持公平性作为一个选项,这可以减少线程闯入的影响。

     /**
      * An on/off gate: threads calling {@link #comeThrough()} block while the
      * gate is shut and pass while it is open. Only the current gate keeper
      * thread may open or shut the gate. The keeper is initially the thread
      * that constructed the gate.
      */
     public class SimpleGate {

         /** Synchronizer whose state is either {@code SHUT} or {@code OPEN}. */
         private static final class Sync extends AbstractQueuedSynchronizer {

             // State
             private static final int SHUT = 1;
             private static final int OPEN = 0;

             // Final so the value is safely visible to every thread that reaches this object.
             private final boolean fair;

             Sync(boolean shut, boolean fair) {
                 this.fair = fair;
                 if (shut)
                     setState(SHUT);
             }

             void shutOff() {
                 setState(SHUT);
             }

             @Override
             protected int tryAcquireShared(int arg) {
                 // In fair mode a thread does not pass ahead of threads already queued.
                 if (fair && hasQueuedPredecessors())
                     return -1;
                 return getState() == OPEN ? 1 : -1;
             }

             @Override
             protected boolean tryReleaseShared(int arg) {
                 setState(OPEN);
                 return true;
             }
         }

         private final Sync sync;
         private volatile Thread gateKeeper = Thread.currentThread();

         /** Creates a shut, non-fair gate. */
         public SimpleGate() {
             this(true);
         }

         /**
          * Creates a non-fair gate.
          *
          * @param shutOff {@code true} to start shut, {@code false} to start open
          */
         public SimpleGate(boolean shutOff) {
             this(shutOff, false);
         }

         /**
          * Creates a gate.
          *
          * @param shutOff {@code true} to start shut, {@code false} to start open
          * @param fair    {@code true} to make arriving threads queue behind waiting ones
          */
         public SimpleGate(boolean shutOff, boolean fair) {
             this.sync = new Sync(shutOff, fair);
         }

         /**
          * Blocks (uninterruptibly) until the gate is open.
          *
          * @throws IllegalStateException if called by the gate keeper thread
          */
         public void comeThrough() {
             if (Thread.currentThread() == gateKeeper)
                 throw new IllegalStateException("Gate Keeper 线程不应该通过门");
             sync.acquireShared(0);
         }

         /**
          * Shuts the gate.
          *
          * @throws IllegalStateException if the caller is not the gate keeper thread
          */
         public void shutOff() {
             if (Thread.currentThread() != gateKeeper)
                 throw new IllegalStateException("只允许关闭 Gate Keeper 线程");
             sync.shutOff();
         }

         /**
          * Opens the gate and releases the threads waiting at it.
          *
          * @throws IllegalStateException if the caller is not the gate keeper thread
          */
         public void openUp() {
             if (Thread.currentThread() != gateKeeper)
                 throw new IllegalStateException("只允许打开一个 Gate Keeper 线程");
             sync.releaseShared(0);
         }

         /**
          * Makes another thread the gate keeper.
          *
          * @param newGateKeeper the thread that may open and shut the gate from now on
          * @throws NullPointerException if {@code newGateKeeper} is null
          */
         public void transferOwnership(Thread newGateKeeper) {
             // A null keeper would leave a gate that no thread can open or shut.
             if (newGateKeeper == null)
                 throw new NullPointerException("newGateKeeper");
             this.gateKeeper = newGateKeeper;
         }

         // Interruptible and timed variants of comeThrough() can be added on top
         // of acquireSharedInterruptibly / tryAcquireSharedNanos if needed.
     }
    

From what I was able to understand from the OP's explanation and source code, a resettable CountDownLatch is not quite an adequate concept for the problem he was trying to solve. The documentation of CountDownLatch itself mentions the OP's use case as a simple gate initialized with a count of one:

CountDownLatch initialized with a count of one serves as a simple
on/off latch, or gate: all threads invoking await wait at the gate
until it is opened by a thread invoking countDown.

, but CountDownLatch implementation does not go any further in this direction.

So, myself having a problem similar to that of OP's I decided to introduce a SimpleGate class with the following properties:

  • Number of permits is one, which means it can be either in On or Off state;

  • There is a dedicated thread, called Gate Keeper that is only allowed to shut off or open up the Gate;

  • The right of gate keeping is transferable;

  • opening up the Gate immediately allows the threads that tried to come through the Gate to do so (this very logical feature has been overlooked in the other answers);

  • as thread contention is expected to be high, fairness is supported as an option; this reduces the effect of thread barging.

      /**
       * An on/off gate: threads calling {@link #comeThrough()} block while the
       * gate is shut and pass while it is open. Only the current gate keeper
       * thread may open or shut the gate. The keeper is initially the thread
       * that constructed the gate.
       */
      public class SimpleGate {

          /** Synchronizer whose state is either {@code SHUT} or {@code OPEN}. */
          private static final class Sync extends AbstractQueuedSynchronizer {

              // State
              private static final int SHUT = 1;
              private static final int OPEN = 0;

              // Final so the value is safely visible to every thread that reaches this object.
              private final boolean fair;

              Sync(boolean shut, boolean fair) {
                  this.fair = fair;
                  if (shut)
                      setState(SHUT);
              }

              void shutOff() {
                  setState(SHUT);
              }

              @Override
              protected int tryAcquireShared(int arg) {
                  // In fair mode a thread does not pass ahead of threads already queued.
                  if (fair && hasQueuedPredecessors())
                      return -1;
                  return getState() == OPEN ? 1 : -1;
              }

              @Override
              protected boolean tryReleaseShared(int arg) {
                  setState(OPEN);
                  return true;
              }
          }

          private final Sync sync;
          private volatile Thread gateKeeper = Thread.currentThread();

          /** Creates a shut, non-fair gate. */
          public SimpleGate() {
              this(true);
          }

          /**
           * Creates a non-fair gate.
           *
           * @param shutOff {@code true} to start shut, {@code false} to start open
           */
          public SimpleGate(boolean shutOff) {
              this(shutOff, false);
          }

          /**
           * Creates a gate.
           *
           * @param shutOff {@code true} to start shut, {@code false} to start open
           * @param fair    {@code true} to make arriving threads queue behind waiting ones
           */
          public SimpleGate(boolean shutOff, boolean fair) {
              this.sync = new Sync(shutOff, fair);
          }

          /**
           * Blocks (uninterruptibly) until the gate is open.
           *
           * @throws IllegalStateException if called by the gate keeper thread
           */
          public void comeThrough() {
              if (Thread.currentThread() == gateKeeper)
                  throw new IllegalStateException("Gate Keeper thread is not supposed to come through the gate");
              sync.acquireShared(0);
          }

          /**
           * Shuts the gate.
           *
           * @throws IllegalStateException if the caller is not the gate keeper thread
           */
          public void shutOff() {
              if (Thread.currentThread() != gateKeeper)
                  throw new IllegalStateException("Only a Gate Keeper thread is allowed to shut off");
              sync.shutOff();
          }

          /**
           * Opens the gate and releases the threads waiting at it.
           *
           * @throws IllegalStateException if the caller is not the gate keeper thread
           */
          public void openUp() {
              if (Thread.currentThread() != gateKeeper)
                  throw new IllegalStateException("Only a Gate Keeper thread is allowed to open up");
              sync.releaseShared(0);
          }

          /**
           * Makes another thread the gate keeper.
           *
           * @param newGateKeeper the thread that may open and shut the gate from now on
           * @throws NullPointerException if {@code newGateKeeper} is null
           */
          public void transferOwnership(Thread newGateKeeper) {
              // A null keeper would leave a gate that no thread can open or shut.
              if (newGateKeeper == null)
                  throw new NullPointerException("newGateKeeper");
              this.gateKeeper = newGateKeeper;
          }

          // Interruptible and timed variants of comeThrough() can be added on top
          // of acquireSharedInterruptibly / tryAcquireSharedNanos if needed.
      }
    
薔薇婲 2024-11-25 11:42:45

另一个替代品

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * A wrapper around {@link CountDownLatch} that can be re-armed with its
 * initial count by swapping in a fresh latch.
 *
 * <p>Threads already waiting on the previous latch are not moved to the new
 * one by {@link #reset()}; they stay blocked until the old latch is counted
 * down or they time out.
 */
public class ResettableCountDownLatch {
    final int mInitialCount;
    // volatile so a latch installed by reset() is visible to other threads.
    volatile CountDownLatch mLatch;

    /**
     * Creates a latch with the given count.
     *
     * @param count initial count, also used by every later {@link #reset()}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public ResettableCountDownLatch(int  count) {
        mInitialCount = count;
        mLatch = new CountDownLatch(count);
    }

    /** Replaces the current latch with a new one holding the initial count. */
    public void reset() {
        mLatch = new CountDownLatch(mInitialCount);
    }

    /** Decrements the count of the current latch. */
    public void countDown() {
        mLatch.countDown();
    }

    /**
     * Blocks until the current latch reaches zero.
     *
     * @return always {@code true}; the call returns normally only once the latch is open
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean await() throws InterruptedException {
        // CountDownLatch.await() returns void, so its result cannot be assigned.
        mLatch.await();
        return true;
    }

    /**
     * Blocks until the current latch reaches zero or the timeout elapses.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the count reached zero, {@code false} on timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return mLatch.await(timeout, unit);
    }
}

Another drop-in replacement

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * A wrapper around {@link CountDownLatch} that can be re-armed with its
 * initial count by swapping in a fresh latch.
 *
 * <p>Threads already waiting on the previous latch are not moved to the new
 * one by {@link #reset()}; they stay blocked until the old latch is counted
 * down or they time out.
 */
public class ResettableCountDownLatch {
    final int mInitialCount;
    // volatile so a latch installed by reset() is visible to other threads.
    volatile CountDownLatch mLatch;

    /**
     * Creates a latch with the given count.
     *
     * @param count initial count, also used by every later {@link #reset()}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public ResettableCountDownLatch(int  count) {
        mInitialCount = count;
        mLatch = new CountDownLatch(count);
    }

    /** Replaces the current latch with a new one holding the initial count. */
    public void reset() {
        mLatch = new CountDownLatch(mInitialCount);
    }

    /** Decrements the count of the current latch. */
    public void countDown() {
        mLatch.countDown();
    }

    /**
     * Blocks until the current latch reaches zero.
     *
     * @return always {@code true}; the call returns normally only once the latch is open
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean await() throws InterruptedException {
        // CountDownLatch.await() returns void, so its result cannot be assigned.
        mLatch.await();
        return true;
    }

    /**
     * Blocks until the current latch reaches zero or the timeout elapses.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the count reached zero, {@code false} on timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return mLatch.await(timeout, unit);
    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文