Flink JDBC沉入多个模式

发布于 2025-01-24 11:02:57 字数 1414 浏览 4 评论 0原文

我正在使用Flink JDBC接收器将数据推入Postgres表中。数据必须存储在具有相同数据库连接的不同的模式中。

DataStream<Book> stream = env.fromSource(...);

流中的每个书籍记录都有有关必须存储的架构的详细信息。

我试图在准备台词中参数化数据库架构名称,但正如预期的那样,SQL不允许使用。

stream.addSink(JdbcSink.sink(
                    "insert into ?.books (id, title, authors, year) values (?, ?, ?, ?)",
                    (statement, book) -> {
                        statement.setString(1, book.schema)
                        statement.setLong(2, book.id);
                        statement.setString(3, book.title);
                        statement.setString(4, book.authors);
                        statement.setInt(5, book.year);
                    },
                    JdbcExecutionOptions.builder()
                            .withBatchSize(1000)
                            .withBatchIntervalMs(200)
                            .withMaxRetries(5)
                            .build(),
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
                            .withDriverName("org.postgresql.Driver")
                            .withUsername("someUser")
                            .withPassword("somePassword")
                            .build()
            ));

有解决方法吗?还是我必须在单独的水槽中明确添加每个架构?

I am using Flink Jdbc Sink to push data into Postgres tables. The data has to be stored in different schemas having the same database connection.

DataStream<Book> stream = env.fromSource(...);

Each Book record in the stream has details about the schema it has to be stored in.

I tried to parameterize the database schema name in the PreparedStatement, but as expected, that is not allowed by SQL.

stream.addSink(JdbcSink.sink(
                    "insert into ?.books (id, title, authors, year) values (?, ?, ?, ?)",
                    (statement, book) -> {
                        statement.setString(1, book.schema)
                        statement.setLong(2, book.id);
                        statement.setString(3, book.title);
                        statement.setString(4, book.authors);
                        statement.setInt(5, book.year);
                    },
                    JdbcExecutionOptions.builder()
                            .withBatchSize(1000)
                            .withBatchIntervalMs(200)
                            .withMaxRetries(5)
                            .build(),
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
                            .withDriverName("org.postgresql.Driver")
                            .withUsername("someUser")
                            .withPassword("somePassword")
                            .build()
            ));

Is there a workaround for this? Or do I have to explicitly add each schema in a separate sink?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

玩心态 2025-01-31 11:02:57

假设我们有一系列字符串,其中列出了所有数据库模式(可以通过应用程序属性进行预配置)。

我们可以根据架构名称过滤流,并为每个模式创建一个单独的JDBC接收器。

String[] schemas = {"schema-1", "schema-2", "schema-3"};

for(int i=0; i<schemas.length; i++){
    stream.filter(x -> x.schema.equals(schemas[i])).addSink(JdbcSink.sink(
                "insert into " + schemas[i] ".books (id, title, authors, year) values (?, ?, ?, ?)",
                (statement, book) -> {
                    statement.setLong(1, book.id);
                    statement.setString(2, book.title);
                    statement.setString(3, book.authors);
                    statement.setInt(4, book.year);
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
                        .withDriverName("org.postgresql.Driver")
                        .withUsername("someUser")
                        .withPassword("somePassword")
                        .build()
        ));

}

Let's say that we have an array of Strings where all the database schemas are listed (can be pre-configured via application properties).

We can filter the stream based on the schema name and create an individual jdbc sink for each schema.

String[] schemas = {"schema-1", "schema-2", "schema-3"};

for(int i=0; i<schemas.length; i++){
    stream.filter(x -> x.schema.equals(schemas[i])).addSink(JdbcSink.sink(
                "insert into " + schemas[i] ".books (id, title, authors, year) values (?, ?, ?, ?)",
                (statement, book) -> {
                    statement.setLong(1, book.id);
                    statement.setString(2, book.title);
                    statement.setString(3, book.authors);
                    statement.setInt(4, book.year);
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
                        .withDriverName("org.postgresql.Driver")
                        .withUsername("someUser")
                        .withPassword("somePassword")
                        .build()
        ));

}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文