Spring Integration FileSplitter memory usage
Does the split() operation on a File object buffer and process each line, or do all lines get loaded into memory at once? I am trying to understand the memory usage if the file happens to contain 100,000+ lines.

Is it fair for a transformer to return void? The use case is to compute a few values from the payload and headers and then add the computed values to the headers. Is there a better way?
Thanks
UPDATE:
return IntegrationFlows.from(fileReadingMessageSource(), p -> p.poller(pollerSpec()))
.enrichHeaders(Collections.singletonMap(ERROR_CHANNEL, appErrorChannel))
.split() // process file by file
.log(INFO, message -> "Started File: " + message.getHeaders().get("file_name"))
.enrichHeaders(h -> h.headerFunction("foo", m -> integrationUtil.constructFoo())) // foo object
.split(fileSplitterSpec()) // split file lines
.filter(payload -> !(payload instanceof FileSplitter.FileMarker), e -> e.discardChannel("aggregatorChannel"))
.log(INFO, message -> "Payload: " + message.getPayload())
.transform(barTransformer)
.channel("aggregatorChannel")
.aggregate(new FileAggregator())
.log(INFO, message -> "Completed File: " + message.getHeaders().get("file_name"))
.aggregate()
.log(INFO, message -> "All Files Processed")
// .handle(null)
.get();
Comments (1)
Yes, that is really the purpose of the FileSplitter. Its internal logic is based on a FileIterator, which reads line by line and emits each line to the splitter output channel.

No, the transformer cannot return void; that is not its purpose. It sounds like enrichHeaders() is a better fit for you. Messages are immutable, so you cannot modify the current message for possible further logic. You build a new message with the new data (or header) and emit it as a reply downstream in the flow.
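The line-by-line behaviour described above can be illustrated with plain JDK classes: BufferedReader hands out one line at a time, so only the current line is held in memory regardless of file size. This is a sketch of the same streaming idea FileSplitter builds on, not its actual source:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.function.Consumer;

public class LineStreamDemo {
    // Emits lines one at a time to a consumer; only the current line is
    // ever held in memory, so a 100,000+ line file is not a problem.
    static int emitLines(BufferedReader reader, Consumer<String> out) throws IOException {
        int count = 0;
        String line;
        while ((line = reader.readLine()) != null) { // one line buffered at a time
            out.accept(line);
            count++;
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // StringReader stands in for a real file on disk
        BufferedReader reader = new BufferedReader(new StringReader("a\nb\nc"));
        int n = emitLines(reader, l -> System.out.println("Payload: " + l));
        System.out.println(n + " lines"); // 3 lines
    }
}
```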
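The immutability point can be shown without Spring at all. The Msg class below is a minimal stand-in for Spring Integration's immutable Message, not the real class: enrichment never mutates headers in place, it computes a value from the payload and existing headers and emits a new message carrying the extra header.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for an immutable message: headers can never be
// mutated in place; enrichment builds and returns a NEW message.
final class Msg {
    private final String payload;
    private final Map<String, Object> headers;

    Msg(String payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = Map.copyOf(headers); // unmodifiable defensive copy
    }

    String payload() { return payload; }
    Map<String, Object> headers() { return headers; }

    // The enrichHeaders() pattern: derive a value, return a new message.
    Msg withHeader(String name, Object value) {
        Map<String, Object> copy = new HashMap<>(headers);
        copy.put(name, value);
        return new Msg(payload, copy);
    }
}

public class EnrichDemo {
    public static void main(String[] args) {
        Msg original = new Msg("line-1", Map.of("file_name", "data.txt"));
        // "bar" computed from payload & headers, as in the question
        Msg enriched = original.withHeader("bar",
                original.payload().length() + "/" + original.headers().get("file_name"));

        System.out.println(enriched.headers().get("bar"));       // 6/data.txt
        System.out.println(original.headers().containsKey("bar")); // false
    }
}
```

In the real flow this corresponds to replacing the void-returning transformer with an .enrichHeaders(h -> h.headerFunction(...)) step, as already done for the "foo" header in the question's own code.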