Using a Java thread pool

Posted 2024-09-12 09:20:13

I'm trying to write a multithreaded web crawler.

My main entry class has the following code:

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while (true) {
    URL url = frontier.get();
    if (url == null)
        return;
    exec.execute(new URLCrawler(this, url));
}

The URLCrawler fetches the specified URL, parses the HTML, extracts links from it, and schedules unseen links back to the frontier.

A frontier is a queue of uncrawled URLs. The problem is how to write the get() method.
If the queue is empty, it should wait until any URLCrawlers finish and then try again.
It should return null only when the queue is empty and there is no currently active URLCrawler.

My first idea was to use an AtomicInteger to count the currently working URLCrawlers, plus an auxiliary object for notifyAll()/wait() calls. Each crawler increments the count when it starts, decrements it when it exits, and notifies the object that it has completed.

But I read that notify()/notifyAll() and wait() are considered a somewhat outdated way to communicate between threads.

What should I use for this work pattern? It is similar to M producers and N consumers; the question is how to deal with exhaustion of the producers.

Comments (6)

∝单色的世界 2024-09-19 09:20:13

I am not sure I understand your design, but this may be a job for a Semaphore.

丢了幸福的猪 2024-09-19 09:20:13

One option is to make "frontier" a blocking queue, so any thread trying to "get" from it will block.
As soon as any other URLCrawler puts objects into that queue, a blocked thread is automatically woken up (with the dequeued object).
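
A blocking queue on its own does not tell get() when the crawl is finished, so a counter of unfinished URLs is still needed. The following is only a minimal sketch of that combination, not the poster's code: the class name Frontier, the methods add() and done(), and the 100 ms poll interval are all assumptions made for illustration. It also assumes each crawler calls add() for every link it found before it calls done().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical frontier: a blocking queue plus a count of unfinished URLs. */
final class Frontier<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    // URLs that were added but whose crawl has not been reported as done.
    private final AtomicInteger pending = new AtomicInteger();

    /** Called for seed URLs and for every unseen link a crawler finds. */
    void add(T url) {
        pending.incrementAndGet(); // count first, so pending never under-reports
        queue.add(url);
    }

    /** Called by a crawler after it has added all links found on its page. */
    void done() {
        pending.decrementAndGet();
    }

    /** Returns the next URL, or null once nothing is queued or in flight. */
    T get() throws InterruptedException {
        while (true) {
            // Wait a bounded time so the termination check below runs regularly.
            T url = queue.poll(100, TimeUnit.MILLISECONDS);
            if (url != null) {
                return url;
            }
            if (pending.get() == 0) {
                return null; // queue is empty and no crawler is still working
            }
        }
    }
}
```

With this shape the main loop from the question can stay as it is; the timed poll trades a little latency at shutdown for not needing any explicit wait()/notifyAll() calls.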

幽梦紫曦~ 2024-09-19 09:20:13

I think the use of wait/notify is justified in this case. I can't think of any straightforward way to do this using j.u.c.
In a class, let's call it Coordinator:

private final int numOfCrawlers;
private int waiting;

// wait() can throw InterruptedException, so the method has to declare it
public boolean shouldTryAgain() throws InterruptedException {
    synchronized (this) {
        waiting++;
        if (waiting >= numOfCrawlers) {
            // Everybody is waiting, terminate
            return false;
        } else {
            wait(); // spurious wake-up is okay
            // woken up for whatever reason; try again
            waiting--;
            return true;
        }
    }
}

public void hasEnqueued() {
    synchronized (this) {
        notifyAll();
    }
}

Then, in the main loop:

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null){
        if(!coordinator.shouldTryAgain()){
            //all threads are waiting. No possibility of new jobs.
            return;
        }else{
            //Possible that there are other jobs. Try again
            continue;
        }
    }
    exec.execute(new URLCrawler(this, url));
}//while(true)

私藏温柔 2024-09-19 09:20:13

I think a basic building block for your use case is a "latch", similar to CountDownLatch, but unlike CountDownLatch, one that permits incrementing the count as well.

An interface for such a latch might be

public interface Latch {
    public void countDown();
    public void countUp();
    public void await() throws InterruptedException;
    public int getCount();
}

Legal values for counts would be 0 and up. The await() method would let you block until the count goes down to zero.

If you have such a latch, your use case can be described fairly easily. I also suspect the queue (frontier) can be eliminated in this solution (executor provides one anyway so it's somewhat redundant). I would rewrite your main routine as

ExecutorService executor = Executors.newFixedThreadPool(numberOfCrawlers);
Latch latch = ...; // instantiate a latch
URL[] initialUrls = ...;
for (URL url: initialUrls) {
    executor.execute(new URLCrawler(this, url, latch));
}
// now wait for all crawling tasks to finish
latch.await();

Your URLCrawler would use the latch in this manner:

public class URLCrawler implements Runnable {
    private final Latch latch;

    public URLCrawler(..., Latch l) {
        ...
        latch = l;
        latch.countUp(); // increment the count as early as possible
    }

    public void run() {
        try {
            List<URL> secondaryUrls = crawl();
            for (URL url: secondaryUrls) {
                // submit new tasks directly
                executor.execute(new URLCrawler(..., latch));
            }
        } finally {
            // as a last step, decrement the count
            latch.countDown();
        }
    }
}

As for the latch itself, there are several possible implementations: one based on wait() and notifyAll(), one that uses Lock and Condition, or one built on AbstractQueuedSynchronizer. I think all of them would be pretty straightforward. Note that the wait()/notifyAll() and Lock/Condition versions rely on mutual exclusion, whereas the AQS version would use CAS (compare-and-swap) and thus might scale better in some situations.
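
As an illustration of the simplest of those options, here is a rough sketch of a monitor-based latch. The class name MonitorLatch is made up for this example, the Latch interface is repeated only so that the snippet compiles on its own, and rejecting a countDown() at zero is one possible design choice rather than something the answer prescribes.

```java
/** Same interface as in the answer, repeated so the sketch is self-contained. */
interface Latch {
    void countDown();
    void countUp();
    void await() throws InterruptedException;
    int getCount();
}

/** Hypothetical implementation based on synchronized, wait() and notifyAll(). */
final class MonitorLatch implements Latch {
    private int count; // guarded by the monitor of this object

    @Override
    public synchronized void countUp() {
        count++;
    }

    @Override
    public synchronized void countDown() {
        if (count == 0) {
            throw new IllegalStateException("count is already zero");
        }
        if (--count == 0) {
            notifyAll(); // wake every thread blocked in await()
        }
    }

    @Override
    public synchronized void await() throws InterruptedException {
        while (count > 0) { // the loop guards against spurious wake-ups
            wait();
        }
    }

    @Override
    public synchronized int getCount() {
        return count;
    }
}
```

Because await() returns immediately while the count is zero, the main routine should only call it after the initial crawlers have been constructed, which is what incrementing in the constructor ensures.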

万水千山粽是情ミ 2024-09-19 09:20:13

The question is a bit old, but I think I have found a simple, working solution:

Extend the ThreadPoolExecutor class as shown below. The new functionality is keeping a count of active tasks (unfortunately, the built-in getActiveCount() only returns an approximation). If taskCount.get() == 0 and there are no more queued tasks, there is nothing left to do and the executor shuts down. That gives you your exit criterion. Also, if you create the executor but never submit any tasks, it won't block:

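import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CrawlingThreadPoolExecutor extends ThreadPoolExecutor {

    // Number of tasks currently running on worker threads
    private final AtomicInteger taskCount = new AtomicInteger();

    public CrawlingThreadPoolExecutor() {
        super(8, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        taskCount.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        taskCount.decrementAndGet();
        // Nothing queued and nothing running: the crawl is finished
        if (getQueue().isEmpty() && taskCount.get() == 0) {
            shutdown();
        }
    }
}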
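
One possible refinement, which is not part of the answer above: counting in beforeExecute() appears to leave a short window between a worker taking a task off the queue and beforeExecute() running, during which the queue is empty and the count can still be zero. A sketch that counts at submission time instead could look like this; the class name SubmitCountingExecutor and the field name unfinished are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical variant: counts tasks from submission until completion. */
final class SubmitCountingExecutor extends ThreadPoolExecutor {
    // Tasks that were accepted but have not finished yet (queued or running).
    private final AtomicInteger unfinished = new AtomicInteger();

    SubmitCountingExecutor(int threads) {
        super(threads, threads, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    public void execute(Runnable task) {
        unfinished.incrementAndGet(); // count before the task becomes visible to workers
        try {
            super.execute(task);
        } catch (RuntimeException e) {
            unfinished.decrementAndGet(); // rejected, so it will never run
            throw e;
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // A task submits its children inside run(), i.e. before this decrement,
        // so the count only reaches zero when no work is left anywhere.
        if (unfinished.decrementAndGet() == 0) {
            shutdown();
        }
    }
}
```

The caller can then submit the seed tasks and block on awaitTermination() until the crawl has drained.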

One more thing you have to do is implement your Runnable so that it keeps a reference to the Executor you are using, in order to be able to submit new tasks. Here is a mock:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

public class MockFetcher implements Runnable {

    private final String url;
    private final Executor e;

    public MockFetcher(final Executor e, final String url) {
        this.e = e;
        this.url = url;
    }

    @Override
    public void run() {
        final List<String> newUrls = new ArrayList<>();
        // Parse doc and build url list, and then:
        for (final String newUrl : newUrls) {
            // Hand each discovered URL back to the same executor
            e.execute(new MockFetcher(this.e, newUrl));
        }
    }
}

上课铃就是安魂曲 2024-09-19 09:20:13

I'd like to suggest an AdaptiveExecutor. Based on a characteristic value, you can choose to serialize or parallelize a thread for execution. In the sample below, PUID is a string/object that I wanted to use to make that decision. You can alter the logic to suit your code. Some portions of the code are commented out to allow further experiments.

class AdaptiveExecutor implements Executor {
// Typed queue, so that tasks.poll() can be assigned to a Runnable
final Queue<Runnable> tasks = new LinkedBlockingQueue<>();
Runnable active;
//ExecutorService threadExecutor=Executors.newCachedThreadPool();
static ExecutorService threadExecutor=Executors.newFixedThreadPool(4);

AdaptiveExecutor() {
    System.out.println("Initial Queue Size=" + tasks.size());
}

public void execute(final Runnable r) {
    /* if immediate start is needed do either of below two
    new Thread(r).start();

    try {
        threadExecutor.execute(r);
    } catch(RejectedExecutionException rEE ) {
        System.out.println("Thread Rejected " + new Thread(r).getName());
    }

    */


    tasks.offer(r); // otherwise, queue them up
    scheduleNext(new Thread(r)); // and kick next thread either serial or parallel.
    /*
    tasks.offer(new Runnable() {
        public void run() {
            try {
                r.run();
            } finally {
                scheduleNext();
            }
        }
    });
    */
    if ((active == null)&& !tasks.isEmpty()) {
        active = tasks.poll();
        try {
            threadExecutor.submit(active);
        } catch (RejectedExecutionException rEE) {
            System.out.println("Thread Rejected " + new Thread(r).getName());
        }
    }

    /*
    if ((active == null)&& !tasks.isEmpty()) {
        scheduleNext();
    } else tasks.offer(r);
    */
    //tasks.offer(r);

    //System.out.println("Queue Size=" + tasks.size());

}

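private void serialize(Thread th) {
    try {
        Thread activeThread = new Thread(active);

        // wait() must be called while holding the object's monitor,
        // otherwise it throws IllegalMonitorStateException
        synchronized (th) {
            th.wait(200);
        }
        threadExecutor.submit(th);
    } catch (InterruptedException iEx) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
    }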
    /*
    active=tasks.poll();
    System.out.println("active thread is " +  active.toString() );
    threadExecutor.execute(active);
    */
}

private void parallalize() {
    if(null!=active)
        threadExecutor.submit(active);
}

protected void scheduleNext(Thread r) {
    //System.out.println("scheduleNext called") ;
    if(false==compareKeys(r,new Thread(active)))
        parallalize();
    else serialize(r);
}

private boolean compareKeys(Thread r, Thread active) {
    // TODO: obtain names of threads. If they contain same PUID, serialize them.
    if(null==active)
        return true; // first thread should be serialized
    else return false;  //rest all go parallel, unless logic controls it
}

}
