ksqlDB Java client: unable to deserialize data from client.executeStatement()

Posted 2025-01-12 13:37:11

I am trying to query a ksqlDB stream on Confluent Cloud with the Java client ksqldb-api-client:0.24.0, using the code below:

        ClientOptions options = ClientOptions.create()
                .setHost(KSQLDB_SERVER_HOST)
                .setPort(KSQLDB_SERVER_HOST_PORT)
                .setUseTls(true)
                .setUseAlpn(true)
                .setBasicAuthCredentials(USER_NAME,PASSWORD);

        Client client = Client.create(options);
        Map<String, Object> properties = Collections.singletonMap("auto.offset.reset", "earliest");

        client.streamQuery("select * from process_payments EMIT CHANGES;", properties)
                .thenAccept(streamedQueryResult -> {
                    System.out.println("Query has started. Query ID: " + streamedQueryResult.queryID());
//                    RowSubscriber subscriber = new RowSubscriber();
//                    streamedQueryResult.subscribe(subscriber);

                }).exceptionally(e -> {
                    e.printStackTrace();
                    System.out.println("Request failed: " + e);
                    return null;
                });

Even with the RowSubscriber commented out, I still get the exception below. FYI, my message is as simple as a single String column, for ease of debugging:

Caused by: java.lang.NoClassDefFoundError: io/confluent/ksql/schema/utils/FormatOptions
    at java.base/java.lang.Class.getDeclaredMethods0(Native Method)
    at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3166)
    at java.base/java.lang.Class.getDeclaredMethods(Class.java:2309)
    at com.fasterxml.jackson.databind.util.ClassUtil.getClassMethods(ClassUtil.java:1231)
    ... 88 more
Caused by: java.lang.ClassNotFoundException: io.confluent.ksql.schema.utils.FormatOptions
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    ... 92 more

Request failed: java.util.concurrent.CompletionException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Failed on call to `getDeclaredMethods()` on class `io.confluent.ksql.schema.ksql.LogicalSchema`, problem: (java.lang.NoClassDefFoundError) io/confluent/ksql/schema/utils/FormatOptions
 at [Source: (byte[])"{"queryId":"transient_PROCESS_PAYMENTS_2977877146871246906","columnNames":["STATUS"],"columnTypes":["STRING"]}"; line: 1, column: 1]

Am I missing something?

Comments (2)

野生奥特曼 2025-01-19 13:37:11

It looks like you'll need to add ksqldb-udf to your classpath. That jar contains FormatOptions.
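
To confirm whether the class is actually missing before changing dependencies, a small diagnostic like the one below may help. It is only a sketch: `ClasspathCheck` and `isOnClasspath` are hypothetical names introduced here, and the two class names are taken from the stack trace in the question. It would need to run with the same classpath as the failing application.

```java
// Diagnostic sketch (hypothetical helper, not part of the ksqlDB client API):
// reports whether the classes named in the stack trace can be loaded.
final class ClasspathCheck {

    /** Returns true if the named class can be loaded (without initializing it). */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // The class, or something it needs at load time, is not available.
            return false;
        }
    }

    public static void main(String[] args) {
        // Class names copied from the exception in the question.
        String[] names = {
            "io.confluent.ksql.schema.utils.FormatOptions",
            "io.confluent.ksql.schema.ksql.LogicalSchema"
        };
        for (String name : names) {
            System.out.println(name + " -> " + (isOnClasspath(name) ? "found" : "MISSING"));
        }
    }
}
```

If `FormatOptions` shows up as missing while `LogicalSchema` is found, that would be consistent with the suggestion above that a required jar is absent from the classpath.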

梦里°也失望 2025-01-19 13:37:11

I think this is due to a version conflict. I get exactly the same error with version 7.2.1. It works fine with version 6.0.1, without any additional dependency.
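
One rough way to look for such a conflict is to list every classpath location that provides a given class; several locations can indicate that jars from different versions are mixed, and none means the class is absent. The sketch below assumes the application uses the system class loader; `DuplicateClassFinder` and `locations` are hypothetical names introduced for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Diagnostic sketch (hypothetical helper): lists where a class file is found on the classpath.
final class DuplicateClassFinder {

    /** Returns every classpath location that contains the given class file. */
    static List<URL> locations(String className) {
        String resource = className.replace('.', '/') + ".class";
        try {
            return Collections.list(ClassLoader.getSystemClassLoader().getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Class name copied from the exception in the question.
        String name = "io.confluent.ksql.schema.ksql.LogicalSchema";
        List<URL> found = locations(name);
        System.out.println(name + " found in " + found.size() + " location(s):");
        for (URL url : found) {
            System.out.println("  " + url);
        }
    }
}
```

The jar names in the printed URLs normally include a version number, which makes it easier to see whether artifacts from different releases ended up on the same classpath.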
