Single transaction using multiple connections (MySQL/JDBC)

Posted on 2024-12-18 15:47:53

The application I'm working on is a Java-based ETL process that loads data into multiple tables. The DBMS is Infobright (a MySQL-based DBMS geared for data warehousing).

The data loading should be done atomically; however, for performance reasons, I want to load data into multiple tables at the same time (using a LOAD DATA INFILE command). This means I need to open multiple connections.

Is there any solution that allows me to do the loads atomically and in parallel?
(I'm guessing the answer might depend on the engine of the tables I load into; most of them are Brighthouse, which allows transactions but supports neither XA nor savepoints.)

To clarify further, I want to avoid a situation like this:

  • I load data into 5 tables
  • I commit the loads for the first 4 tables
  • The commit for the 5th table fails

In this situation, I can't roll back the first 4 loads, because they are already committed.

Comments (2)

妞丶爷亲个 2024-12-25 15:47:53

Intro

As promised, I've put together a complete example. I used MySQL and created three tables like the following:

CREATE TABLE `test{1,2,3}` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `data` varchar(255) NOT NULL UNIQUE,
  PRIMARY KEY (`id`)
);

test2 contains a single row initially.

INSERT INTO `test2` (`data`) VALUES ('a');

(I've posted the full code to http://pastebin.com.)

The following example does several things.

  1. Sets threads to 3, which determines how many jobs are going to be run in parallel.
  2. Creates threads number of connections.
  3. Generates some sample data for every table (by default, the data is a for every table).
  4. Creates threads number of jobs to be run and loads them with data.
  5. Runs the jobs in threads number of threads and waits for their completion (successful or not).
  6. If no exceptions occurred, commits every connection; otherwise, rolls back each of them.
  7. Closes the connections (although these could be reused).

(Note that I've used Java 7's automatic resource management feature, try-with-resources, in SQLTask.call().)

Logic

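public static void main(String[] args) throws SQLException, InterruptedException {
  int threads = 3;
  // getConnections(...) is not shown here; for the rollback below to have
  // any effect, every connection must have auto-commit disabled.
  List<Connection> connections = getConnections(threads);
  Map<String, String> tableData = getTableData(threads);
  List<SQLTask> tasks = getTasks(threads, connections);
  setData(tableData, tasks);
  try {
    runTasks(tasks);
    // Reached only if every task succeeded.
    commitConnections(connections);
  } catch (ExecutionException ex) {
    // At least one task failed: undo the work on every connection.
    rollbackConnections(connections);
  } finally {
    closeConnections(connections);
  }
}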
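
The main method above calls helpers whose bodies are not part of this excerpt. The following is only a sketch of what they might look like, not the code from the full listing: the class name ConnectionHelpers, the JDBC URL, and the credentials are placeholders chosen for illustration. The one detail that matters for the example to behave as described is that auto-commit is switched off on every connection.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

final class ConnectionHelpers {

  // Hypothetical connection settings; replace them with your own.
  static final String URL = "jdbc:mysql://localhost:3306/test";
  static final String USER = "user";
  static final String PASSWORD = "password";

  // Opens n connections with auto-commit disabled, so that nothing becomes
  // permanent until commit() is called explicitly.
  static List<Connection> getConnections(int n) throws SQLException {
    List<Connection> connections = new ArrayList<>();
    try {
      for (int i = 0; i < n; i++) {
        Connection connection = DriverManager.getConnection(URL, USER, PASSWORD);
        connections.add(connection);
        connection.setAutoCommit(false);
      }
    } catch (SQLException ex) {
      // Do not leak the connections that were opened before the failure.
      closeConnections(connections);
      throw ex;
    }
    return connections;
  }

  // Commits one connection after another. If a commit fails part-way through,
  // the earlier commits stay in place: this is NOT atomic across connections.
  static void commitConnections(List<Connection> connections) throws SQLException {
    for (Connection connection : connections)
      connection.commit();
  }

  // Rolls back every connection, continuing even if one rollback fails.
  static void rollbackConnections(List<Connection> connections) {
    for (Connection connection : connections) {
      try {
        connection.rollback();
      } catch (SQLException ex) {
        // Keep going so that the remaining connections are still rolled back.
      }
    }
  }

  // Closes every connection, continuing even if one close fails.
  static void closeConnections(List<Connection> connections) {
    for (Connection connection : connections) {
      try {
        connection.close();
      } catch (SQLException ex) {
        // Nothing sensible to do here; continue with the rest.
      }
    }
  }
}
```

Note the comment on commitConnections: the commits happen one at a time, which is exactly the gap the question is about.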

Data

private static Map<String, String> getTableData(int threads) {
  Map<String, String> tableData = new HashMap<>();
  for (int i = 1; i <= threads; i++)
    tableData.put("test" + i, "a");
  return tableData;
}

Tasks

private static final class SQLTask implements Callable<Void> {

  private final Connection connection;

  private String data;
  private String table;

  public SQLTask(Connection connection) {
    this.connection = connection;
  }

  public void setTable(String table) {
    this.table = table;
  }

  public void setData(String data) {
    this.data = data;
  }

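  @Override
  public Void call() throws SQLException {
    // The table name cannot be a bind parameter, so it is formatted in;
    // the value is bound with a placeholder to avoid quoting problems.
    String sql = String.format("INSERT INTO `%s` (data) VALUES (?)", table);
    try (PreparedStatement statement = connection.prepareStatement(sql)) {
      statement.setString(1, data);
      statement.executeUpdate();
    }
    return null;
  }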
}

private static List<SQLTask> getTasks(int threads, List<Connection> connections) {
  List<SQLTask> tasks = new ArrayList<>();
  for (int i = 0; i < threads; i++)
    tasks.add(new SQLTask(connections.get(i)));
  return tasks;
}

private static void setData(Map<String, String> tableData, List<SQLTask> tasks) {
  Iterator<Entry<String, String>> i = tableData.entrySet().iterator();
  Iterator<SQLTask> j = tasks.iterator();
  while (i.hasNext()) {
    Entry<String, String> entry = i.next();
    SQLTask task = j.next();
    task.setTable(entry.getKey());
    task.setData(entry.getValue());
  }
}

Run

private static void runTasks(List<SQLTask> tasks)
    throws ExecutionException, InterruptedException {
  ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
  // invokeAll blocks until every task has finished, successfully or not.
  List<Future<Void>> futures = executorService.invokeAll(tasks);
  executorService.shutdown();
  // get() rethrows a task's failure wrapped in an ExecutionException.
  for (Future<Void> future : futures)
    future.get();
}

Result

Given the default data returned by getTableData(...)

test1 -> `a`
test2 -> `a`
test3 -> `a`

and the fact that test2 already contains a (and the data column is unique), the second job will fail and throw an exception, so every connection will be rolled back.

If getTableData(...) returns b instead of a for every table, the connections will be committed safely.

This can be done similarly with LOAD DATA.
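
As a rough sketch of that variant, a task could issue LOAD DATA INFILE instead of an INSERT. The class name LoadDataTask, the buildSql helper, and the file path used below are made up for illustration; field and line terminator options are omitted, and whether the loaded rows really stay uncommitted until commit() depends on the storage engine.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.Callable;

final class LoadDataTask implements Callable<Void> {

  private final Connection connection;
  private final String table;
  private final String file;

  LoadDataTask(Connection connection, String table, String file) {
    this.connection = connection;
    this.table = table;
    this.file = file;
  }

  // Builds the statement text. Both arguments are assumed to come from
  // trusted configuration, since no escaping is applied here.
  static String buildSql(String table, String file) {
    return String.format("LOAD DATA INFILE '%s' INTO TABLE `%s`", file, table);
  }

  // Runs the load on this task's connection; committing or rolling back is
  // left to the caller, as with the INSERT-based task.
  @Override
  public Void call() throws SQLException {
    try (Statement statement = connection.createStatement()) {
      statement.execute(buildSql(table, file));
    }
    return null;
  }
}
```

For example, buildSql("test1", "/tmp/test1.csv") yields LOAD DATA INFILE '/tmp/test1.csv' INTO TABLE `test1`.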


After the OP's response to my answer, I realized that what she/he wants to do can't be done in a simple and clear manner.

Basically, the problem is that once a commit has succeeded, the committed data can't be rolled back, because the commit is atomic. Since the given case needs multiple commits (one per connection), rolling back everything isn't possible unless one tracks all the data (in all of the transactions) and, if something goes wrong, deletes everything that was already committed.
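
The "track and delete" idea can be sketched as a commit loop with compensation. Everything here is hypothetical: the Step interface and the CompensatingCommit class are invented for illustration. In a JDBC setting, commit() would presumably wrap connection.commit(), and compensate() would delete the rows of that load, for instance by a batch identifier recorded during loading.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

final class CompensatingCommit {

  // One unit of work: commit() makes it permanent, compensate() undoes it
  // after the fact (for example by deleting the rows that were loaded).
  interface Step {
    void commit() throws Exception;
    void compensate() throws Exception;
  }

  // Commits the steps in order. If one commit fails, the steps that were
  // already committed are compensated in reverse order, then the original
  // failure is rethrown.
  static void commitAll(List<Step> steps) throws Exception {
    Deque<Step> committed = new ArrayDeque<>();
    try {
      for (Step step : steps) {
        step.commit();
        committed.push(step);
      }
    } catch (Exception failure) {
      // push() adds at the head, so this iterates most recent first.
      for (Step step : committed) {
        try {
          step.compensate();
        } catch (Exception ex) {
          // Record the problem but keep undoing the remaining steps.
          failure.addSuppressed(ex);
        }
      }
      throw failure;
    }
  }
}
```

This is still not atomic: other sessions can see the committed data before it is compensated, and a compensation can itself fail.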

There is a nice answer relating to the issue of commits and rollbacks.

思念满溢 2024-12-25 15:47:53

Actually, the newer version of IEE (not ICE) has an additional feature called DLP (Distributed Load Processing). There is a PDF file on the site, linked from here:

http://www.infobright.com/Products/Features/
