Elasticsearch 批量导入 (Java 代码)

发布于 2024-11-21 04:28:53 字数 11263 浏览 2 评论 0

测试数据:MovieLens ml-latest-small 数据集(movies.csv、ratings.csv、tags.csv 等);完整项目地址:JasonWu73/elasticsearch

Elasticsearch 索引

PUT /movies
{
  "mappings": {
    "properties": {
      "id": {
        "type": "integer"
      },
      "title": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "year": {
        "type": "integer"
      },
      "genre": {
        "type": "text"
      }
    }
  }
}

PUT /ratings
{
  "mappings": {
    "properties": {
      "user_id": {
        "type": "integer"
      },
      "movie_id": {
        "type": "integer"
      },
      "movie_title": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "rating": {
        "type": "float"
      },
      "timestamp": {
        "type": "date",
        "format": "epoch_second"
      }
    }
  }
}

PUT /tags
{
  "mappings": {
    "properties": {
      "user_id": {
        "type": "integer"
      },
      "movie_id": {
        "type": "integer"
      },
      "movie_title": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "tag": {
        "type": "text"
      },
      "timestamp": {
        "type": "date",
        "format": "epoch_second"
      }
    }
  }
}

项目目录结构

.
├── elasticsearch.iml
├── pom.xml
└── src
    ├── main
    │   ├── java
    │   │   └── net
    │   │       └── wuxianjie
    │   │           └── elasticsearch
    │   │               ├── ImportMovies.java
    │   │               ├── ImportRatings.java
    │   │               ├── ImportTags.java
    │   │               ├── config
    │   │               │   └── Initialization.java
    │   │               └── util
    │   │                   ├── EsUtils.java
    │   │                   └── FileUtils.java
    │   └── resources
    │       └── ml-latest-small
    │           ├── README.txt
    │           ├── links.csv
    │           ├── movies.csv
    │           ├── ratings.csv
    │           └── tags.csv
    └── test
        └── java

Maven 依赖

 <dependencies>
   <dependency>
     <groupId>com.opencsv</groupId>
     <artifactId>opencsv</artifactId>
     <version>5.4</version>
   </dependency>

   <dependency>
     <groupId>org.elasticsearch.client</groupId>
     <artifactId>elasticsearch-rest-high-level-client</artifactId>
     <version>${elasticsearch.version}</version>
   </dependency>
 </dependencies>

Java 代码

Java High Level REST Client 初始化类

package net.wuxianjie.elasticsearch.config;

import java.io.IOException;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

/**
 * Owns the single {@link RestHighLevelClient} shared by the importers.
 *
 * <p>The client talks to {@code http://localhost:9200}. It is built lazily on the first
 * {@link #getInstance()} call via the initialization-on-demand holder idiom.
 */
public class Initialization {

  /** The shared client, built once by the private constructor. */
  private final RestHighLevelClient restClient;

  private Initialization() {
    HttpHost node = new HttpHost("localhost", 9200, "http");
    restClient = new RestHighLevelClient(RestClient.builder(node));
  }

  /** Holder class: the JVM initializes it, and hence INSTANCE, only on first access. */
  private static class Holder {

    private static final Initialization INSTANCE = new Initialization();
  }

  /**
   * Returns the process-wide instance, creating it (and its client) on the first call.
   *
   * @return the singleton instance
   */
  public static Initialization getInstance() {
    return Holder.INSTANCE;
  }

  /**
   * Returns the shared client.
   *
   * @return the client created by the constructor
   */
  public RestHighLevelClient getClient() {
    return restClient;
  }

  /**
   * Closes the shared client.
   *
   * @throws IOException if closing the underlying client fails
   */
  public void close() throws IOException {
    restClient.close();
  }
}

工具类

package net.wuxianjie.elasticsearch.util;

import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import net.wuxianjie.elasticsearch.ImportMovies;

/** Resolves classpath resources to absolute file-system paths. */
public class FileUtils {

  /**
   * Returns the absolute file-system path of a classpath resource.
   *
   * @param path classpath-relative resource path, e.g. {@code "ml-latest-small/movies.csv"}
   * @return the absolute path of the resource on disk
   * @throws IllegalArgumentException if {@code path} is null or blank, or no such resource exists
   * @throws URISyntaxException if the resource URL cannot be converted to a URI
   */
  public static String getFilePath(String path) throws URISyntaxException {

    if (path == null || path.isBlank()) {
      throw new IllegalArgumentException("文件路径不能为空");
    }

    // Resolve through this utility's own class loader. The util package must not depend on one
    // particular importer, because ImportMovies itself depends on FileUtils.
    URL res = FileUtils.class.getClassLoader().getResource(path);
    if (res == null) {
      throw new IllegalArgumentException("文件路径错误: " + path);
    }

    // NOTE: this only works when the resource is a plain file (e.g. running from target/classes).
    // Paths.get(URI) cannot resolve a resource packed inside a jar.
    File file = Paths.get(res.toURI()).toFile();
    return file.getAbsolutePath();
  }
}
package net.wuxianjie.elasticsearch.util;

