Elasticsearch 批量导入 (Java 代码)
Elasticsearch 索引
PUT /movies
{
"mappings": {
"properties": {
"id": {
"type": "integer"
},
"title": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"year": {
"type": "integer"
},
"genre": {
"type": "text"
}
}
}
}
PUT /ratings
{
"mappings": {
"properties": {
"user_id": {
"type": "integer"
},
"movie_id": {
"type": "integer"
},
"movie_title": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"rating": {
"type": "float"
},
"timestamp": {
"type": "date"
}
}
}
}
PUT /tags
{
"mappings": {
"properties": {
"user_id": {
"type": "integer"
},
"movie_id": {
"type": "integer"
},
"movie_title": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"tag": {
"type": "text"
},
"timestamp": {
"type": "date"
}
}
}
}
项目目录结构
.
├── elasticsearch.iml
├── pom.xml
└── src
├── main
│ ├── java
│ │ └── net
│ │ └── wuxianjie
│ │ └── elasticsearch
│ │ ├── ImportMovies.java
│ │ ├── ImportRatings.java
│ │ ├── ImportTags.java
│ │ ├── config
│ │ │ └── Initialization.java
│ │ └── util
│ │ ├── EsUtils.java
│ │ └── FileUtils.java
│ └── resources
│ └── ml-latest-small
│ ├── README.txt
│ ├── links.csv
│ ├── movies.csv
│ ├── ratings.csv
│ └── tags.csv
└── test
└── java
Maven 依赖
<dependencies> <dependency> <groupId>com.opencsv</groupId> <artifactId>opencsv</artifactId> <version>5.4</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>${elasticsearch.version}</version> </dependency> </dependencies>
Java 代码
Java High Level REST Client 初始化类
package net.wuxianjie.elasticsearch.config;

import java.io.IOException;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

/**
 * Singleton wrapper around the one {@link RestHighLevelClient} used by the importers.
 *
 * <p>The client is built against {@code http://localhost:9200}. The single instance lives in a
 * private nested holder class and is handed out through {@link #getInstance()}.
 */
public class Initialization {

  private final RestHighLevelClient client;

  /** Builds the client; private so that instances only come from {@link #getInstance()}. */
  private Initialization() {
    HttpHost node = new HttpHost("localhost", 9200, "http");
    client = new RestHighLevelClient(RestClient.builder(node));
  }

  /**
   * Returns the shared instance.
   *
   * @return the singleton {@code Initialization}
   */
  public static Initialization getInstance() {
    return Holder.SINGLETON;
  }

  /**
   * Returns the wrapped Elasticsearch client.
   *
   * @return the client owned by this singleton
   */
  public RestHighLevelClient getClient() {
    return client;
  }

  /**
   * Closes the wrapped client.
   *
   * @throws IOException if closing the client fails
   */
  public void close() throws IOException {
    client.close();
  }

