Suggestions for an efficient blocking query

Posted 2024-08-17 11:55:12

I would like to store tuple objects in a concurrent Java collection and then have an efficient, blocking query method that returns the first element matching a pattern. If no such element is available, it should block until one is present.

For instance, if I have a class:

public class Pair {
  public final String first;
  public final String second;
  public Pair( String first, String second ) {
    this.first = first;
    this.second = second;
  }
}

And a collection like:

public class FunkyCollection {
  public void add( Pair p ) { /* ... */ }
  public Pair get( Pair p ) { /* ... */ }
}

I would like to query it like:

myFunkyCollection.get( new Pair( null, "foo" ) );

which returns the first available pair whose second field equals "foo", or blocks until such an element is added. Another query example:

myFunkyCollection.get( new Pair( null, null ) );

should return the first available pair, whatever its values.

Does a solution already exist? If not, how would you suggest implementing the get( Pair p ) method?

Clarification: the method get( Pair p ) must also remove the element. The name was not well chosen; a better name would be take( ... ).

Comments (4)

轻拂→两袖风尘 2024-08-24 11:55:12

Here's some source code. It is basically the same as what cb160 said, but having the source code might help to clear up any questions you may still have. In particular, the methods on FunkyCollection must be synchronized.

As meriton pointed out, the get method performs an O(n) scan for every blocked get every time a new object is added. It also performs an O(n) operation to remove objects. This could be improved by using a data structure similar to a linked list where you can keep an iterator to the last item checked. I haven't provided source code for this optimization, but it shouldn't be too difficult to implement if you need the extra performance.

import java.util.*;

public class BlockingQueries
{
    public class Pair
    {
        public final String first;
        public final String second;
        public Pair(String first, String second)
        {
            this.first = first;
            this.second = second;
        }
    }

    public class FunkyCollection
    {
        final ArrayList<Pair> pairs = new ArrayList<Pair>();

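        // Adds a pair and wakes every blocked get() so it can rescan the list.
        public synchronized void add( Pair p )
        {
            pairs.add(p);
            notifyAll();
        }

        // Removes and returns the first pair matching p (a null field matches anything);
        // blocks until such a pair has been added.
        public synchronized Pair get( Pair p ) throws InterruptedException
        {
            while (true)
            {
                for (Iterator<Pair> i = pairs.iterator(); i.hasNext(); )
                {
                    Pair pair = i.next();
                    boolean firstOk = p.first == null || p.first.equals(pair.first);
                    boolean secondOk = p.second == null || p.second.equals(pair.second);
                    if (firstOk && secondOk)
                    {
                        i.remove();
                        return pair;
                    }
                }
                // No match yet: release the monitor and sleep until add() calls notifyAll().
                wait();
            }
        }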
    }

    class Producer implements Runnable
    {
        private FunkyCollection funkyCollection;

        public Producer(FunkyCollection funkyCollection)
        {
            this.funkyCollection = funkyCollection;
        }

        public void run()
        {
            try
            {
                for (int i = 0; i < 10; ++i)
                {
                    System.out.println("Adding item " + i);
                    funkyCollection.add(new Pair("foo" + i, "bar" + i));
                    Thread.sleep(1000);
                }
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void go() throws InterruptedException
    {
        FunkyCollection funkyCollection = new FunkyCollection();
        new Thread(new Producer(funkyCollection)).start();
        System.out.println("Fetching bar5.");
        funkyCollection.get(new Pair(null, "bar5"));
        System.out.println("Fetching foo2.");
        funkyCollection.get(new Pair("foo2", null));
        System.out.println("Fetching foo8, bar8");
        funkyCollection.get(new Pair("foo8", "bar8"));
        System.out.println("Finished.");
    }

    public static void main(String[] args) throws InterruptedException
    {
        new BlockingQueries().go();
    }
}

Output:

Fetching bar5.
Adding item 0
Adding item 1
Adding item 2
Adding item 3
Adding item 4
Adding item 5
Fetching foo2.
Fetching foo8, bar8
Adding item 6
Adding item 7
Adding item 8
Finished.
Adding item 9

Note that I put everything into one source file to make it easier to run.
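
The optimization mentioned above (not rescanning items that a blocked get has already rejected) could be sketched roughly as follows. Note that this swaps the linked-list-plus-iterator idea for a TreeMap keyed by an insertion sequence number, so that removals by other threads cannot invalidate a remembered position; that substitution is an assumption of this sketch. The names IncrementalScanCollection, take, nextSeq, lastChecked and matches are hypothetical.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

final class IncrementalScanCollection {
    static final class Pair {
        final String first;
        final String second;
        Pair(String first, String second) {
            this.first = first;
            this.second = second;
        }
    }

    // Insertion sequence number -> pair. Keys only ever grow, so iteration
    // order is insertion order.
    private final TreeMap<Long, Pair> pairs = new TreeMap<>();
    private long nextSeq = 0;

    public synchronized void add(Pair p) {
        pairs.put(nextSeq++, p);
        notifyAll(); // wake blocked take() calls so they can inspect the new entry
    }

    public synchronized Pair take(Pair pattern) throws InterruptedException {
        // Highest sequence number this call has already examined and rejected.
        long lastChecked = -1;
        while (true) {
            // Only entries added after lastChecked can be new matches.
            Iterator<Map.Entry<Long, Pair>> it =
                    pairs.tailMap(lastChecked, false).entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Pair> entry = it.next();
                // Read key and value before remove(): a TreeMap entry should
                // not be relied on after it has been removed.
                Pair candidate = entry.getValue();
                lastChecked = entry.getKey();
                if (matches(pattern, candidate)) {
                    it.remove();
                    return candidate;
                }
            }
            wait(); // nothing new matched; sleep until the next add()
        }
    }

    // A null field in the pattern acts as a wildcard.
    private static boolean matches(Pair pattern, Pair candidate) {
        boolean firstOk = pattern.first == null || pattern.first.equals(candidate.first);
        boolean secondOk = pattern.second == null || pattern.second.equals(candidate.second);
        return firstOk && secondOk;
    }
}
```

Every add() still wakes all waiting threads, but each of them now looks only at the entries added since its last scan, and removing a match costs O(log n) instead of O(n).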

妄断弥空 2024-08-24 11:55:12

I know of no existing container that will provide this behavior. One problem you face is the case where no existing entry matches the query. In that case, you'll have to wait for new entries to arrive, and those new entries are supposed to arrive at the tail of the sequence. Given that you're blocking, you don't want to have to examine all the entries that precede the latest addition, because you've already inspected them and determined that they don't match. Hence, you need some way to record your current position, and be able to search forward from there whenever a new entry arrives.

This waiting is a job for a Condition. As suggested in cb160's answer, you should allocate a Condition instance inside your collection, and block on it via Condition#await(). You should also expose a companion overload to your get() method to allow timed waiting:

public Pair get(Pair p) throws InterruptedException;
public Pair get(Pair p, long time, TimeUnit unit) throws InterruptedException;

Upon each call to add(), call Condition#signalAll() to unblock the threads waiting on unsatisfied get() queries, allowing them to scan the recent additions.
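
A minimal sketch of the Condition-based approach described above, assuming a ReentrantLock guards a plain ArrayList. The class name ConditionFunkyCollection, the field names, the removeFirstMatch helper, and the choice to return null on timeout (in the style of BlockingQueue#poll) are assumptions of this sketch rather than anything specified here. For brevity it rescans the whole list on every wakeup instead of recording a position.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class ConditionFunkyCollection {
    static final class Pair {
        final String first;
        final String second;
        Pair(String first, String second) {
            this.first = first;
            this.second = second;
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition added = lock.newCondition(); // signalled on every add()
    private final List<Pair> pairs = new ArrayList<>();

    public void add(Pair p) {
        lock.lock();
        try {
            pairs.add(p);
            added.signalAll(); // let every waiting query re-check
        } finally {
            lock.unlock();
        }
    }

    // Blocks until a matching pair is available, then removes and returns it.
    public Pair get(Pair pattern) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            Pair found;
            while ((found = removeFirstMatch(pattern)) == null) {
                added.await();
            }
            return found;
        } finally {
            lock.unlock();
        }
    }

    // Timed variant; returns null if no match appears in time (an assumption).
    public Pair get(Pair pattern, long time, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(time);
        lock.lockInterruptibly();
        try {
            Pair found;
            while ((found = removeFirstMatch(pattern)) == null) {
                if (nanos <= 0L) {
                    return null;
                }
                nanos = added.awaitNanos(nanos); // returns the remaining wait time
            }
            return found;
        } finally {
            lock.unlock();
        }
    }

    // Must be called with the lock held. A null pattern field is a wildcard.
    private Pair removeFirstMatch(Pair pattern) {
        for (Iterator<Pair> it = pairs.iterator(); it.hasNext(); ) {
            Pair candidate = it.next();
            boolean firstOk = pattern.first == null || pattern.first.equals(candidate.first);
            boolean secondOk = pattern.second == null || pattern.second.equals(candidate.second);
            if (firstOk && secondOk) {
                it.remove();
                return candidate;
            }
        }
        return null;
    }
}
```

The await calls sit inside while loops because a signalled thread may find that another thread has already taken the new entry, and because spurious wakeups are permitted.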

You haven't mentioned how or if items are ever removed from this container. If the container only grows, that simplifies how threads can scan its contents without worrying about contention from other threads mutating the container. Each thread can begin its query with confidence as to the minimum number of entries available to inspect. However, if you allow removal of items, there are many more challenges to confront.

楠木可依 2024-08-24 11:55:12

In your FunkyCollection add method you could call notifyAll on the collection itself every time you add an element.

In the get method, if the underlying container (any suitable container is fine) doesn't contain the value you need, wait on the FunkyCollection. When the wait is notified, check whether the underlying container now contains the result you need. If it does, return the value; otherwise, wait again.

爱已欠费 2024-08-24 11:55:12

It appears you are looking for an implementation of Tuple Spaces. The Wikipedia article about them lists a few implementations for Java; perhaps you can use one of those. Failing that, you might find an open source implementation to imitate, or relevant research papers.
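
For orientation, the core of a tuple space can be sketched in a few lines. The operation names out, in and rd follow the classic Linda primitives (write a tuple, blocking destructive read, blocking non-destructive read). Everything else here is an assumption of this sketch: the class name TinyTupleSpace, tuples as Object[] arrays, and null as a wildcard in templates. It is not the API of any of the Java implementations listed in that article.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class TinyTupleSpace {
    private final List<Object[]> tuples = new ArrayList<>();

    // out: publish a tuple and wake any blocked readers.
    public synchronized void out(Object... tuple) {
        tuples.add(tuple.clone());
        notifyAll();
    }

    // in: block until a tuple matches the template, then remove and return it.
    public synchronized Object[] in(Object... template) throws InterruptedException {
        return find(template, true);
    }

    // rd: block until a tuple matches the template and return a copy,
    // leaving the tuple in the space.
    public synchronized Object[] rd(Object... template) throws InterruptedException {
        return find(template, false);
    }

    // Only called from the synchronized methods above, so the monitor is held
    // and wait() is legal here.
    private Object[] find(Object[] template, boolean remove) throws InterruptedException {
        while (true) {
            for (Iterator<Object[]> it = tuples.iterator(); it.hasNext(); ) {
                Object[] candidate = it.next();
                if (matches(template, candidate)) {
                    if (remove) {
                        it.remove();
                    }
                    return candidate.clone();
                }
            }
            wait(); // no match yet; sleep until the next out()
        }
    }

    // Same arity, and every non-null template field equals the tuple's field.
    private static boolean matches(Object[] template, Object[] tuple) {
        if (template.length != tuple.length) {
            return false;
        }
        for (int i = 0; i < template.length; i++) {
            if (template[i] != null && !template[i].equals(tuple[i])) {
                return false;
            }
        }
        return true;
    }
}
```

With this sketch, the query from the question would read space.in(null, "foo"), and space.in(null, null) would take the first available pair.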
