java:对不可变的Iterable进行并发迭代
我有一个包含大量元素的不可变 Iterable
。 (它恰好是一个 List<>,但没关系。)
我想做的是启动一些并行/异步任务来迭代 Iterable<> 使用相同的迭代器,并且我想知道我应该使用什么接口。
这是一个带有待确定接口 QuasiIteratorInterface 的示例实现:
public void process(Iterable<X> iterable)
{
QuasiIteratorInterface<X> qit = ParallelIteratorWrapper.iterate(iterable);
for (int i = 0; i < MAX_PARALLEL_COUNT; ++i)
{
SomeWorkerClass worker = new SomeWorkerClass(qit);
worker.start();
}
}
class ParallelIteratorWrapper<T> implements QuasiIteratorInterface<T>
{
final private Iterator<T> iterator;
final private Object lock = new Object();
private ParallelIteratorWrapper(Iterator<T> iterator) {
this.iterator = iterator;
}
static public <T> ParallelIteratorWrapper<T> iterate(Iterable<T> iterable)
{
return new ParallelIteratorWrapper(iterable.iterator());
}
private T getNextItem()
{
synchronized(lock)
{
if (this.iterator.hasNext())
return this.iterator.next();
else
return null;
}
}
/* QuasiIteratorInterface methods here */
}
这是我的问题:
直接使用
Iterator
没有意义,因为 hasNext()和 next() 有同步问题,如果其他人在你之前调用 next(),hasNext() 就没用。我很想使用
Queue
,但我唯一需要的方法是poll()
我很想使用 ConcurrentLinkedQueue 来保存大量元素...但我可能必须多次迭代这些元素,所以我不能使用它。
有什么建议吗?
I have an immutable Iterable<X>
with a large number of elements. (it happens to be a List<>
but never mind that.)
What I would like to do is start a few parallel / asynchronous tasks to iterate over the Iterable<>
with the same iterator, and I'm wondering what interface I should use.
Here's a sample implementation with the to-be-determined interface QuasiIteratorInterface
:
public void process(Iterable<X> iterable)
{
QuasiIteratorInterface<X> qit = ParallelIteratorWrapper.iterate(iterable);
for (int i = 0; i < MAX_PARALLEL_COUNT; ++i)
{
SomeWorkerClass worker = new SomeWorkerClass(qit);
worker.start();
}
}
class ParallelIteratorWrapper<T> implements QuasiIteratorInterface<T>
{
final private Iterator<T> iterator;
final private Object lock = new Object();
private ParallelIteratorWrapper(Iterator<T> iterator) {
this.iterator = iterator;
}
static public <T> ParallelIteratorWrapper<T> iterate(Iterable<T> iterable)
{
return new ParallelIteratorWrapper(iterable.iterator());
}
private T getNextItem()
{
synchronized(lock)
{
if (this.iterator.hasNext())
return this.iterator.next();
else
return null;
}
}
/* QuasiIteratorInterface methods here */
}
Here's my problem:
it doesn't make sense to use
Iterator
directly, since hasNext() and next() have a synchronization problem, where hasNext() is useless if someone else calls next() before you do.I'd love to use
Queue
, but the only method I need ispoll()
I'd love to use ConcurrentLinkedQueue to hold my large number of elements... except I may have to iterate through the elements more than once, so I can't use that.
Any suggestions?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
使用
poll()
方法或等效方法创建您自己的Producer
接口(Guava 的Supplier
)。实现选项有很多,但如果你有一个不可变的随机访问列表,那么你可以简单地维护一个线程安全的单调计数器(例如 AtomicInteger)并调用 list.get(int) 例如:这是线程安全的,但你会可能想要在耗尽时返回 Option 样式的东西或 null。
Create your own
Producer
interface with thepoll()
method or equivalent (Guava'sSupplier
for instance). The implementation options are many but if you have an immutable random access list then you can simply maintain a thread-safe monotonic counter (AtomicInteger for instance) and call list.get(int) eg:That is thread-safe, but you'd probably want to either return an Option style thing or null when exhausted.
拥有一个调度程序线程,用于迭代 Iterable 并将元素调度到多个工作线程,这些工作线程对元素执行工作。您可以使用 ThreadPoolExecutor 来自动执行此操作。
Have one dispatcher thread that iterates over Iterable and dispatches elements to multiple worker threads that perform the work on the elements. You can use
ThreadPoolExecutor
to automate this.