Improving multithreaded indexing with Lucene
I am trying to build my indexes in Lucene with multiple threads. So I started coding and wrote the following. First I find the files, and for each file I create a thread to index it. After that I join the threads and optimize the index. It works, but I'm not sure... can I trust it at a large scale? Is there any way to improve it?
import java.io.File;
import java.io.FileFilter;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.analysis.StopAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

public class mIndexer extends Thread {

    private File ifile;
    private static IndexWriter writer;

    public mIndexer(File f) {
        ifile = f.getAbsoluteFile();
    }

    public static void main(String args[]) throws Exception {
        System.out.println("here...");
        String indexDir;
        String dataDir;
        if (args.length != 2) {
            dataDir = "/home/omid/Ranking/docs/";
            indexDir = "/home/omid/Ranking/indexes/";
        } else {
            dataDir = args[0];
            indexDir = args[1];
        }

        long start = System.currentTimeMillis();

        Directory dir = FSDirectory.open(new File(indexDir));
        writer = new IndexWriter(dir,
                new StopAnalyzer(Version.LUCENE_34, new File("/home/omid/Desktop/stopwords.txt")),
                true,
                IndexWriter.MaxFieldLength.UNLIMITED);

        int numIndexed = 0;
        try {
            numIndexed = index(dataDir, new TextFilesFilter());
        } finally {
            long end = System.currentTimeMillis();
            System.out.println("Indexing " + numIndexed + " files took " + (end - start) + " milliseconds");
            writer.optimize();
            System.out.println("Optimization took place in " + (System.currentTimeMillis() - end) + " milliseconds");
            writer.close();
        }
        System.out.println("Enjoy your day/night");
    }

    public static int index(String dataDir, FileFilter filter) throws Exception {
        List<Thread> threads = new ArrayList<Thread>();
        File[] dires = new File(dataDir).listFiles();
        for (File d : dires) {
            if (d.isDirectory()) {
                File[] files = new File(d.getAbsolutePath()).listFiles();
                for (File f : files) {
                    if (!f.isDirectory() &&
                        !f.isHidden() &&
                        f.exists() &&
                        f.canRead() &&
                        (filter == null || filter.accept(f))) {
                        // start one indexing thread per file
                        Thread t = new mIndexer(f);
                        t.start();
                        threads.add(t);
                    }
                }
            }
        }
        // wait for every indexing thread to finish before counting documents
        for (Thread t : threads) {
            t.join();
        }
        return writer.numDocs();
    }

    private static class TextFilesFilter implements FileFilter {
        public boolean accept(File path) {
            return path.getName().toLowerCase().endsWith(".txt");
        }
    }

    protected Document getDocument() throws Exception {
        Document doc = new Document();
        if (ifile.exists()) {
            doc.add(new Field("contents", new FileReader(ifile), Field.TermVector.YES));
            doc.add(new Field("path", ifile.getAbsolutePath(), Field.Store.YES, Field.Index.NOT_ANALYZED));
            String cat = "WIR";
            cat = ifile.getAbsolutePath().substring(0, ifile.getAbsolutePath().length() - ifile.getName().length() - 1);
            cat = cat.substring(cat.lastIndexOf('/') + 1, cat.length());
            //doc.add(new Field("category", cat.subSequence(0, cat.length()), Field.Store.YES));
            //System.out.println(cat.subSequence(0, cat.length()));
        }
        return doc;
    }

    public void run() {
        try {
            System.out.println("Indexing " + ifile.getAbsolutePath());
            Document doc = getDocument();
            writer.addDocument(doc);
        } catch (Exception e) {
            System.out.println(e.toString());
        }
    }
}
Any help is appreciated.
2 Answers
If you want to parallelize indexing, there are two things you can do:
You are on the right path in parallelizing calls to addDocument, but spawning one thread per document will not scale as the number of documents to index grows. You should rather use a fixed-size ThreadPoolExecutor. Since this task is mainly CPU-intensive (depending on your analyzer and the way you retrieve your data), using the number of CPUs of your machine as the maximum number of threads might be a good start.
Regarding the merge scheduler, you can increase the maximum number of threads that can be used with the setMaxThreadCount method of ConcurrentMergeScheduler. Beware that disks are much better at sequential reads/writes than at random reads/writes; as a consequence, setting too high a maximum thread count on your merge scheduler is more likely to slow indexing down than to speed it up. A sketch covering both points follows below.
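For illustration only, here is a rough sketch of those two points against the Lucene 3.x API used in the question. The IndexWriterConfig-based setup, the StandardAnalyzer, the setMaxThreadCount value of 2, the CPU-count pool size and the one-hour awaitTermination timeout are all assumptions for the example, not recommendations.

import java.io.File;
import java.io.FileReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

public class PooledIndexer {

    public static void indexAll(File indexDir, File[] files) throws Exception {
        // Cap the number of concurrent merge threads; merges are mostly
        // sequential I/O, and too many of them turns the work into random I/O.
        ConcurrentMergeScheduler merges = new ConcurrentMergeScheduler();
        merges.setMaxThreadCount(2);

        IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_34,
                new StandardAnalyzer(Version.LUCENE_34));
        conf.setMergeScheduler(merges);
        final IndexWriter writer = new IndexWriter(FSDirectory.open(indexDir), conf);

        // Fixed-size pool: one task per file, but only as many threads as CPUs.
        ExecutorService pool = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        for (final File f : files) {
            pool.submit(new Runnable() {
                public void run() {
                    try {
                        Document doc = new Document();
                        doc.add(new Field("path", f.getAbsolutePath(),
                                Field.Store.YES, Field.Index.NOT_ANALYZED));
                        doc.add(new Field("contents", new FileReader(f),
                                Field.TermVector.YES));
                        writer.addDocument(doc); // IndexWriter is thread-safe
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS); // arbitrary upper bound
        writer.close();
    }
}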
But before trying to parallelize your indexing process, you should probably try to find where the bottleneck is. If your disk is too slow, the bottleneck is likely to be the flush and merge steps; as a consequence, parallelizing calls to addDocument (which essentially consists of analyzing a document and buffering the result of the analysis in memory) will not improve indexing speed at all.
Some side notes:
There is some ongoing work in the development version of Lucene to improve indexing parallelism (the flushing part especially; this blog entry explains how it works).
Lucene has a nice wiki page on how to improve indexing speed, where you will find other ways to speed up your indexing.
I think the more modern way to do this is to use a ThreadPoolExecutor and submit a Runnable that does your indexing. You can wait for all tasks to finish using .awaitTermination, or a CountDownLatch.
I'm not a big fan of having your main class extend Thread; just create a Runnable inner class that takes its dependencies in a constructor. This makes your code more readable, as the work the threads are doing is clearly separated from your application setup code.
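Something along these lines, for example, assuming the same Lucene 3.x API as in the question (the names Indexer, IndexTask and indexAll are made up for illustration, and the error handling is deliberately minimal):

import java.io.File;
import java.io.FileReader;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;

public class Indexer {

    // The work is a plain Runnable that receives everything it needs
    // through its constructor and knows nothing about the setup code.
    private static class IndexTask implements Runnable {
        private final IndexWriter writer;
        private final File file;
        private final CountDownLatch done;

        IndexTask(IndexWriter writer, File file, CountDownLatch done) {
            this.writer = writer;
            this.file = file;
            this.done = done;
        }

        public void run() {
            try {
                Document doc = new Document();
                doc.add(new Field("path", file.getAbsolutePath(),
                        Field.Store.YES, Field.Index.NOT_ANALYZED));
                doc.add(new Field("contents", new FileReader(file),
                        Field.TermVector.YES));
                writer.addDocument(doc);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                done.countDown(); // always count down, even on failure
            }
        }
    }

    public static void indexAll(IndexWriter writer, File[] files) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(files.length);
        ExecutorService pool = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        for (File f : files) {
            pool.submit(new IndexTask(writer, f, done));
        }
        done.await();   // returns once every task has counted down
        pool.shutdown();
    }
}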
A few notes on style: I'm not a big fan of having your main class throw Exception; this usually just means you don't have a clear idea of the different checked exceptions the code you are using can throw. It's usually not the right thing to do unless you have a very specific reason.