Kafka Streams Scala memory leak

Posted on 2025-02-11 15:33:19

My use case is the following:
I want to aggregate data for a specific amount of time and then send the result downstream. Since the built-in suppress feature does not support wall-clock time, I have to implement this myself with a transformer.
Once the time window has closed, I forward the aggregated data downstream and delete it from the state store. I tested the behaviour with a limited amount of data, i.e. after all data has been processed, the state store should be empty again and memory usage should decrease. Unfortunately, memory usage always stays at the same level.

SuppressTransformer.scala

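import java.time.{Duration, Instant}
import java.time.temporal.ChronoUnit

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, Windowed}
import org.apache.kafka.streams.processor.{ProcessorContext, PunctuationType, To}
import org.apache.kafka.streams.state.SessionStore

import scala.collection.mutable

class SuppressTransformer[T](stateStoreName: String, windowDuration: Duration) extends Transformer[String, T, KeyValue[String, T]] {
  val scheduleInterval: Duration = Duration.ofSeconds(180)
  // Keys seen since their last flush; this set lives on the JVM heap, not in the state store
  private val keySet = mutable.HashSet.empty[String]
  var context: ProcessorContext = _
  // The store holds the aggregate itself, so its value type is T (not Array[T])
  var store: SessionStore[String, T] = _

  override def init(context: ProcessorContext): Unit = {
    this.context = context
    this.store = context.getStateStore(stateStoreName).asInstanceOf[SessionStore[String, T]]

    this.context.schedule(
      scheduleInterval,
      PunctuationType.WALL_CLOCK_TIME,
      _ => {
        val now = Instant.now()
        // Iterate over a snapshot, because keys are removed from keySet inside the loop
        for (key <- keySet.toList) {
          val storeEntry = store.fetch(key)
          try {
            var hasOpenSession = false
            while (storeEntry.hasNext) {
              val keyValue: KeyValue[Windowed[String], T] = storeEntry.next()
              val windowedKey: Windowed[String] = keyValue.key
              val windowAge: Long = ChronoUnit.SECONDS.between(windowedKey.window().startTime(), now)

              if (windowedKey.window().start() > 0 && windowAge > windowDuration.toSeconds) {
                // Window duration exceeded: send the aggregate downstream, then delete it from the state store
                context.forward(key, keyValue.value, To.all().withTimestamp(now.toEpochMilli))
                this.store.remove(windowedKey)
              } else {
                hasOpenSession = true
              }
            }
            // Forget the key only once none of its sessions is left in the store
            if (!hasOpenSession) keySet -= key
          } finally {
            storeEntry.close() // Always close the iterator to avoid leaking resources
          }
        }
        context.commit() // Request a single commit per punctuation instead of one per record
      }
    )
  }

  // Only remembers the key; records are emitted by the punctuator, never from here
  override def transform(key: String, value: T): KeyValue[String, T] = {
    keySet += key
    null
  }

  override def close(): Unit = {}
}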

class SuppressTransformerSupplier[T](stateStoreName: String, windowDuration: Duration) extends TransformerSupplier[String, T, KeyValue[String, T]] {
  override def get(): SuppressTransformer[T] = new SuppressTransformer(stateStoreName, windowDuration)
}
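
The expiry check inside the punctuator can be pulled out into a pure function, which makes it testable without a running topology. This is a minimal sketch that assumes the same condition as in the transformer above; `WindowExpiry` and `isExpired` are illustrative names, not part of Kafka Streams.

```scala
import java.time.{Duration, Instant}
import java.time.temporal.ChronoUnit

object WindowExpiry {
  // True when the window has a positive start timestamp and is older
  // (in whole seconds, wall-clock) than the configured window duration.
  def isExpired(windowStart: Instant, now: Instant, windowDuration: Duration): Boolean = {
    val windowAgeSeconds = ChronoUnit.SECONDS.between(windowStart, now)
    windowStart.toEpochMilli > 0 && windowAgeSeconds > windowDuration.toSeconds
  }
}
```

Note that the comparison is strict: a window that is exactly `windowDuration` old is kept until the next punctuation.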

Topology.scala

val windowDuration = Duration.ofMinutes(5)
val stateStore: Materialized[String, util.ArrayList[Bytes], ByteArraySessionStore] =
  Materialized
    .as[String, util.ArrayList[Bytes]](
      new RocksDbSessionBytesStoreSupplier(stateStoreName,
        stateStoreRetention.toMillis)
    )

builder.stream[String, Bytes](Pattern.compile(topic + "(-\\d+)?"))
  .filter((k, _) => k != null)
  .groupByKey
  .windowedBy(SessionWindows `with` sessionWindowMinDuration `grace` sessionGracePeriodDuration)
  .aggregate(initializer = {
    new util.ArrayList[Bytes]()
  }
  )(aggregator = (_: String, instance: Bytes, agg: util.ArrayList[Bytes]) => {
    agg.add(instance)
    agg
  }, merger = (_: String, state1: util.ArrayList[Bytes], state2: util.ArrayList[Bytes]) => {
    state1.addAll(state2)
    state1
  }
  )(stateStore)
  .toStream
  .map((k, v) => (k.key(), v))
  .transform(new SuppressTransformerSupplier[util.ArrayList[Bytes]](stateStoreName, windowDuration), stateStoreName)
  .unsetRepartitioningRequired()
  .to(f"$topic-aggregated")

[Image: graph of memory usage over time (not preserved)]

Comments (1)

昵称有卵用 2025-02-18 15:33:19

I don't think this is a memory leak. It could be, but from what you describe it looks like normal Java behavior.

What happens is that the JVM takes as much memory as it is allowed to. This is heap memory, and its maximum is configured with the -Xmx option. Your state fills it up (I assume, based on the graph) and the objects are then released. But the JVM normally doesn't return freed heap memory to the OS, which is why your pod always stays at its peak.
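
To tell a leak apart from this behavior, it can help to compare how much heap is actually in use with how much the JVM has claimed. This is a minimal sketch using only `java.lang.Runtime`; `HeapReport` and `Heap` are illustrative names. If `usedMiB` drops after a GC while `committedMiB` stays high, the objects were released and the JVM is simply holding on to the memory.

```scala
object HeapReport {
  private val MiB = 1024L * 1024L

  // usedMiB: heap occupied by objects (live or not yet collected)
  // committedMiB: heap the JVM currently holds
  // maxMiB: heap ceiling, i.e. the effective -Xmx
  final case class Heap(usedMiB: Long, committedMiB: Long, maxMiB: Long)

  def snapshot(): Heap = {
    val rt = Runtime.getRuntime
    val committed = rt.totalMemory()
    val used = committed - rt.freeMemory()
    Heap(used / MiB, committed / MiB, rt.maxMemory() / MiB)
  }
}
```

Calling `println(HeapReport.snapshot())` periodically, for example from a scheduled task, gives a rough picture. A pod-level memory graph also includes memory outside the heap, so the numbers will not match it exactly.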

There are a few garbage collectors that could possibly do that for you.
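
Whether memory is handed back depends on which collector the JVM is running and on the JDK version, so a first step is to check what is active. This is a minimal sketch using the standard `java.lang.management` API; `ActiveCollectors` is an illustrative name. The bean names it prints vary by JVM.

```scala
import java.lang.management.ManagementFactory

object ActiveCollectors {
  // Names of the garbage collector beans registered in this JVM.
  def names: List[String] = {
    val beans = ManagementFactory.getGarbageCollectorMXBeans
    (0 until beans.size).map(i => beans.get(i).getName).toList
  }
}
```

The collector can then be looked up in the documentation for your JDK version to see whether, and under which settings, it uncommits unused heap.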

I personally use the faster GC and let the JVM take as much memory as it requires. At the end of the day, that's the power of pod isolation. I normally set the max heap to 80% of the pod's max memory.
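
As a worked example of the 80% rule of thumb, here is a minimal sketch. The 2048 MiB pod limit used below is a hypothetical value, and `HeapSizing` is an illustrative helper, not part of any library.

```scala
object HeapSizing {
  // Heap ceiling in MiB for a given pod memory limit (integer arithmetic, rounds down).
  def heapMaxMiB(podLimitMiB: Long, percent: Int = 80): Long =
    podLimitMiB * percent / 100

  // The same value formatted as a JVM -Xmx flag.
  def xmxFlag(podLimitMiB: Long, percent: Int = 80): String =
    s"-Xmx${heapMaxMiB(podLimitMiB, percent)}m"
}
```

For a pod limited to 2048 MiB, `HeapSizing.xmxFlag(2048)` yields `-Xmx1638m`. The remaining 20% leaves headroom for memory outside the heap, such as metaspace and thread stacks. On container-aware JVMs, `-XX:MaxRAMPercentage` expresses the same idea without hard-coding a number.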

Here is a related question: Does GC release back memory to OS?
