Making existing Java code parallel/multithreaded

Posted on 2024-11-07 22:45:31

I have a very simple crawler. I want to make my current code run in a few threads. Could you point me to a short tutorial or article to help me achieve this?

I'm originally a .NET developer, and in .NET I have no problem whatsoever running code on multiple threads, but unfortunately I don't know anything about threads in Java.

My crawler is a command-line program, so don't worry about a GUI.

Thank you in advance.

Comments (4)

沧笙踏歌 2024-11-14 22:45:31

Java does multithreading through the Thread class. One of the most common ways to make existing code multithreaded is to use the Runnable interface to define what you want to call at thread start, and then start it off.

public class SomeFunctions
{
  public static void FunctionA() {}
  public static void FunctionB() {}
  public static void FunctionC() {}
}

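// ... elsewhere, e.g. inside main():
Thread t1 = new Thread(new Runnable() {
   @Override
   public void run() {
      SomeFunctions.FunctionA();   // executes on the new thread
   }
});
t1.start();   // returns immediately; FunctionA runs concurrently with the caller

// (rinse and repeat for the other functions)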

Dry coded, but it should at least get the general concept across. Of course, as soon as you go into multithreading land, you have concurrency issues and need to make sure everything is appropriately synchronized, etc., but any language will have those issues.

If you're worried about synchronization, you have a few tools at your disposal. The easiest is the reentrant (recursive) mutex functionality built into Java, the "synchronized" keyword. More classical means are also available through various classes in the java.util.concurrent and java.util.concurrent.locks packages, such as Semaphore and ReadWriteLock.

http://download.oracle.com/javase/6/docs/api/java/util/concurrent/package-summary.html
http://download.oracle.com/javase/6/docs/api/java/util/concurrent/locks/package-summary.html
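
To make the "synchronized" keyword mentioned above a bit more concrete, here is a minimal sketch. It assumes a hypothetical shared counter (the class name SynchronizedCounterSketch and the helper runDemo are made up for illustration and are not part of the answer's code). Several threads increment the counter, and because both methods are synchronized, no updates are lost.

```java
public class SynchronizedCounterSketch {
    private int count = 0;

    // Only one thread at a time may run a synchronized method on the same object.
    public synchronized void increment() {
        count++;
    }

    public synchronized int get() {
        return count;
    }

    // Hypothetical helper: starts `threads` threads that each increment
    // `perThread` times, waits for all of them, and returns the final count.
    public static int runDemo(int threads, final int perThread) {
        final SynchronizedCounterSketch counter = new SynchronizedCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < perThread; j++) {
                        counter.increment();
                    }
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join(); // wait until this worker has finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 10000)); // prints 40000
    }
}
```

Without the synchronized keyword on increment(), the final value could come out lower than expected, because count++ is a read-modify-write sequence that two threads can interleave.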

榆西 2024-11-14 22:45:31

You can take a look at my web crawler example. Sorry for the length.

import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * A web crawler with a Worker pool
 * 
 * @author Adriaan
 */
public class WebCrawler implements Manager {

        private Set<Worker> workers = new HashSet<Worker>();
        private List<String> toCrawl = new ArrayList<String>();
        private Set<String> crawled = new HashSet<String>();
        private Set<String> hosts = new HashSet<String>();
        private Set<String> results = new HashSet<String>();
        private int maxResults;

        public WebCrawler(String url, int numberOfWorkers, int maxResults) {
                this.maxResults = maxResults;
                toCrawl.add(url);
                createWorkers(numberOfWorkers);
        }

        public void createWorkers(int numberOfWorkers) {
                for (int i = 0; i < numberOfWorkers; i++) {
                        workers.add(new Worker(this));
                }
        }

        private void stopWorkers() {
                for (Worker worker : workers) {
                        worker.terminate();
                }
        }

        public synchronized Job getNewJob() {
                while (toCrawl.size() == 0) {
                        try {
                                wait();
                        } catch (InterruptedException e) {
                                // ignore
                        }
                }
                return new EmailAddressCrawlJob().setDescription(toCrawl.remove(0));
        }

        public synchronized void jobCompleted(Job job) {
                // System.out.println("crawled: " + job.getDescription());
                crawled.add(job.getDescription());
                String host = getHost(job.getDescription());
                boolean knownHost = hosts.contains(host);
                if (!knownHost) {
                        System.out.println("host: " + host);
                        hosts.add(host);
                }
                for (String url : job.getNewDescriptions()) {
                        if (!crawled.contains(url)) {
                                if (knownHost) {
                                        // insert before the last entry; clamp to 0 so an empty list does not throw
                                        toCrawl.add(Math.max(0, toCrawl.size() - 1), url);
                                } else {
                                        toCrawl.add(url);
                                }
                        }
                }
                for (String result : job.getResults()) {
                        if (results.add(result)) {
                                System.out.println("result: " + result);
                        }
                }
                notifyAll();
                if (results.size() >= maxResults) {
                        stopWorkers();
                        System.out.println("Crawled hosts:");
                        for (String crawledHost : hosts) {
                                System.out.println(crawledHost);
                        }
                        Set<String> uncrawledHosts = new HashSet<String>();
                        for (String toCrawlUrl : toCrawl) {
                                uncrawledHosts.add(getHost(toCrawlUrl));
                        }
                        System.out.println("Uncrawled hosts:");
                        for (String unCrawledHost : uncrawledHosts) {
                                System.out.println(unCrawledHost);
                        }
                }
                if (crawled.size() % 10 == 0) {
                        System.out.println("crawled=" + crawled.size() + " toCrawl="
                                        + toCrawl.size() + " results=" + results.size() + " hosts="
                                        + hosts.size() + " lastHost=" + host);
                }
        }

        public String getHost(String host) {
                int hostStart = host.indexOf("://") + 3;
                if (hostStart > 0) {
                        int hostEnd = host.indexOf("/", hostStart);
                        if (hostEnd < 0) {
                                hostEnd = host.length();
                        }
                        host = host.substring(hostStart, hostEnd);
                }
                return host;
        }

        public static void main(String[] args) throws MalformedURLException {
                new WebCrawler("http://www.nu.nl/", 5, 20);
        }
}

Worker

/**
 * A Worker proactively gets a Job, executes it and notifies its manager that
 * the Job is completed.
 * 
 * @author Adriaan
 */
public class Worker extends Thread {

        private final Manager manager;
        private Job job = null;
        // volatile so that terminate(), called from another thread, is seen by run()
        private volatile boolean isWorking;

        public Worker(Manager manager) {
                this.manager = manager;
                isWorking = true;
                start();
        }

        @Override
        public void run() {
                System.out.println("Worker " + Thread.currentThread().getId()
                                + " starting ");
                while (isWorking) {
                        job = manager.getNewJob();
                        job.execute();
                        manager.jobCompleted(job);
                }
        }

        public void terminate() {
                isWorking = false;
        }
}

Manager interface

/**
 * Manager interface for Workers
 * 
 * @author Adriaan
 */
public interface Manager {

        /**
         * Gets a new job
         * 
         * @return
         */
        public Job getNewJob();

        /**
         * Indicates the job is completed
         * 
         * @param job
         */
        public void jobCompleted(Job job);
}

Job

import java.util.HashSet;
import java.util.Set;

/**
 * A Job is a unit of work defined by a String (the description). During execution the 
 * job can obtain results and new job descriptions.
 *
 * @author Adriaan
 */
public abstract class Job {

        private String description;
        private Set<String> results = new HashSet<String>();
        private Set<String> newDescriptions = new HashSet<String>();

        /**
         * Sets the job description
         * 
         * @param description
         * @return this for chaining
         */
        public Job setDescription(String description) {
                this.description = description;
                return this;
        }

        /**
         * Executes the job
         */
        public abstract void execute();

        /**
         * Gets the results obtained
         * 
         * @return
         */
        public Set<String> getResults() {
                return results;
        }

        /**
         * Gets the new job descriptions obtained
         * 
         * @return
         */
        public Set<String> getNewDescriptions() {
                return newDescriptions;
        }

        /**
         * Gets the job description
         * 
         * @return
         */
        public String getDescription() {
                return description;
        }

        /**
         * Allows the implementation to add an obtained result
         * 
         * @param result
         */
        void addResult(String result) {
                results.add(result);
        }

        /**
         * Allows the implementation to add an obtained description
         * 
         * @param newDescription
         */
        void addNewDescription(String newDescription) {
                newDescriptions.add(newDescription);
        }
}

A Job which crawls a page for email addresses:

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * A Job which crawls HTTP or HTTPS URLs for email addresses, collecting new
 * URLs to crawl along the way.
 * 
 * @author Adriaan
 */
public class EmailAddressCrawlJob extends Job {

        @Override
        public void execute() {
                try {
                        URL url = new URL(getDescription());
                        if (url != null) {
                                String text = readText(url);
                                extractNewDescriptions(text, url);
                                extractResults(text);
                        }
                } catch (MalformedURLException e) {
                        System.err.println("Bad url " + getDescription());
                }
        }

        private String readText(URL url) {
                StringBuilder builder = new StringBuilder();
                try {
                        URLConnection connection = url.openConnection();
                        // try-with-resources (Java 7+) closes the stream even if a read fails
                        try (InputStream input = connection.getInputStream()) {
                                byte[] buffer = new byte[1000];
                                int num;
                                // read() returns -1 at end of stream
                                while ((num = input.read(buffer)) != -1) {
                                        builder.append(new String(buffer, 0, num));
                                }
                        }
                } catch (IOException e) {
                        //System.err.println("Could not read from " + url);
                        return "";
                }
                return builder.toString();
        }

        private void extractNewDescriptions(String text, URL url) {

                // URL extracting code from Sun example
                String lowerCaseContent = text.toLowerCase();
                int index = 0;
                while ((index = lowerCaseContent.indexOf("<a", index)) != -1) {

                        if ((index = lowerCaseContent.indexOf("href", index)) == -1) {
                                break;
                        }

                        if ((index = lowerCaseContent.indexOf("=", index)) == -1) {
                                break;
                        }

                        index++;
                        String remaining = text.substring(index);
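                        StringTokenizer st = new StringTokenizer(remaining, "\t\n\r\">#");
                        if (!st.hasMoreTokens()) {
                                // nothing usable after "href="; avoid NoSuchElementException
                                continue;
                        }
                        String strLink = st.nextToken();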

                        if (strLink.startsWith("javascript:")) {
                                continue;
                        }

                        URL urlLink;
                        try {
                                urlLink = new URL(url, strLink);
                                strLink = urlLink.toString();
                        } catch (MalformedURLException e) {
                                // System.err.println("Could not create url: " + target
                                // + " + " + strLink);
                                continue;
                        }
                        // only look at http links
                        String protocol = urlLink.getProtocol();
                        if (protocol.compareTo("http") != 0
                                        && protocol.compareTo("https") != 0) {
                                // System.err.println("Ignoring: " + protocol
                                // + " protocol in " + urlLink);
                                continue;
                        }
                        addNewDescription(urlLink.toString());
                }
        }

        private void extractResults(String text) {
                Pattern p = Pattern
                                .compile("([\\w\\-]([\\.\\w])+[\\w]+@([\\w\\-]+\\.)+[A-Za-z]{2,4})");
                Matcher m = p.matcher(text);
                while (m.find()) {
                        addResult(m.group(1));
                }
        }
}

I know this answer is a bit verbose, but I thought the OP might be best helped by a working example, and I happened to have made one not long ago.
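
The core of the crawler above is the wait()/notifyAll() hand-off between getNewJob() and jobCompleted(). The following is a stripped-down sketch of just that pattern, not the crawler itself. The class name JobQueueSketch, the STOP sentinel and the runDemo helper are assumptions made for illustration. The sentinel is one possible way to let idle workers exit instead of waiting forever once there is nothing left to do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class JobQueueSketch {
    // Hypothetical sentinel telling a worker to exit.
    private static final String STOP = "<stop>";

    private final List<String> pending = new ArrayList<String>();

    // Blocks the calling worker until an entry is available.
    public synchronized String take() throws InterruptedException {
        while (pending.isEmpty()) {
            wait(); // releases the lock while waiting
        }
        return pending.remove(0);
    }

    // Adds an entry and wakes up any waiting workers.
    public synchronized void put(String url) {
        pending.add(url);
        notifyAll();
    }

    // Starts `workers` threads, feeds them `jobs` URLs, and returns how many were processed.
    public static int runDemo(int workers, int jobs) {
        final JobQueueSketch queue = new JobQueueSketch();
        final AtomicInteger processed = new AtomicInteger();
        Thread[] pool = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            pool[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            String url = queue.take();
                            if (url.equals(STOP)) {
                                return;
                            }
                            processed.incrementAndGet(); // stand-in for crawling the URL
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            pool[i].start();
        }
        for (int j = 0; j < jobs; j++) {
            queue.put("http://example.com/page" + j);
        }
        for (int i = 0; i < workers; i++) {
            queue.put(STOP); // one sentinel per worker
        }
        try {
            for (Thread t : pool) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3, 20)); // prints 20
    }
}
```

The while loop around wait() matters: a woken thread must re-check the condition, because another worker may have taken the entry first.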

ゞ记忆︶ㄣ 2024-11-14 22:45:31

A very basic Java program that gives a rough idea of multithreading:

public class MyThread extends Thread {

  String word;

  public MyThread(String rm){
    word = rm;
  }

  public void run(){

    try {

      for(;;){
        System.out.println(word);
        Thread.sleep(1000);
      }

    } catch(InterruptedException e) {

      System.out.println("sleep interrupted");      
    }
  }

  public static void main(String[] args) {

    Thread t1=new MyThread("First Thread");
    Thread t2=new MyThread("Second Thread");
    t1.start();
    t2.start();
  }
} 

The output will look something like this (the exact interleaving of the two threads is not guaranteed):

First Thread
Second Thread
First Thread
Second Thread
First Thread

Go through this PPT; it will help you with the basics:

Here
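
The threads in the example above loop forever, and the catch block only runs if something interrupts them. As a rough sketch of how such a thread might be stopped from outside, the following uses Thread.interrupt() and Thread.join(). The class name InterruptSketch, the helper runDemo and the timings are assumptions for illustration.

```java
public class InterruptSketch {
    // Starts a looping thread, interrupts it, and reports whether it stopped.
    public static boolean runDemo() {
        Thread printer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (;;) {
                        System.out.println("working");
                        Thread.sleep(50);
                    }
                } catch (InterruptedException e) {
                    // sleep() throws this when the thread is interrupted
                    System.out.println("sleep interrupted");
                }
            }
        });
        printer.start();
        try {
            Thread.sleep(200);   // let the thread print a few lines
            printer.interrupt(); // ask it to stop
            printer.join(2000);  // wait up to 2 seconds for it to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !printer.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped: " + runDemo());
    }
}
```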

岁月苍老的讽刺 2024-11-14 22:45:31

Use Runnable instead of extending Thread:

Instead of extending the Thread class, it is better to implement the Runnable interface, which separates the task to be run from the thread that runs it. When a Runnable is passed to the Thread constructor, the thread invokes that object's run() method once it is started. Note that each Thread object can be started only once, so every task still needs its own Thread (or a thread pool), but the same Runnable class can be reused with any of them.

Use a constant for the sleep time:

Instead of hard-coding the sleep time (1000 milliseconds), use a constant to make it more readable and easy to change. For example, you could declare a private static final int SLEEP_TIME = 1000; at the beginning of the class and use it in the sleep() call.

public class MyRunnable implements Runnable {
    // Pause between prints, in milliseconds
    private static final int SLEEP_TIME = 1000;

    private String word;

    public MyRunnable(String word) {
        this.word = word;
    }

    @Override
    public void run() {
        try {
            while (true) {
                System.out.println(word);
                Thread.sleep(SLEEP_TIME);
            }
        } catch (InterruptedException e) {
            System.out.println("sleep interrupted");
        }
    }

    public static void main(String[] args) {
        Thread t1 = new Thread(new MyRunnable("First Thread"));
        Thread t2 = new Thread(new MyRunnable("Second Thread"));
        t1.start();
        t2.start();
    }
}
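
One practical benefit of keeping the task in a Runnable is that it can be handed to a thread pool instead of a hand-made Thread. The sketch below uses the standard java.util.concurrent.ExecutorService for that. The class name ExecutorSketch, the helper runDemo and the task bodies are hypothetical and only meant to illustrate the idea.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorSketch {
    // Runs `tasks` small Runnables on a pool of `poolSize` threads
    // and returns how many of them completed.
    public static int runDemo(int tasks, int poolSize) {
        final AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("task " + id + " on "
                            + Thread.currentThread().getName());
                    completed.incrementAndGet();
                }
            });
        }
        pool.shutdown(); // accept no new tasks; already queued tasks still run
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(10, 3)); // prints 10 after the task lines
    }
}
```

For a crawler, each Runnable could fetch one URL, and the pool size would cap how many pages are downloaded at the same time.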