import java.io.IOException;
import net.wuxianjie.elasticsearch.config.Initialization;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

/** Helpers for sending requests through the shared {@link Initialization} client. */
public class EsUtils {

  /**
   * Executes {@code request} as a single bulk call and fails if any item failed.
   *
   * <p>NOTE(review): the shared client from {@link Initialization#getInstance()} is the resource
   * of the try-with-resources below, so it is closed when this method returns. None of the
   * importers close the client themselves, so they presumably rely on this. It also means a
   * second call in the same JVM would run against a closed client; move shutdown to the callers
   * if reuse is ever needed.
   *
   * @param request the bulk request to execute; a request with no actions is a no-op
   * @throws IOException if the HTTP call fails
   * @throws RuntimeException if one or more items failed; the message lists every failed item and
   *     the cause is the first failed item's cause
   */
  public static void bulkIndex(BulkRequest request) throws IOException {

    try (RestHighLevelClient client = Initialization.getInstance().getClient()) {

      // Elasticsearch rejects a bulk request without actions ("no requests added"), e.g. when a
      // CSV has only a header. Treat that as nothing to do rather than an error.
      if (request.numberOfActions() == 0) {
        return;
      }

      BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

      if (bulkResponse.hasFailures()) {
        Throwable firstCause = null;
        for (BulkItemResponse item : bulkResponse) {
          if (item.isFailed()) {
            firstCause = item.getFailure().getCause();
            break;
          }
        }
        // buildFailureMessage() reports every failed item, not only the first one.
        throw new RuntimeException(bulkResponse.buildFailureMessage(), firstCause);
      }
    }
  }
}

导入类

package net.wuxianjie.elasticsearch;

import com.opencsv.CSVReaderHeaderAware;
import com.opencsv.exceptions.CsvValidationException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import net.wuxianjie.elasticsearch.util.EsUtils;
import net.wuxianjie.elasticsearch.util.FileUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

/** Imports ml-latest-small/movies.csv into the {@code movies} index. */
public class ImportMovies {

  public static final String PATH_TO_CSV = "ml-latest-small/movies.csv";
  public static final String INDEX_NAME = "movies";

  /**
   * Matches a title that ends in a four-digit year in parentheses, e.g. {@code "Toy Story (1995)"}.
   * It also accepts a year range such as {@code "(2006–2007)"} and trailing whitespace.
   * Group 1 is the bare title and group 2 the (first) year.
   */
  private static final Pattern TITLE_WITH_YEAR =
    Pattern.compile("^(.*?)\\s*\\((\\d{4})(?:\\s*[-\u2013]\\s*\\d{4})?\\)\\s*$");

  /**
   * Builds a lookup from movie id to bare title (with the trailing year removed).
   *
   * @return map keyed by the CSV {@code movieId} column
   * @throws URISyntaxException if the CSV resource URL is malformed
   * @throws IOException if the CSV cannot be read
   * @throws CsvValidationException if a CSV line is invalid
   */
  public static Map<String, String> getMovieMap()
    throws URISyntaxException, IOException, CsvValidationException {

    Map<String, String> result = new HashMap<>();

    try (CSVReaderHeaderAware reader = openCsv()) {

      Map<String, String> row;
      while ((row = reader.readMap()) != null) {
        result.put(row.get("movieId"), parseTitle(row.get("title")).get("title"));
      }
    }

    return result;
  }

  public static void main(String[] args) throws IOException, URISyntaxException, CsvValidationException {
    EsUtils.bulkIndex(readCsv());
  }

  /**
   * Builds one index request per CSV row. The document id is the movie id, and the
   * {@code id}, {@code title}, {@code year} and {@code genre} fields come from the row.
   */
  private static BulkRequest readCsv() throws IOException, CsvValidationException, URISyntaxException {

    BulkRequest request = new BulkRequest();

    // try-with-resources: the reader (and its FileReader) was previously never closed.
    try (CSVReaderHeaderAware reader = openCsv()) {

      Map<String, String> row;
      while ((row = reader.readMap()) != null) {

        String movieId = row.get("movieId");
        Map<String, String> titleMap = parseTitle(row.get("title"));

        request.add(new IndexRequest(INDEX_NAME)
          .id(movieId)
          .source(XContentType.JSON,
            "id", movieId,
            "title", titleMap.get("title"),
            "year", titleMap.get("year"),
            "genre", row.get("genres")));
      }
    }

    return request;
  }

  /**
   * Opens the movies CSV. The file is decoded as UTF-8 explicitly, because
   * {@code FileReader(String)} would use the platform default charset and garble non-ASCII titles
   * on non-UTF-8 platforms.
   */
  private static CSVReaderHeaderAware openCsv()
    throws URISyntaxException, IOException, CsvValidationException {
    return new CSVReaderHeaderAware(
      new FileReader(FileUtils.getFilePath(PATH_TO_CSV), StandardCharsets.UTF_8));
  }

