如何将两条消息与从 Apache Camel 中的 MySQL 数据库检索的数据结合起来?

发布于 2025-01-11 15:27:10 字数 2200 浏览 0 评论 0 原文

我目前正在从事一个集成项目。我必须从 MySQL 数据库获取一些数据,然后使用 Apache Camel 将它们组合起来。在数据库中,我有两个表,材料和包。它们是一对多的关系,一个物料可以包含多个包。我已经弄清楚如何从数据库获取数据并将它们保存到 json 文件中,但我不知道如何将这两条消息合并为一条消息。我读过有关聚合的内容,但我并没有真正理解它们。这是我第一次使用 Apache Camel,我真的不知道我现在该做什么。这些路线的代码如下所示:

public class InputAdapter{

public static void main(String[] args) throws Exception {

    BasicDataSource dataSource = new BasicDataSource();
    dataSource.setDriverClassName("com.mysql.jdbc.Driver");
    dataSource.setUrl("jdbc:mysql://localhost:3306/Packages");
    dataSource.setUsername("uname");
    dataSource.setPassword("passwd");
    SimpleRegistry registry = new SimpleRegistry();
    registry.bind("dataSource", dataSource);

    CamelContext context = new DefaultCamelContext(registry);
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("timer://foo?repeatCount=1")
                    .setBody(constant("SELECT * FROM material;"))
                    .to("jdbc:dataSource")
                    .marshal().json(true)
                    .to("file:/some/path/to/file?fileName=materials.json");
            from("timer://foo?repeatCount=1")
                    .setBody(constant("SELECT * FROM package;"))
                    .to("jdbc:dataSource")
                    .marshal().json(true)
                    .to("file:/some/path/to/file?fileName=packages.json");
        }
    });

    context.start();
    Thread.sleep(10000);
    context.stop();
}
}

Material 和 Package 的模型只是具有 getter 和 setter 的私有属性:

public class Material {
private int id;
private int number;
private enumType type;
private String name;
private String description;
private boolean is_deleted;
private List<Package> packageList = new ArrayList<>();

public enum enumType {
    A1, A2, A3, B1, B2, B3, Z1, Z2, Z3;
}

public int getId() {
    return this.id;
}

... some getters
}

public List<Package> getPackageList() {
    return this.packageList;
}

public void setId(int id) {
    this.id = id;
}

... some setters

public void setPackageList(List<Package> packages) {
    this.packageList = packages;
}
}

有人能给我一个提示,我现在应该做什么吗?请帮我。

I'm currently working on an integration project. I have to get some data from the MySQL database and them combine them using Apache Camel. In the database I've got two tables, materials and packages. They are in the one-to-many relation, one material can contain multiple packages. I've already figured out how to get data from the database and save them to json file, but I have no idea how to combine those two messages into one. I've read about Aggregations but I don't really get them. This is my first usage of Apache Camel and I don't really know what sould I do now. The code for those routes looks like this:

public class InputAdapter{

public static void main(String[] args) throws Exception {

    BasicDataSource dataSource = new BasicDataSource();
    dataSource.setDriverClassName("com.mysql.jdbc.Driver");
    dataSource.setUrl("jdbc:mysql://localhost:3306/Packages");
    dataSource.setUsername("uname");
    dataSource.setPassword("passwd");
    SimpleRegistry registry = new SimpleRegistry();
    registry.bind("dataSource", dataSource);

    CamelContext context = new DefaultCamelContext(registry);
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("timer://foo?repeatCount=1")
                    .setBody(constant("SELECT * FROM material;"))
                    .to("jdbc:dataSource")
                    .marshal().json(true)
                    .to("file:/some/path/to/file?fileName=materials.json");
            from("timer://foo?repeatCount=1")
                    .setBody(constant("SELECT * FROM package;"))
                    .to("jdbc:dataSource")
                    .marshal().json(true)
                    .to("file:/some/path/to/file?fileName=packages.json");
        }
    });

    context.start();
    Thread.sleep(10000);
    context.stop();
}
}

And the model for Material and a Package are just private properties with getters and setters:

