Using a Java thread pool
I'm trying to write a multithreaded web crawler.
My main entry class has the following code:
ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while (true) {
    URL url = frontier.get();
    if (url == null)
        return;
    exec.execute(new URLCrawler(this, url));
}
The URLCrawler fetches the specified URL, parses the HTML, extracts links from it, and schedules unseen links back to the frontier.
The frontier is a queue of uncrawled URLs. The problem is how to write the get() method.
If the queue is empty, it should wait until a URLCrawler finishes and then try again.
It should return null only when the queue is empty and there is no currently active URLCrawler.
My first idea was to use an AtomicInteger to count the URLCrawlers that are currently working, plus an auxiliary object for notifyAll()/wait() calls. Each crawler increments the count on start, decrements it on exit, and notifies the object that it has completed.
But I have read that notify()/notifyAll() and wait() are a somewhat deprecated way to do thread communication.
What should I use for this work pattern? It is similar to M producers and N consumers; the question is how to deal with exhaustion of the producers.
Comments (6)
I am not sure I understand your design, but this may be a job for a Semaphore.
One option is to make "frontier" a blocking queue, so any thread trying to "get" from it will block while it is empty.
As soon as another URLCrawler puts an object into that queue, a blocked thread is woken automatically and receives the dequeued object.
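A blocking queue on its own does not tell get() when the crawl is over, so here is a minimal sketch of one way to combine it with a counter of unfinished URLs. The class names (BlockingFrontier, BlockingFrontierDemo), the done() callback, the 20 ms poll interval and the in-memory link map that stands in for real HTTP fetching are all my assumptions, not something from the question. It also assumes a single thread calls get(), as in the question's main loop.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical frontier backed by a BlockingQueue. */
class BlockingFrontier {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Set<String> seen = ConcurrentHashMap.newKeySet();
    // URLs that are queued or currently being crawled.
    private final AtomicInteger pending = new AtomicInteger();

    /** Schedules a URL unless it has been seen before. */
    void add(String url) {
        if (seen.add(url)) {
            pending.incrementAndGet(); // count first, then publish
            queue.add(url);
        }
    }

    /** Called by a crawler after it has added every link it found. */
    void done() {
        pending.decrementAndGet();
    }

    /** Returns the next URL, or null once nothing is queued or in flight. */
    String get() throws InterruptedException {
        while (true) {
            String url = queue.poll(20, TimeUnit.MILLISECONDS);
            if (url != null) return url;
            if (pending.get() == 0) return null;
        }
    }

    int seenCount() {
        return seen.size();
    }
}

class BlockingFrontierDemo {
    /** Crawls a fake in-memory link graph; returns the number of pages seen. */
    static int crawl(Map<String, List<String>> links, String start, int threads) {
        BlockingFrontier frontier = new BlockingFrontier();
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        frontier.add(start);
        try {
            String url;
            while ((url = frontier.get()) != null) {
                final String page = url;
                exec.execute(() -> {
                    try {
                        for (String link : links.getOrDefault(page, List.of())) {
                            frontier.add(link);
                        }
                    } finally {
                        frontier.done(); // always report completion
                    }
                });
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            exec.shutdown();
        }
        return frontier.seenCount();
    }

    public static void main(String[] args) {
        Map<String, List<String>> links = Map.of(
                "a", List.of("b", "c"),
                "b", List.of("c", "d"),
                "c", List.of("a"));
        System.out.println(crawl(links, "a", 4)); // a, b, c and d are reachable
    }
}
```

The timed poll is there because a plain take() would block forever once the last crawler has finished without adding anything.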
I think the use of wait/notify is justified in this case. I can't think of any straightforward way to do this using j.u.c. (java.util.concurrent).
In a class, let's call it Coordinator:
then,
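A minimal sketch of what such a Coordinator could look like with wait()/notifyAll(). The method names (add, finished, get), the use of strings like "page-3" instead of real URLs, and the CoordinatorDemo driver are assumptions made for illustration, not the code this answer originally contained.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical Coordinator: frontier and active count guarded by one monitor. */
class Coordinator {
    private final Queue<String> frontier = new ArrayDeque<>();
    private int active; // crawlers that were handed a URL and have not finished

    /** Called by a crawler for every new link it finds. */
    synchronized void add(String url) {
        frontier.add(url);
        notifyAll();
    }

    /** Called by a crawler when it has finished its page. */
    synchronized void finished() {
        active--;
        notifyAll();
    }

    /** Blocks while the queue is empty and crawlers are still running. */
    synchronized String get() throws InterruptedException {
        while (frontier.isEmpty() && active > 0) {
            wait();
        }
        if (frontier.isEmpty()) return null; // empty and nobody active: done
        active++; // counted under the same lock that dequeues the URL
        return frontier.remove();
    }
}

class CoordinatorDemo {
    /** Fake crawl: "page-k" links to "page-(k-1)". Returns the pages crawled. */
    static int crawlChain(int n, int threads) {
        Coordinator coordinator = new Coordinator();
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        AtomicInteger crawled = new AtomicInteger();
        coordinator.add("page-" + n);
        try {
            String url;
            while ((url = coordinator.get()) != null) {
                int k = Integer.parseInt(url.substring("page-".length()));
                exec.execute(() -> {
                    try {
                        crawled.incrementAndGet();
                        if (k > 1) coordinator.add("page-" + (k - 1));
                    } finally {
                        coordinator.finished();
                    }
                });
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            exec.shutdown();
        }
        return crawled.get();
    }

    public static void main(String[] args) {
        System.out.println(crawlChain(5, 3)); // pages 5, 4, 3, 2, 1
    }
}
```

Incrementing the active count inside get(), rather than when the crawler thread starts, closes the window in which a URL has left the queue but is not yet counted as being worked on.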
I think a basic building block for your use case is a "latch", similar to CountDownLatch, but unlike CountDownLatch, one that permits incrementing the count as well.
An interface for such a latch might be
Legal values for counts would be 0 and up. The await() method would let you block until the count goes down to zero.
If you have such a latch, your use case can be described fairly easily. I also suspect the queue (frontier) can be eliminated in this solution (executor provides one anyway so it's somewhat redundant). I would rewrite your main routine as
Your URLCrawler would use the latch in this manner:
As for the latch itself, there are a number of possible implementations, ranging from one based on wait() and notifyAll(), through one that uses Lock and Condition, to one that uses the AbstractQueuedSynchronizer. I think all of these implementations would be pretty straightforward. Note that the wait()-notifyAll() version and the Lock-Condition version would be based on mutual exclusion, whereas the AQS version would utilize CAS (compare-and-swap), and thus might scale better in certain situations.
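To make the idea concrete, here is a rough sketch of the Lock-and-Condition variant together with a crawl loop that has no separate frontier. The names UpDownLatch, countUp, countDown, getCount and LatchCrawlDemo, and the in-memory link map used instead of real fetching, are my assumptions; the answer only describes the latch in words.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical latch whose count can go up as well as down. */
class UpDownLatch {
    private final Lock lock = new ReentrantLock();
    private final Condition zero = lock.newCondition();
    private int count;

    void countUp() {
        lock.lock();
        try { count++; } finally { lock.unlock(); }
    }

    void countDown() {
        lock.lock();
        try {
            if (count == 0) throw new IllegalStateException("count is already zero");
            if (--count == 0) zero.signalAll();
        } finally { lock.unlock(); }
    }

    /** Blocks until the count is zero. */
    void await() throws InterruptedException {
        lock.lock();
        try {
            while (count > 0) zero.await();
        } finally { lock.unlock(); }
    }

    int getCount() {
        lock.lock();
        try { return count; } finally { lock.unlock(); }
    }
}

class LatchCrawlDemo {
    /** Crawls a fake link graph using only the executor's own queue. */
    static int crawl(Map<String, List<String>> links, String start, int threads) {
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        UpDownLatch latch = new UpDownLatch();
        Set<String> seen = ConcurrentHashMap.newKeySet();
        seen.add(start);
        submit(exec, latch, seen, links, start);
        try {
            latch.await(); // returns once every submitted task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            exec.shutdown();
        }
        return seen.size();
    }

    private static void submit(ExecutorService exec, UpDownLatch latch, Set<String> seen,
                               Map<String, List<String>> links, String page) {
        latch.countUp(); // count the task before handing it to the pool
        exec.execute(() -> {
            try {
                for (String link : links.getOrDefault(page, List.of())) {
                    if (seen.add(link)) submit(exec, latch, seen, links, link);
                }
            } finally {
                latch.countDown();
            }
        });
    }

    public static void main(String[] args) {
        Map<String, List<String>> links = Map.of(
                "a", List.of("b", "c"),
                "b", List.of("c", "d"),
                "c", List.of("a"));
        System.out.println(crawl(links, "a", 4)); // a, b, c and d are reachable
    }
}
```

Because a crawler counts its children up before it counts itself down, the count cannot reach zero while work is still outstanding. java.util.concurrent.Phaser supports dynamic registration too and might serve a similar purpose, though I have not tried it for this case.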
The question is a bit old, but I think I have found a simple, working solution:
Extend the ThreadPoolExecutor class like below. The new functionality is keeping the active task count (unfortunately, the provided getActiveCount() is unreliable). If taskCount.get() == 0 and there are no more queued tasks, it means that there is nothing left to do and the executor shuts down. You have your exit criteria. Also, if you create your executor but fail to submit any tasks, it won't block:
One more thing you have to do is implement your Runnable so that it keeps a reference to the Executor you are using, in order to be able to submit new tasks. Here is a mock:
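A minimal sketch of this approach, with one deliberate change that is my own assumption: the counter is incremented in execute() rather than when a task starts running, so it covers queued and running tasks together and a single check is enough. The names SelfStoppingExecutor, SelfStoppingDemo and Node are hypothetical, and Node is a mock task that submits two children instead of crawling anything.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical executor that shuts itself down when no work is left. */
class SelfStoppingExecutor extends ThreadPoolExecutor {
    // Tasks that were submitted and have not finished yet (queued + running).
    private final AtomicInteger taskCount = new AtomicInteger();

    SelfStoppingExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    public void execute(Runnable task) {
        taskCount.incrementAndGet();
        try {
            super.execute(task);
        } catch (RuntimeException e) {
            taskCount.decrementAndGet(); // rejected: the task will never run
            throw e;
        }
    }

    @Override
    protected void afterExecute(Runnable task, Throwable thrown) {
        super.afterExecute(task, thrown);
        if (taskCount.decrementAndGet() == 0) {
            shutdown(); // nothing queued or running, so no new work can appear
        }
    }
}

class SelfStoppingDemo {
    /** Mock task: keeps a reference to the executor and submits two children. */
    static final class Node implements Runnable {
        private final SelfStoppingExecutor exec;
        private final AtomicInteger ran;
        private final int depth;

        Node(SelfStoppingExecutor exec, AtomicInteger ran, int depth) {
            this.exec = exec;
            this.ran = ran;
            this.depth = depth;
        }

        @Override
        public void run() {
            ran.incrementAndGet();
            if (depth > 0) {
                exec.execute(new Node(exec, ran, depth - 1));
                exec.execute(new Node(exec, ran, depth - 1));
            }
        }
    }

    /** Runs a binary tree of tasks and returns how many of them executed. */
    static int countTasks(int depth, int threads) {
        SelfStoppingExecutor exec = new SelfStoppingExecutor(threads);
        AtomicInteger ran = new AtomicInteger();
        exec.execute(new Node(exec, ran, depth));
        try {
            exec.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(countTasks(3, 4)); // 2^(3+1) - 1 = 15 tasks
    }
}
```

A task submits its children from inside run(), which is before afterExecute() decrements the count for that task, so the count only reaches zero after the last task has finished.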
I'd like to suggest an AdaptiveExecutor. Based on a characteristic value, you can choose to serialize or parallelize a thread for execution. In the sample below, PUID is a string/object that I wanted to use to make that decision. You can alter the logic to suit your code. Some portions of the code are commented out to allow further experiments.
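import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

class AdaptiveExecutor implements Executor {
    final Queue<Runnable> tasks = new LinkedBlockingQueue<>();
    Runnable active;
    // ExecutorService threadExecutor = Executors.newCachedThreadPool();
    static ExecutorService threadExecutor = Executors.newFixedThreadPool(4);
}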