并发设置队列

发布于 2024-09-07 01:23:05 字数 138 浏览 7 评论 0原文

也许这是一个愚蠢的问题,但我似乎找不到明显的答案。

我需要一个仅包含唯一值的并发 FIFO 队列。尝试添加队列中已存在的值只会忽略该值。如果不是为了线程安全,这将是微不足道的。 Java 中是否有数据结构或者互联网上的代码片段可以表现出这种行为?

Maybe this is a silly question, but I cannot seem to find an obvious answer.

I need a concurrent FIFO queue that contains only unique values. Attempting to add a value that already exists in the queue simply ignores that value. Which, if not for the thread safety would be trivial. Is there a data structure in Java or maybe a code snipit on the interwebs that exhibits this behavior?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(7

╰ゝ天使的微笑 2024-09-14 01:23:05

如果您想要比完全同步更好的并发性,我知道有一种方法可以做到这一点,即使用 ConcurrentHashMap 作为支持映射。以下只是草图。

public final class ConcurrentHashSet<E> extends ForwardingSet<E>
    implements Set<E>, Queue<E> {
  private enum Dummy { VALUE }

  private final ConcurrentMap<E, Dummy> map;

  ConcurrentHashSet(ConcurrentMap<E, Dummy> map) {
    super(map.keySet());
    this.map = Preconditions.checkNotNull(map);
  }

  @Override public boolean add(E element) {
    return map.put(element, Dummy.VALUE) == null;
  }

  @Override public boolean addAll(Collection<? extends E> newElements) {
    // just the standard implementation
    boolean modified = false;
    for (E element : newElements) {
      modified |= add(element);
    }
    return modified;
  }

  @Override public boolean offer(E element) {
    return add(element);
  }

  @Override public E remove() {
    E polled = poll();
    if (polled == null) {
      throw new NoSuchElementException();
    }
    return polled;
  }

  @Override public E poll() {
    for (E element : this) {
      // Not convinced that removing via iterator is viable (check this?)
      if (map.remove(element) != null) {
        return element;
      }
    }
    return null;
  }

  @Override public E element() {
    return iterator().next();
  }

  @Override public E peek() {
    Iterator<E> iterator = iterator();
    return iterator.hasNext() ? iterator.next() : null;
  }
}

这种方法并不都是阳光明媚的。除了使用后备映射的 entrySet().iterator().next() 之外,我们没有其他合适的方法来选择头元素,结果是随着时间的推移,映射变得越来越不平衡。由于更大的存储桶冲突和更大的段争用,这种不平衡是一个问题。

注意:此代码在一些地方使用了 Guava

If you want better concurrency than full synchronization, there is one way I know of to do it, using a ConcurrentHashMap as the backing map. The following is a sketch only.

public final class ConcurrentHashSet<E> extends ForwardingSet<E>
    implements Set<E>, Queue<E> {
  private enum Dummy { VALUE }

  private final ConcurrentMap<E, Dummy> map;

  ConcurrentHashSet(ConcurrentMap<E, Dummy> map) {
    super(map.keySet());
    this.map = Preconditions.checkNotNull(map);
  }

  @Override public boolean add(E element) {
    return map.put(element, Dummy.VALUE) == null;
  }

  @Override public boolean addAll(Collection<? extends E> newElements) {
    // just the standard implementation
    boolean modified = false;
    for (E element : newElements) {
      modified |= add(element);
    }
    return modified;
  }

  @Override public boolean offer(E element) {
    return add(element);
  }

  @Override public E remove() {
    E polled = poll();
    if (polled == null) {
      throw new NoSuchElementException();
    }
    return polled;
  }

  @Override public E poll() {
    for (E element : this) {
      // Not convinced that removing via iterator is viable (check this?)
      if (map.remove(element) != null) {
        return element;
      }
    }
    return null;
  }

  @Override public E element() {
    return iterator().next();
  }

  @Override public E peek() {
    Iterator<E> iterator = iterator();
    return iterator.hasNext() ? iterator.next() : null;
  }
}

All is not sunshine with this approach. We have no decent way to select a head element other than using the backing map's entrySet().iterator().next(), the result being that the map gets more and more unbalanced as time goes on. This unbalancing is a problem both due to greater bucket collisions and greater segment contention.

Note: this code uses Guava in a few places.

任性一次 2024-09-14 01:23:05

没有内置集合可以执行此操作。有一些并发Set 实现可以与并发Queue 一起使用。

例如,一个项目只有在成功添加到集合后才会添加到队列中,而从队列中删除的每个项目都会从集合中删除。在这种情况下,从逻辑上讲,队列的内容实际上是集合中的任何内容,队列只是用于跟踪顺序并提供高效的 take()poll()仅在 BlockingQueue 上找到的 操作。

There's not a built-in collection that does this. There are some concurrent Set implementations that could be used together with a concurrent Queue.

For example, an item is added to the queue only after it was successfully added to the set, and each item removed from the queue is removed from the set. In this case, the contents of the queue, logically, are really whatever is in the set, and the queue is just used to track the order and provide efficient take() and poll() operations found only on a BlockingQueue.

鲜肉鲜肉永远不皱 2024-09-14 01:23:05

我会使用同步 LinkedHashSet,直到有足够的理由考虑替代方案。更加并发的解决方案可以提供的主要好处是锁分割。

最简单的并发方法是 ConcurrentHashMap(充当集合)和 ConcurrentLinkedQueue。操作的顺序将提供所需的约束。 Offer() 首先执行 CHM#putIfAbsent(),如果成功则插入到 CLQ 中。 poll() 将从 CLQ 中获取,然后将其从 CHM 中删除。这意味着如果队列中的某个条目位于映射中并且 CLQ 提供了排序,则我们会考虑该条目。然后可以通过增加映射的并发级别来调整性能。如果您能够容忍额外的活泼性,那么廉价的 CHM#get() 可以作为合理的前提条件(但它可能会因为视图稍微陈旧而受到影响)。

I would use a synchronized LinkedHashSet until there was enough justification to consider alternatives. The primary benefit that a more concurrent solution could offer is lock splitting.

The simplest concurrent approach would be a a ConcurrentHashMap (acting as a set) and a ConcurrentLinkedQueue. The ordering of operations would provide the desired constraint. An offer() would first perform a CHM#putIfAbsent() and if successful insert into the CLQ. A poll() would take from the CLQ and then remove it from the CHM. This means that we consider an entry in our queue if it is in the map and the CLQ provides the ordering. The performance could then be adjusted by increasing the map's concurrencyLevel. If you are tolerant to additional racy-ness, then a cheap CHM#get() could act as a reasonable precondition (but it can suffer by being a slightly stale view).

<逆流佳人身旁 2024-09-14 01:23:05

java.util.concurrent.ConcurrentLinkedQueue 可以帮助您完成大部分工作。

用您自己的类包装 ConcurrentLinkedQueue,以检查添加的唯一性。您的代码必须是线程安全的。

A java.util.concurrent.ConcurrentLinkedQueue gets you most of the way there.

Wrap the ConcurrentLinkedQueue with your own class that checks for the uniqueness of an add. Your code has to be thread safe.

夜空下最亮的亮点 2024-09-14 01:23:05

具有 Set 语义的并发队列是什么意思?如果您指的是真正的并发结构(而不​​是线程安全结构),那么我会认为您正在要求一匹小马。

例如,如果您调用 put(element) 并检测到某些内容已经存在并立即被删除,会发生什么情况?例如,如果 offer(element) || 在您的情况下意味着什么queue.contains(element) 返回 false

在并发世界中,这些事情通常需要稍微不同地思考,因为除非你停止世界(锁定它),否则一切都不是看起来的那样。否则你通常会看到过去的事情。那么,你到底想做什么?

What do you mean by a concurrent queue with Set semantics? If you mean a truly concurrent structure (as opposed to a thread-safe structure) then I would contend that you are asking for a pony.

What happens for instance if you call put(element) and detect that something is already there which immediately is removed? For instance, what does it mean in your case if offer(element) || queue.contains(element) returns false?

These kinds of things often need to thought about slightly differently in a concurrent world as often nothing is as it seems unless you stop the world (lock it down). Otherwise you are usually looking at something in the past. So, what are you actually trying to do?

药祭#氼 2024-09-14 01:23:05

也许扩展 ArrayBlockingQueue。为了访问(包访问)锁,我必须将我的子类放在同一个包中。警告:我还没有测试过这个。

package java.util.concurrent;

import java.util.Collection;
import java.util.concurrent.locks.ReentrantLock;

public class DeDupingBlockingQueue<E> extends ArrayBlockingQueue<E> {

    public DeDupingBlockingQueue(int capacity) {
        super(capacity);
    }

    public DeDupingBlockingQueue(int capacity, boolean fair) {
        super(capacity, fair);
    }

    public DeDupingBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
        super(capacity, fair, c);
    }

    @Override
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (contains(e)) return false;
            return super.add(e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (contains(e)) return true;
            return super.offer(e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void put(E e) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //Should this be lock.lock() instead?
        try {
            if (contains(e)) return;
            super.put(e); //if it blocks, it does so without holding the lock.
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (contains(e)) return true;
            return super.offer(e, timeout, unit); //if it blocks, it does so without holding the lock.
        } finally {
            lock.unlock();
        }
    }
}

Perhaps extend ArrayBlockingQueue. In order to get access to the (package-access) lock, I had to put my sub-class within the same package. Caveat: I haven't tested this.

package java.util.concurrent;

import java.util.Collection;
import java.util.concurrent.locks.ReentrantLock;

public class DeDupingBlockingQueue<E> extends ArrayBlockingQueue<E> {

    public DeDupingBlockingQueue(int capacity) {
        super(capacity);
    }

    public DeDupingBlockingQueue(int capacity, boolean fair) {
        super(capacity, fair);
    }

    public DeDupingBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
        super(capacity, fair, c);
    }

    @Override
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (contains(e)) return false;
            return super.add(e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (contains(e)) return true;
            return super.offer(e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void put(E e) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //Should this be lock.lock() instead?
        try {
            if (contains(e)) return;
            super.put(e); //if it blocks, it does so without holding the lock.
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (contains(e)) return true;
            return super.offer(e, timeout, unit); //if it blocks, it does so without holding the lock.
        } finally {
            lock.unlock();
        }
    }
}
幸福还没到 2024-09-14 01:23:05

对于唯一对象队列的简单答案如下:

import java.util.concurrent.ConcurrentLinkedQueue;

public class FinalQueue {

    class Bin {
        private int a;
        private int b;

        public Bin(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public int hashCode() {
            return a * b;
        }

        public String toString() {
            return a + ":" + b;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            Bin other = (Bin) obj;
            if ((a != other.a) || (b != other.b))
                return false;
            return true;
        }
    }

    private ConcurrentLinkedQueue<Bin> queue;

    public FinalQueue() {
        queue = new ConcurrentLinkedQueue<Bin>();
    }

    public synchronized void enqueue(Bin ipAddress) {
        if (!queue.contains(ipAddress))
            queue.add(ipAddress);
    }

    public Bin dequeue() {
        return queue.poll();
    }

    public String toString() {
        return "" + queue;
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        FinalQueue queue = new FinalQueue();
        Bin a = queue.new Bin(2,6);

        queue.enqueue(a);
        queue.enqueue(queue.new Bin(13, 3));
        queue.enqueue(queue.new Bin(13, 3));
        queue.enqueue(queue.new Bin(14, 3));
        queue.enqueue(queue.new Bin(13, 9));
        queue.enqueue(queue.new Bin(18, 3));
        queue.enqueue(queue.new Bin(14, 7));
        Bin x= queue.dequeue();
        System.out.println(x.a);
        System.out.println(queue.toString());
        System.out.println("Dequeue..." + queue.dequeue());
        System.out.println("Dequeue..." + queue.dequeue());
        System.out.println(queue.toString());
    }
}

A simple answer for a queue of unique objects can be as follow:

import java.util.concurrent.ConcurrentLinkedQueue;

public class FinalQueue {

    class Bin {
        private int a;
        private int b;

        public Bin(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public int hashCode() {
            return a * b;
        }

        public String toString() {
            return a + ":" + b;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            Bin other = (Bin) obj;
            if ((a != other.a) || (b != other.b))
                return false;
            return true;
        }
    }

    private ConcurrentLinkedQueue<Bin> queue;

    public FinalQueue() {
        queue = new ConcurrentLinkedQueue<Bin>();
    }

    public synchronized void enqueue(Bin ipAddress) {
        if (!queue.contains(ipAddress))
            queue.add(ipAddress);
    }

    public Bin dequeue() {
        return queue.poll();
    }

    public String toString() {
        return "" + queue;
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        FinalQueue queue = new FinalQueue();
        Bin a = queue.new Bin(2,6);

        queue.enqueue(a);
        queue.enqueue(queue.new Bin(13, 3));
        queue.enqueue(queue.new Bin(13, 3));
        queue.enqueue(queue.new Bin(14, 3));
        queue.enqueue(queue.new Bin(13, 9));
        queue.enqueue(queue.new Bin(18, 3));
        queue.enqueue(queue.new Bin(14, 7));
        Bin x= queue.dequeue();
        System.out.println(x.a);
        System.out.println(queue.toString());
        System.out.println("Dequeue..." + queue.dequeue());
        System.out.println("Dequeue..." + queue.dequeue());
        System.out.println(queue.toString());
    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文