public class Material {
private int id;
private int number;
private enumType type;
private String name;
private String description;
private boolean is_deleted;
private List<Package> packageList = new ArrayList<>();

public enum enumType {
    A1, A2, A3, B1, B2, B3, Z1, Z2, Z3;
}

public int getId() {
    return this.id;
}

... some getters
}

public List<Package> getPackageList() {
    return this.packageList;
}

public void setId(int id) {
    this.id = id;
}

... some setters

public void setPackageList(List<Package> packages) {
    this.packageList = packages;
}
}

Can someone give me a hint what sould I do now? Please help me.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

稚然 2025-01-18 15:27:10

聚合器通常用于组合来自源的消息。我可能不会使用聚合器来组合这两组项目。如果您希望提取所有材质并从数据库中获取关联的包,则最好检索每种材质的包列表。

我将创建一个处理器来处理检索每个返回的材质对象的包,然后在单个路由中输出整个内容。

您可以在 Camel 中定义一个处理器类,如下所示:

public class PackageProcessor implements Processor {
  @Override
  public void process(Exchange exchange) {
    // Transform the input to your message class
    // Retrieve the Packages
    // Transform the results to Packages
    // Add to the Material
    // Set the Out Body
    exchange.getMessage().setBody(material);
  }
}

然后您可以在您的 Route 中使用该处理器来完成这项工作。这将使路由看起来像这样:

from("timer://foo?repeatCount=1")
  .routeId("my-material-route")
  .setBody(constant("SELECT * FROM materials;"))
  .to("jdbc:dataSource")
  .split(body())
  .process(new PackageProcessor())
  .setHeader(Exchange.FILE_NAME, simple("${exchangeId}.json"))
  .marshal().json(true)
  .to("file:/somepath")
  .end();

这会将每条记录输出到包含所需信息的 Json 文件。如果您希望将所有项目放入一个文件中,那么聚合器就可以发挥作用。

您会注意到路线中的一些项目超出了原始路线。 JDBC组件的结果是ArrayList>。我们在路由中添加一个 Split,将每个项目单独发送到处理器而不是整个结果集。处理器应该接收 HashMap 。在交换机构中。

在处理器之后,我们将交换上的 CamelFileName 标头设置为交换 Id,然后这将从材料表中的每个记录输出一个单独的文件。

如果您想将所有内容都放在一个文件中,您可以使用聚合器来收集交换并构建列表。将交换释放到 JSON 文件可能会稍微复杂一些。您通常必须设置超时或某种评估函数来确定何时应该释放“超级”交换。

Aggregators are normally used to combine messages coming in from a source. I probably wouldn't use the aggregators to combine these two sets of items. If you're looking to pull all the Materials and get the associated packages from the database, you might be better to retrieve the list of packages per Material.

I would create a Processor that handles retrieving the packages for each of the returned Materials objects and then output the whole thing in a single route.

You can define a processor class in Camel like so:

public class PackageProcessor implements Processor {
  @Override
  public void process(Exchange exchange) {
    // Transform the input to your message class
    // Retrieve the Packages
    // Transform the results to Packages
    // Add to the Material
    // Set the Out Body
    exchange.getMessage().setBody(material);
  }
}

You can then use the processor in your Route to do this work. That would make the route look something like this then:

from("timer://foo?repeatCount=1")
  .routeId("my-material-route")
  .setBody(constant("SELECT * FROM materials;"))
  .to("jdbc:dataSource")
  .split(body())
  .process(new PackageProcessor())
  .setHeader(Exchange.FILE_NAME, simple("${exchangeId}.json"))
  .marshal().json(true)
  .to("file:/somepath")
  .end();

This would output each record to a Json file with the needed information. If you want all of the items to be placed in a single file, this is where an aggregator would come into play.

You'll notice a couple of items in the route outside of your original. The result of the JDBC component is an ArrayList<HashMap<String, Object>>. We add a Split in the route to send each item separately into the processor instead of the entire result set. The processor should receive a HashMap<String, Object> in the Exchange Body.

After the processor, we set the CamelFileName Header on the exchange to the Exchange Id and this will then output an individual file per record from the Materials table.

If you want to have it all in a single file, you can use an Aggregator that will collect the exchanges and build a list. That might be a little more complicated to have it release the exchanges out to a JSON file. You normally have to set a timeout or some sort of evaluation function to figure out when the "super" exchange should be released.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文