  /** Holds the singleton so that it is created when this nested class is initialized. */
  private static class Holder {
    private static final Initialization SINGLETON = new Initialization();
  }
}
工具类
package net.wuxianjie.elasticsearch.util; import java.io.File; import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Paths; import net.wuxianjie.elasticsearch.ImportMovies; public class FileUtils { public static String getFilePath(String path) throws URISyntaxException { if (path == null || path.isBlank()) { throw new IllegalArgumentException("文件路径不能为空"); } URL res = ImportMovies.class.getClassLoader().getResource(path); if (res == null) { throw new IllegalArgumentException("文件路径错误"); } File file = Paths.get(res.toURI()).toFile(); return file.getAbsolutePath(); } }
package net.wuxianjie.elasticsearch.util; import java.io.IOException; import net.wuxianjie.elasticsearch.config.Initialization; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; public class EsUtils { public static void bulkIndex(BulkRequest request) throws IOException { try (RestHighLevelClient client = Initialization.getInstance().getClient()) { BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); if (bulkResponse.hasFailures()) { for (BulkItemResponse bulkItemResponse : bulkResponse) { if (bulkItemResponse.isFailed()) { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); throw new RuntimeException(failure.getMessage(), failure.getCause()); } } } } } }
导入类
package net.wuxianjie.elasticsearch; import com.opencsv.CSVReaderHeaderAware; import com.opencsv.exceptions.CsvValidationException; import java.io.FileReader; import java.io.IOException; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; import net.wuxianjie.elasticsearch.util.FileUtils; import net.wuxianjie.elasticsearch.util.EsUtils; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.xcontent.XContentType; public class ImportMovies { public static final String PATH_TO_CSV = "ml-latest-small/movies.csv"; public static final String INDEX_NAME = "movies"; public static Map<String, String> getMovieMap() throws URISyntaxException, IOException, CsvValidationException { Map<String, String> result = new HashMap<>(); try (CSVReaderHeaderAware reader = new CSVReaderHeaderAware(new FileReader(FileUtils.getFilePath(PATH_TO_CSV)))) { Map<String, String> row; while ((row = reader.readMap()) != null) { String titleAndYear = row.get("title"); Map<String, String> titleMap = parseTitle(titleAndYear); result.put(row.get("movieId"), titleMap.get("title")); } } return result; } public static void main(String[] args) throws IOException, URISyntaxException, CsvValidationException { EsUtils.bulkIndex(readCsv()); } private static BulkRequest readCsv() throws IOException, CsvValidationException, URISyntaxException { BulkRequest request = new BulkRequest(); CSVReaderHeaderAware reader = new CSVReaderHeaderAware(new FileReader(FileUtils.getFilePath(PATH_TO_CSV))); Map<String, String> row; while ((row = reader.readMap()) != null) { String movieId = row.get("movieId"); String titleAndYear = row.get("title"); Map<String, String> titleMap = parseTitle(titleAndYear); request.add(new IndexRequest(INDEX_NAME) .id(movieId) .source(XContentType.JSON, "id", movieId, "title", titleMap.get("title"), "year", titleMap.get("year"), "genre", row.get("genres"))); } return request; } private 
static Map<String, String> parseTitle(String str) { int beginIndexOfYear = str.lastIndexOf(" ("); if (beginIndexOfYear == -1) { return new HashMap<>() {{ put("title", str); put("year", null); }}; } return new HashMap<>() {{ put("title", str.substring(0, beginIndexOfYear)); put("year", str.substring(beginIndexOfYear + 2, str.length() - 1)); }}; } }
package net.wuxianjie.elasticsearch; import com.opencsv.CSVReaderHeaderAware; import com.opencsv.exceptions.CsvValidationException; import java.io.FileReader; import java.io.IOException; import java.net.URISyntaxException; import java.util.Map; import net.wuxianjie.elasticsearch.util.FileUtils; import net.wuxianjie.elasticsearch.util.EsUtils; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.xcontent.XContentType; public class ImportRatings { public static final String PATH_TO_CSV = "ml-latest-small/ratings.csv"; public static final String INDEX_NAME = "ratings"; public static void main(String[] args) throws CsvValidationException, IOException, URISyntaxException { EsUtils.bulkIndex(readCsv()); } private static BulkRequest readCsv() throws IOException, CsvValidationException, URISyntaxException { Map<String, String> movieMap = ImportMovies.getMovieMap(); BulkRequest request = new BulkRequest(); try (CSVReaderHeaderAware reader = new CSVReaderHeaderAware(new FileReader(FileUtils.getFilePath(PATH_TO_CSV)))) { Map<String, String> row; while ((row = reader.readMap()) != null) { String movieId = row.get("movieId"); request.add(new IndexRequest(INDEX_NAME) .source(XContentType.JSON, "user_id", row.get("userId"), "movie_id", movieId, "movie_title", movieMap.get(movieId), "rating", row.get("rating"), "timestamp", row.get("timestamp"))); } } return request; } }
package net.wuxianjie.elasticsearch; import com.opencsv.CSVReaderHeaderAware; import com.opencsv.exceptions.CsvValidationException; import java.io.FileReader; import java.io.IOException; import java.net.URISyntaxException; import java.util.Map; import net.wuxianjie.elasticsearch.util.FileUtils; import net.wuxianjie.elasticsearch.util.EsUtils; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.xcontent.XContentType; public class ImportTags { public static final String PATH_TO_CSV = "ml-latest-small/tags.csv"; public static final String INDEX_NAME = "tags"; public static void main(String[] args) throws IOException, URISyntaxException, CsvValidationException { EsUtils.bulkIndex(readCsv()); } public static BulkRequest readCsv() throws IOException, CsvValidationException, URISyntaxException { Map<String, String> movieMap = ImportMovies.getMovieMap(); BulkRequest request = new BulkRequest(); try (CSVReaderHeaderAware reader = new CSVReaderHeaderAware(new FileReader(FileUtils.getFilePath(PATH_TO_CSV)))) { Map<String, String> row; while ((row = reader.readMap()) != null) { String movieId = row.get("movieId"); request.add(new IndexRequest(INDEX_NAME) .source(XContentType.JSON, "user_id", row.get("userId"), "movie_id", movieId, "movie_title", movieMap.get(movieId), "tag", row.get("tag"), "timestamp", row.get("timestamp"))); } } return request; } }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问、参与讨论,获取更多帮助,或者扫描二维码加入 Web 技术交流群。
下一篇: 不要相信一个熬夜的人说的每一句话
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论