java:对不可变的Iterable进行并发迭代

发布于 2024-11-05 11:36:26 字数 1659 浏览 0 评论 0原文

我有一个包含大量元素的不可变 Iterable 。 (它恰好是一个 List<>,但没关系。)

我想做的是启动一些并行/异步任务来迭代 Iterable<> 使用相同的迭代器,并且我想知道我应该使用什么接口

这是一个带有待确定接口 QuasiIteratorInterface 的示例实现:

public void process(Iterable<X> iterable)
{
   QuasiIteratorInterface<X> qit = ParallelIteratorWrapper.iterate(iterable);
   for (int i = 0; i < MAX_PARALLEL_COUNT; ++i)
   {
      SomeWorkerClass worker = new SomeWorkerClass(qit);
      worker.start();
   }
}

class ParallelIteratorWrapper<T> implements QuasiIteratorInterface<T>
{
   final private Iterator<T> iterator;
   final private Object lock = new Object();
   private ParallelIteratorWrapper(Iterator<T> iterator) { 
      this.iterator = iterator;
   }
   static public <T> ParallelIteratorWrapper<T> iterate(Iterable<T> iterable)
   {
      return new ParallelIteratorWrapper(iterable.iterator());
   }
   private T getNextItem()
   {
      synchronized(lock)
      {
         if (this.iterator.hasNext())
            return this.iterator.next();
         else
            return null;
      }
   }
   /* QuasiIteratorInterface methods here */
}

这是我的问题:

  • 直接使用 Iterator 没有意义,因为 hasNext()和 next() 有同步问题,如果其他人在你之前调用 next(),hasNext() 就没用。

  • 我很想使用 Queue,但我唯一需要的方法是 poll()

  • 我很想使用 ConcurrentLinkedQueue 来保存大量元素...但我可能必须多次迭代这些元素,所以我不能使用它。

有什么建议吗?

I have an immutable Iterable<X> with a large number of elements. (it happens to be a List<> but never mind that.)

What I would like to do is start a few parallel / asynchronous tasks to iterate over the Iterable<> with the same iterator, and I'm wondering what interface I should use.

Here's a sample implementation with the to-be-determined interface QuasiIteratorInterface:

public void process(Iterable<X> iterable)
{
   QuasiIteratorInterface<X> qit = ParallelIteratorWrapper.iterate(iterable);
   for (int i = 0; i < MAX_PARALLEL_COUNT; ++i)
   {
      SomeWorkerClass worker = new SomeWorkerClass(qit);
      worker.start();
   }
}

class ParallelIteratorWrapper<T> implements QuasiIteratorInterface<T>
{
   final private Iterator<T> iterator;
   final private Object lock = new Object();
   private ParallelIteratorWrapper(Iterator<T> iterator) { 
      this.iterator = iterator;
   }
   static public <T> ParallelIteratorWrapper<T> iterate(Iterable<T> iterable)
   {
      return new ParallelIteratorWrapper(iterable.iterator());
   }
   private T getNextItem()
   {
      synchronized(lock)
      {
         if (this.iterator.hasNext())
            return this.iterator.next();
         else
            return null;
      }
   }
   /* QuasiIteratorInterface methods here */
}

Here's my problem:

  • it doesn't make sense to use Iterator directly, since hasNext() and next() have a synchronization problem, where hasNext() is useless if someone else calls next() before you do.

  • I'd love to use Queue, but the only method I need is poll()

  • I'd love to use ConcurrentLinkedQueue to hold my large number of elements... except I may have to iterate through the elements more than once, so I can't use that.

Any suggestions?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

心作怪 2024-11-12 11:36:26

使用 poll() 方法或等效方法创建您自己的 Producer 接口(Guava 的 Supplier)。实现选项有很多,但如果你有一个不可变的随机访问列表,那么你可以简单地维护一个线程安全的单调计数器(例如 AtomicInteger)并调用 list.get(int) 例如:

class ListSupplier<T> implements Supplier<T> {
  private final AtomicInteger next = new AtomicInteger();
  private final List<T> elements; // ctor injected

  …
  public <T> get() {
    // real impl more complicated due to bounds checks
    // and what to do when exhausted
    return elements.get(next.getAndIncrement());
  }
}

这是线程安全的,但你会可能想要在耗尽时返回 Option 样式的东西或 null。

Create your own Producer interface with the poll() method or equivalent (Guava's Supplier for instance). The implementation options are many but if you have an immutable random access list then you can simply maintain a thread-safe monotonic counter (AtomicInteger for instance) and call list.get(int) eg:

class ListSupplier<T> implements Supplier<T> {
  private final AtomicInteger next = new AtomicInteger();
  private final List<T> elements; // ctor injected

  …
  public <T> get() {
    // real impl more complicated due to bounds checks
    // and what to do when exhausted
    return elements.get(next.getAndIncrement());
  }
}

That is thread-safe, but you'd probably want to either return an Option style thing or null when exhausted.

汐鸠 2024-11-12 11:36:26

拥有一个调度程序线程,用于迭代 Iterable 并将元素调度到多个工作线程,这些工作线程对元素执行工作。您可以使用 ThreadPoolExecutor 来自动执行此操作。

Have one dispatcher thread that iterates over Iterable and dispatches elements to multiple worker threads that perform the work on the elements. You can use ThreadPoolExecutor to automate this.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文