Garbage collection in a Flink application

Published 2025-01-22 18:29:30

I have a very simple Flink application in Scala with two simple streams. I am broadcasting one of my streams to the other. The broadcast stream contains rules, and I just check whether each element of the other stream is in the rule set or not. Everything works fine; my code is below.

This is an application that runs indefinitely. I wonder whether the JVM could ever garbage-collect my rules object.

Does anyone have any idea? Many thanks in advance.

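import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object StreamBroadcasting extends App {
  val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()

  // Data stream: words read from port 9998, keyed by the word itself
  val stream = env
    .socketTextStream("localhost", 9998)
    .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
    .keyBy(l => l)

  // Rule stream: words read from port 9999
  val ruleStream = env
    .socketTextStream("localhost", 9999)
    .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))

  // Send every rule to all parallel instances of the downstream operator
  val broadcastStream: DataStream[String] = ruleStream.broadcast

  stream.connect(broadcastStream)
    .flatMap(new SimpleConnect)
    .print()

  class SimpleConnect extends RichCoFlatMapFunction[String, String, (String, Boolean)] {
    private var rules: Set[String] = Set.empty[String] // Can the JVM collect this object after a long time?

    override def open(parameters: Configuration): Unit = {}

    // Data element: emit it together with whether it matches a known rule
    override def flatMap1(value: String, out: Collector[(String, Boolean)]): Unit = {
      out.collect((value, rules.contains(value)))
    }

    // Rule element: add it to the in-memory rule set
    override def flatMap2(value: String, out: Collector[(String, Boolean)]): Unit = {
      rules += value
    }
  }

  env.execute("flink-broadcast-streams")
}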
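Stripped of Flink, the state handling in SimpleConnect comes down to the plain-Scala sketch below, where the hypothetical RuleHolder class stands in for the operator instance. Each update builds a new immutable Set and repoints the field at it, so the field always holds the latest Set containing every rule seen so far. The previous Set object itself becomes eligible for garbage collection once nothing else references it, but the rules it held are carried over into the new one.

```scala
// Hypothetical stand-in for SimpleConnect, without any Flink dependency.
final class RuleHolder {
  // Immutable Set held in a mutable field, like `rules` in SimpleConnect.
  private var rules: Set[String] = Set.empty[String]

  def current: Set[String] = rules

  // Mirrors flatMap2: builds a new Set and repoints the field at it.
  def addRule(rule: String): Unit = rules += rule

  // Mirrors flatMap1: checks membership in the current Set.
  def check(value: String): (String, Boolean) = (value, rules.contains(value))
}

object RuleHolderDemo {
  def main(args: Array[String]): Unit = {
    val holder = new RuleHolder
    holder.addRule("foo")
    // Kept only for comparison; in the real operator nothing holds the old Set.
    val before = holder.current
    holder.addRule("bar")
    val after = holder.current

    println(before)              // Set(foo): the old Set is unchanged (immutable)
    println(after)               // Set(foo, bar): the field now points at a new Set
    println(before eq after)     // false: two distinct objects
    println(holder.check("bar")) // (bar,true)
    println(holder.check("baz")) // (baz,false)
  }
}
```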

Comments (1)

飞烟轻若梦 2025-01-29 18:29:31

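No, the Set of rules will not be garbage collected. The running SimpleConnect instance keeps a reference to it in its rules field, so it stays reachable for as long as the job runs. (Of course, since you're not using Flink's broadcast state, the rules won't survive an application restart.)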
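If the rules should survive restarts, the broadcast state mentioned above is the usual tool. Below is a minimal sketch, assuming the same Scala DataStream API and socket ports as the question; the names BroadcastStateRules, RuleMatcher and rulesDescriptor are illustrative, and the 10-second checkpoint interval is an arbitrary choice. Broadcast state is always a map, so each rule is stored as a key with a dummy Boolean value.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object BroadcastStateRules {
  // Illustrative descriptor: rule -> marker value (the value itself is unused).
  val rulesDescriptor = new MapStateDescriptor[String, java.lang.Boolean](
    "rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO)

  class RuleMatcher
      extends KeyedBroadcastProcessFunction[String, String, String, (String, Boolean)] {

    // Data element: only read-only access to the broadcast state here.
    override def processElement(
        value: String,
        ctx: KeyedBroadcastProcessFunction[String, String, String, (String, Boolean)]#ReadOnlyContext,
        out: Collector[(String, Boolean)]): Unit = {
      val rules = ctx.getBroadcastState(rulesDescriptor)
      out.collect((value, rules.contains(value)))
    }

    // Rule element: every parallel instance receives it and stores it,
    // so all instances hold the same broadcast state.
    override def processBroadcastElement(
        value: String,
        ctx: KeyedBroadcastProcessFunction[String, String, String, (String, Boolean)]#Context,
        out: Collector[(String, Boolean)]): Unit = {
      ctx.getBroadcastState(rulesDescriptor).put(value, java.lang.Boolean.TRUE)
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoints include the broadcast state, so it can be restored on recovery.
    env.enableCheckpointing(10000L)

    val stream = env
      .socketTextStream("localhost", 9998)
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .keyBy(l => l)

    val rules = env
      .socketTextStream("localhost", 9999)
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .broadcast(rulesDescriptor)

    stream.connect(rules)
      .process(new RuleMatcher)
      .print()

    env.execute("flink-broadcast-state")
  }
}
```

Because the rules now live in Flink-managed state rather than in a plain field, they are part of checkpoints and savepoints and are restored when the job recovers from one or is restarted from one. Only processBroadcastElement may modify the broadcast state, which keeps the parallel instances consistent.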
