Spring Integration FileSplitter memory usage

Posted on 2025-01-19 22:10:50

  1. Does the split() operation on a File object buffer and process each line, or are all lines loaded into memory at once? This is to understand the memory usage when the file happens to contain 100,000+ lines.

  2. Is it reasonable for a transformer to return void? The use case is to compute some values from the payload and headers, then add the computed values to the headers. Is there a better way?

Thanks

UPDATE:

return IntegrationFlows.from(fileReadingMessageSource(), p -> p.poller(pollerSpec()))
                        .enrichHeaders(Collections.singletonMap(ERROR_CHANNEL, appErrorChannel))
                        .split() // process file by file
                        .log(INFO, message -> "Started File: " + message.getHeaders().get("file_name"))
                        .enrichHeaders(h -> h.headerFunction("foo", m -> integrationUtil.constructFoo())) // foo object
                        .split(fileSplitterSpec()) // split file lines
                            .filter(payload -> !(payload instanceof FileSplitter.FileMarker), e -> e.discardChannel("aggregatorChannel"))
                            .log(INFO, message -> "Payload: " + message.getPayload())
                            .transform(barTransformer)
                        .channel("aggregatorChannel")
                        .aggregate(new FileAggregator())
                        .log(INFO, message -> "Completed File: " + message.getHeaders().get("file_name"))
                        .aggregate()
                        .log(INFO, message -> "All Files Processed")
                        // .handle(null)
                        .get();
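For reference, a minimal sketch of what the `fileSplitterSpec()` bean referenced above might look like (the exact options are an assumption, not taken from the original post). Enabling markers is what produces the `FileSplitter.FileMarker` messages that the flow's filter routes to the aggregator channel:

```java
// Sketch only: one possible fileSplitterSpec() definition (assumed, not from the post).
// markers() makes the splitter emit FileSplitter.FileMarker START/END messages
// around each file's lines, which is what the filter above sends to "aggregatorChannel".
@Bean
public FileSplitterSpec fileSplitterSpec() {
    return Files.splitter()
            .markers();
}
```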


Comments (1)

深海里的那抹蓝 2025-01-26 22:10:50


Yes, that is really the purpose of the FileSplitter. Its internal logic is based on the FileIterator, which reads line by line and emits each line to the splitter output channel.
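The line-by-line behavior can be illustrated outside Spring with plain `BufferedReader` iteration, which is the same pattern the FileIterator relies on: only the current line is held at a time, so a 100,000-line file never needs to be fully loaded. This is a minimal sketch; the class and method names are mine, not from Spring Integration:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

public class LazyLineDemo {

    // Iterate the reader one line at a time. Only the current line is referenced,
    // so earlier lines become garbage-collectable as soon as they are processed --
    // the same constant-memory pattern FileSplitter's iterator uses internally.
    static int countLines(BufferedReader reader) throws IOException {
        int count = 0;
        while (reader.readLine() != null) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // Simulate a 100,000-line input.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 100_000; i++) {
            sb.append("line ").append(i).append('\n');
        }
        int n = countLines(new BufferedReader(new StringReader(sb.toString())));
        System.out.println(n); // prints 100000
    }
}
```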

No, the transformer cannot return void. That's not its purpose. It sounds more like an enrichHeaders() is a better fit for you. Messages are immutable, and you simply cannot modify the current message for possible further logic. You build a new message with the new data (or headers) and emit it as a reply downstream of the flow.
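The suggestion above can be sketched against the flow in the question: instead of a void transformer, a header enricher computes the value and emits a new message carrying it. The header name `computedBar` and the helper `computeBar(...)` are illustrative, not from the original code:

```java
// Sketch only: would replace .transform(barTransformer) in the flow above.
// The enricher builds a NEW message (messages are immutable) whose headers
// include the value computed from the current payload and headers.
.enrichHeaders(h -> h.headerFunction("computedBar",
        m -> computeBar(m.getPayload(), m.getHeaders())))
```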