  /**
   * Splits {@code "Title (1995)"} into its title and year.
   *
   * @param str the raw title column; may be null
   * @return a map with key {@code "title"} (bare title, or {@code str} unchanged when there is no
   *     trailing year) and key {@code "year"} (the four-digit year, or null when absent). A
   *     parenthetical that is not a year is kept in the title, so it never reaches the integer
   *     {@code year} field.
   */
  private static Map<String, String> parseTitle(String str) {

    Map<String, String> result = new HashMap<>();

    Matcher matcher = str == null ? null : TITLE_WITH_YEAR.matcher(str);
    if (matcher != null && matcher.matches()) {
      result.put("title", matcher.group(1));
      result.put("year", matcher.group(2));
    } else {
      result.put("title", str);
      result.put("year", null);
    }

    return result;
  }
}
package net.wuxianjie.elasticsearch;

import com.opencsv.CSVReaderHeaderAware;
import com.opencsv.exceptions.CsvValidationException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Map;
import net.wuxianjie.elasticsearch.util.FileUtils;
import net.wuxianjie.elasticsearch.util.EsUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

/**
 * Imports ml-latest-small/ratings.csv into the {@code ratings} index. Each rating is enriched
 * with the movie title looked up through {@link ImportMovies#getMovieMap()}.
 */
public class ImportRatings {

  public static final String PATH_TO_CSV = "ml-latest-small/ratings.csv";
  public static final String INDEX_NAME = "ratings";

  /** Entry point: builds the bulk request from the CSV and sends it. */
  public static void main(String[] args) throws CsvValidationException, IOException, URISyntaxException {
    BulkRequest bulk = readCsv();
    EsUtils.bulkIndex(bulk);
  }

  /** Reads every CSV row into one auto-id index request. */
  private static BulkRequest readCsv() throws IOException, CsvValidationException, URISyntaxException {

    Map<String, String> titlesById = ImportMovies.getMovieMap();
    BulkRequest bulk = new BulkRequest();
    String csvPath = FileUtils.getFilePath(PATH_TO_CSV);

    try (CSVReaderHeaderAware csv = new CSVReaderHeaderAware(new FileReader(csvPath))) {
      for (Map<String, String> line = csv.readMap(); line != null; line = csv.readMap()) {
        bulk.add(toIndexRequest(line, titlesById));
      }
    }

    return bulk;
  }

  /** Maps one CSV row to an index request; {@code movie_title} is null for unknown movie ids. */
  private static IndexRequest toIndexRequest(Map<String, String> line, Map<String, String> titlesById) {
    String movieId = line.get("movieId");
    return new IndexRequest(INDEX_NAME)
      .source(XContentType.JSON,
        "user_id", line.get("userId"),
        "movie_id", movieId,
        "movie_title", titlesById.get(movieId),
        "rating", line.get("rating"),
        "timestamp", line.get("timestamp"));
  }
}
package net.wuxianjie.elasticsearch;

import com.opencsv.CSVReaderHeaderAware;
import com.opencsv.exceptions.CsvValidationException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Map;
import net.wuxianjie.elasticsearch.util.FileUtils;
import net.wuxianjie.elasticsearch.util.EsUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

/**
 * Imports ml-latest-small/tags.csv into the {@code tags} index. Each tag is enriched with the
 * movie title looked up through {@link ImportMovies#getMovieMap()}.
 */
public class ImportTags {

  public static final String PATH_TO_CSV = "ml-latest-small/tags.csv";
  public static final String INDEX_NAME = "tags";

  public static void main(String[] args) throws IOException, URISyntaxException, CsvValidationException {
    EsUtils.bulkIndex(readCsv());
  }

  /**
   * Reads every CSV row into one auto-id index request.
   *
   * @return the bulk request containing one action per tag row
   * @throws IOException if a CSV file cannot be read
   * @throws CsvValidationException if a CSV line is invalid
   * @throws URISyntaxException if a CSV resource URL is malformed
   */
  public static BulkRequest readCsv() throws IOException, CsvValidationException, URISyntaxException {

    Map<String, String> movieMap = ImportMovies.getMovieMap();

    BulkRequest request = new BulkRequest();

    // Decode explicitly as UTF-8. Tag text is free-form, and FileReader(String) would otherwise
    // use the platform default charset, which is not UTF-8 on every OS before Java 18.
    try (CSVReaderHeaderAware reader = new CSVReaderHeaderAware(new FileReader(
      FileUtils.getFilePath(PATH_TO_CSV), java.nio.charset.StandardCharsets.UTF_8))) {

      Map<String, String> row;
      while ((row = reader.readMap()) != null) {

        String movieId = row.get("movieId");

        request.add(new IndexRequest(INDEX_NAME)
          .source(XContentType.JSON,
            "user_id", row.get("userId"),
            "movie_id", movieId,
            "movie_title", movieMap.get(movieId),
            "tag", row.get("tag"),
            "timestamp", row.get("timestamp")));
      }
    }

    return request;
  }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

醉城メ夜风

暂无简介

0 文章
0 评论
22 人气
更多

推荐作者

有深☉意

文章 0 评论 0

硪扪都還晓

文章 0 评论 0

DS

文章 0 评论 0

我也只是我

文章 0 评论 0

TangBin

文章 0 评论 0

橪书

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文