在Cassandra 4.X中,我可以用什么用作替代结果?

发布于 2025-02-06 18:43:20 字数 1015 浏览 1 评论 0原文

我必须将Cassandra升级到4.x。 这是先前用Cassandra 3.5编写的代码

 protected <T> Stream<T> getAll(Stream<Statement> statements, Mapper<T> mapper) {
        List<ResultSetFuture> futures = statements
                .peek(p -> cassandraReads.inc())
                .map(s -> session.session().executeAsync(s))
                .collect(Collectors.toList());

        return futures.stream()
                .map(ResultSetFuture::getUninterruptibly)
                .map(mapper::map)
                .flatMap(r -> StreamSupport.stream(r.spliterator(), false));
    }

,我做了一些更改...

 List<CompletionStage<AsyncResultSet>> futures = statements
                .peek(p -> cassandraReads.inc())
                .map(s -> session.getSession().executeAsync(s))
                .collect(Collectors.toList());

但是我应该用什么来代替.map(resultsetFuture :: getunressruption),因为它已被删除。 由于我是卡桑德拉(Cassandra)和异步编程的新手,因此将不胜感激。

I have to upgrade Cassandra to 4.x.
This was the code previously written in cassandra 3.5

 protected <T> Stream<T> getAll(Stream<Statement> statements, Mapper<T> mapper) {
        List<ResultSetFuture> futures = statements
                .peek(p -> cassandraReads.inc())
                .map(s -> session.session().executeAsync(s))
                .collect(Collectors.toList());

        return futures.stream()
                .map(ResultSetFuture::getUninterruptibly)
                .map(mapper::map)
                .flatMap(r -> StreamSupport.stream(r.spliterator(), false));
    }

I have done some changes...

 List<CompletionStage<AsyncResultSet>> futures = statements
                .peek(p -> cassandraReads.inc())
                .map(s -> session.getSession().executeAsync(s))
                .collect(Collectors.toList());

BUT what should I use in place of .map(ResultSetFuture::getUninterruptibly) since it has been removed now.
Since I am new to Cassandra and asynchronous programming any help would be appreciated.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

月依秋水 2025-02-13 18:43:20

由于您已经在代码中混合异步并阻止了调用,因此我建议您完全阻止并使用session.execute而不是session.executeasync。这将使事情变得更加容易。然后,您可以按照以下方式重写代码:

protected <T> Stream<T> getAll(Stream<Statement<?>> statements, Mapper<T> mapper) {
  return statements
      .peek(p -> cassandraReads.inc())
      .map(s -> session.execute(s))
      .flatMap(rs -> StreamSupport.stream(rs.spliterator(), false))
      .map(mapper::map);
}

注意:我正在稍微更改您的mapper接口,我假设它将是:(

public interface Mapper<T> {
  T map(Row row);
}

您实际上可以替换此mapper function&lt; row,t&gt; btw。)

除此之外,如埃里克·拉米雷斯(Erick Ramirez)所说,请查看此页面了解如何使用驱动程序4.x编写正确的async代码。

Since you are already mixing async and blocking calls in your code, I'd suggest that you go fully blocking and use Session.execute instead of Session.executeAsync. It's going to make things much easier. Then you could rewrite your code as follows:

protected <T> Stream<T> getAll(Stream<Statement<?>> statements, Mapper<T> mapper) {
  return statements
      .peek(p -> cassandraReads.inc())
      .map(s -> session.execute(s))
      .flatMap(rs -> StreamSupport.stream(rs.spliterator(), false))
      .map(mapper::map);
}

Note: I'm changing slightly your Mapper interface, which I assume would be:

public interface Mapper<T> {
  T map(Row row);
}

(You could actually replace this Mapper interface with just Function<Row, T> btw.)

Other than that, as Erick Ramirez said, please have a look at this page to understand how to write proper async code with driver 4.x.

¢蛋碎的人ぎ生 2025-02-13 18:43:20

异步编程的API实际上不是由您连接到的Cassandra群集的版本确定的。重要的是您正在使用的Java驱动程序的版本。

如果您仍在使用Java驱动程序3.X,则您的代码应在大部分时间继续工作,直到Java驱动程序3.11。参见 asynchronous在Java驱动程序中的编程3.11

直到您升级到Java驱动程序4.x,您才会遇到破坏更改,因为V4.X与Java驱动程序的早期版本不兼容(请参见升级指南有关详细信息)。

如果您升级到Java驱动程序4.x,则可以找到在此处进行异步编程的工作示例。干杯!

The API for asynchronous programming isn't actually determined by the version of the Cassandra cluster you're connecting to. What matters is the version of the Java driver you're using.

If you're still using Java driver 3.x, your code should continue to work for the most part until Java driver 3.11. See Asynchronous programming in Java driver 3.11.

It isn't until you upgrade to the Java driver 4.x that you will run into breaking changes since v4.x is not binary-compatible with earlier versions of the Java driver (see Upgrade guide for details).

If you do upgrade to the Java driver 4.x, you can find working examples for asynchronous programming here. Cheers!

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文