对 WordCount 输出进行排序 Flink

发布于 2025-01-19 22:43:31 字数 709 浏览 3 评论 0原文

我正在尝试学习 Flink,并且正在做基本的 WordCount 教程。我想知道如何对数据流的输出进行排序,以便它按降序输出计数。我不需要将其保存为文本文件,只需按降序输出到控制台。

以下是主函数中的内容

DataStream<String> text = env.readTextFile(<PATH TO TEXT>)
DataStream<Tuple2<String, Integer>> counts = 
     text.flatMap(new Tokenizer())
     .keyBy(0)
     .sum(1);

counts.print();

现在,这会毫无问题地写入所有计数,我只想将计数按降序排序(按计数值)。我试图让它与 .addSink() 一起使用,但我不明白如何对此进行排序。

主函数内部

counts.addSink(new CustomSink());

主函数外部

public static final class CustomSink implements SinkFunction<Tuple2<String, Integer>> {
     public void invoke(Integer value) throws Exeception {
     }
}

I am trying to learn Flink and I am doing the basic WordCount tutorial. I was wondering how I could sort the output of a datastream so that it outputs the counts in descending order. I don't need this saved as a text file just output to the console in descending order.

The following is within the main function

DataStream<String> text = env.readTextFile(<PATH TO TEXT>)
DataStream<Tuple2<String, Integer>> counts = 
     text.flatMap(new Tokenizer())
     .keyBy(0)
     .sum(1);

counts.print();

Right now this writes all the counts with no issues I would like to simply have the counts sorted in descending order (by the value of the count). I was trying to get this to work with .addSink() but I do not understand how to sort with this.

Inside the main function

counts.addSink(new CustomSink());

Outside the main function

public static final class CustomSink implements SinkFunction<Tuple2<String, Integer>> {
     public void invoke(Integer value) throws Exeception {
     }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

吻风 2025-01-26 22:43:31
  • 除了时间戳以外的任何其他内容,与无限流的流媒体根本不兼容。

  • 通过Flink的SQL/Table API可以轻松地对有限的流进行排序。没有一个很好的方法来使用数据式API。

  • Sorting by anything other than timestamps is fundamentally incompatible with unbounded streaming.

  • Sorting over bounded streams can easily be done with Flink's SQL/Table API. There isn't a good way to do this with the DataStream API.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文