Reading and parsing a large CSV file using a non-repeatable stream
Thanks to Harshank Bansal, Ryan Hoegg and aled for their suggestions and comments on an earlier question here.
I am using Mule 4.4 Community Edition, on premises.
Based on those comments, I am now reading the large CSV file as a non-repeatable stream.
The file is a CSV file, but pipe-separated:
101|John Saunders|19|M|Physics|Chemistry|Mechanics
102|Jim White|17|M|Languages|Art|Pottery
...
...
Once it is read, I check for an empty file using attributes.size == 0.
Then I use a Set Payload component with deferred=true.
In debug mode, after this component the payload shows up as java.io.PipedInputStream (which is fine; this is simply an observation).
I then pass this payload to the HTTP Request component, which calls a REST endpoint. For now I have mocked this REST endpoint and am logging the payload it receives.
However, the payload received by this REST endpoint is an empty array.
Do I need to set any attribute on the HTTP Request? I can see properties such as Request Streaming Mode; do I need to configure anything there?
Another question: how many times is the REST service expected to be called? Will it be called multiple times, or only once with a single request?
Here is the complete code of what I have tried:
<flow name="get:employee" >
    <set-variable value='#[now() as String { format: "ddMMuu" }]' doc:name="Set todays date as ddmmyy" doc:id="37de829b-bc84-44f6-88d9-e66c52fabe83" variableName="today"/>
    <file:read doc:name="Read emp file"
               config-ref="File_Config"
               path="/emp_large.unl"
               outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
        <non-repeatable-stream />
    </file:read>
    <try doc:name="Try" >
        <choice doc:name="is the file empty ?" >
            <when expression="attributes.size == 0">
                <logger level="ERROR" doc:name="Payload is empty " message="Co-relationId : #[correlationId] Empty payload from file: #[vars.fileName] !" category="load.empData" />
                <raise-error doc:name="Raise error on empty file " type="EMP:FILE_NOT_PRESENT_OR_EMPTY" description="Emp File either empty or not received or error in reading file content" />
            </when>
            <otherwise>
            </otherwise>
        </choice>
        <set-payload value='#[output application/json deferred=true
---
payload map (value,index)->{
    "id": value.column_0,
    "name": value.column_1
}]' doc:name="Set Payload" />
        <http:request method="POST" doc:name="Submit products to Rest API" config-ref="HTTP_Request_config" path="/emps" requestStreamingMode="NEVER">
            <non-repeatable-stream />
            <http:headers ><![CDATA[#[output application/java
---
{
    "Host" : p('http_emp.host'),
    "Content-Type" : "application/json"
}]]]></http:headers>
            <http:response-validator >
                <http:success-status-code-validator values="200" />
            </http:response-validator>
        </http:request>
        <logger level="INFO" doc:name="Logger" category="load.empData" message="Co-relationId : #[correlationId] Successfully published emp data to XYZ , response received is : #[payload]"/>
        <error-handler >
            <on-error-continue doc:name="On Error Continue">
                <logger level="ERROR" doc:name="Failure log" message="Co-relationId : #[correlationId] error encountered after reading file #[vars.fileName] , caused by : #[error.detailedDescription]" category="load.empData"/>
            </on-error-continue>
        </error-handler>
    </try>
    <logger level="INFO" doc:name="Logger" message="Co-relationId : #[correlationId] processing emp data END:" category="load.empData"/>
</flow>
Attaching a screenshot of the flow for easier visualisation:
Edit 1: pasting the complete code below:
<flow name="get:employee" doc:id="feffbaae-2873-4248-a043-d51697083b75">
    <logger level="INFO" doc:name="Logger" doc:id="10c9e0bb-7f18-42b8-9378-1225cb546641" message="Co-relationId : #[correlationId] processing emp data START:" category="send.empData"/>
    <set-variable value='#[now() as String { format: "ddMMuu" }]' doc:name="Set todays date as ddmmyy" doc:id="37de829b-bc84-44f6-88d9-e66c52fabe83" variableName="today"/>
    <logger level="INFO" doc:name="Logger" doc:id="0f182868-404c-491e-acab-88832b73d73e" category="send.empData" message="Co-relationId : #[correlationId] After successfully moving file "/>
    <until-successful maxRetries="${sftp.retry.attempts}" doc:name="Until Successful" doc:id="cec72a48-72b7-436d-a024-fb3986fb3432" millisBetweenRetries="${sftp.retry.frequency}">
        <file:read doc:name="Read emp file" doc:id="e77633d5-5f4f-43a9-862b-9d6076308c2a"
                   config-ref="File_Config"
                   path="C:\emp_large.unl"
                   outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
            <non-repeatable-stream />
        </file:read>
    </until-successful>
    <logger level="INFO" doc:name="Logger" doc:id="438f9b64-8d66-4999-ac72-cdd7ade3cd0f" category="send.empData" message="Co-relationId : #[correlationId] After successfully Reading file "/>
    <try doc:name="Try" doc:id="c934788a-28db-4e49-a7cd-ee8eaff026ae" >
        <choice doc:name="is the file empty ?" doc:id="69c57e57-354c-4a01-9810-35fc4228d5d9">
            <when expression="attributes.size == 0">
                <logger level="ERROR" doc:name="Payload is empty " doc:id="08decd27-c8dc-41e7-8c1d-f395f405b248" message="Co-relationId : #[correlationId] Empty payload from file!" category="send.empData" />
                <raise-error doc:name="Raise error on empty file " doc:id="ad6e2e62-09f2-4e4a-bc2c-99d7ce2c95c8" type="EMP:FILE_NOT_PRESENT_OR_EMPTY" description="Emp File either empty or not received or error in reading file content" />
            </when>
            <otherwise>
                <logger level="INFO" doc:name="Payload not empty" doc:id="606550aa-c59e-4ff4-ad15-026a8616845a" message="Co-relationId : #[correlationId] File not empty and contains #[sizeOf(payload)] rows" category="send.empData" />
            </otherwise>
        </choice>
        <set-payload value='#[output application/json deferred=true
---
{"clientId": "abcd",
 "employees": payload map (value,index)->{
     "id": value.column_0,
     "name": value.column_1
 }
}]' doc:name="Set Payload" doc:id="5dba41e5-df65-42e3-8899-0c4abf8f8c16" />
        <http:request method="POST" doc:name="Submit products to XYZ" doc:id="575d1c2f-d20f-4174-b7e6-ad0074ea7eb9" config-ref="HTTP_Request_config" path="/emps" requestStreamingMode="AUTO" sendBodyMode="AUTO">
            <non-repeatable-stream />
            <http:headers ><![CDATA[#[output application/java
---
{
    "Host" : p('http_emp.host'),
    "Content-Type" : "application/json"
}]]]></http:headers>
            <http:response-validator >
                <http:success-status-code-validator values="200" />
            </http:response-validator>
        </http:request>
        <logger level="INFO" doc:name="Logger" doc:id="d3988977-bdda-4eac-bf88-508201582a78" category="send.empData" message="Co-relationId : #[correlationId] Successfully published product data to XYZ , response received is : #[payload]"/>
        <error-handler >
            <on-error-continue enableNotifications="false" logException="false" doc:name="On Error Continue - Move emp file from processing to error folder" doc:id="a02560ca-77fb-443f-88b0-fcc27185ea7c" >
                <logger level="ERROR" doc:name="Failure log" doc:id="c4a10961-d853-4fc4-87d7-9b5c55750a7c" message="Co-relationId : #[correlationId] error encountered after reading file , caused by : #[error.detailedDescription]" category="send.empData"/>
                <until-successful maxRetries="${sftp.retry.attempts}" doc:name="Until Successful" doc:id="d8b3e71a-02fc-4c19-be7f-3851288b85a0" millisBetweenRetries="${sftp.retry.frequency}">
                    <sftp:move doc:name="Move file to error" doc:id="22b353e3-ec17-4df5-aa20-4055e0a6cf3b" config-ref="SFTP_Config" sourcePath="#[p('sftp.outputProcessingDir') ++ '/' ++ vars.fileName]" targetPath="${sftp.outputErrorDir}" createParentDirectories="false" overwrite="true"/>
                </until-successful>
                <logger level="INFO" doc:name="Logger" doc:id="dcc9dd6b-0734-486d-b0c1-a9fc3fe64348" category="send.empData" message="Co-relationId : #[correlationId] successfully moved file to error folder"/>
            </on-error-continue>
        </error-handler>
    </try>
    <logger level="INFO" doc:name="Logger" doc:id="20453f8a-45ad-4e23-98d5-a03a66509b5d" message="Co-relationId : #[correlationId] processing emp data END:" category="send.empData"/>
    <error-handler >
        <on-error-continue enableNotifications="false" logException="false" doc:name="On Error Continue" doc:id="3fa169c0-92ad-431b-9440-a21448db4bce" type="RETRY_EXHAUSTED">
            <logger level="ERROR" doc:name="Logger" doc:id="bf68b097-9b02-4770-b0f3-9972bc91a97f" message="Co-relationId : #[correlationId] while processing emp file Error is : #[error.suppressedErrors[0].errorType] caused by #[error.suppressedErrors[0].detailedDescription]" category="send.empData"/>
        </on-error-continue>
    </error-handler>
</flow>
That is another downside of using a non-repeatable stream. It happens because the breakpoint you put will consume the InputStream, and you can only consume an input stream once. Therefore your set-payload will receive an empty payload when it executes. Having said that, I believe the debugger should be smart enough to detect that the payload is a non-repeatable stream and not consume it. Try disabling the breakpoint; if everything else is set correctly, it should send the request to the HTTP endpoint. If you want to see the payload, you can try logging a few rows after the set-payload once, then remove the logger. (I know it does not sound appealing, but currently it is the only option I can think of with the current version of Anypoint Studio.)

EDIT: Another reason you are sending an empty array is that you are consuming the InputStream while logging sizeOf(payload). Therefore your set-payload is getting an empty stream to consume.
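Below is a minimal sketch of the choice router from the edited flow with that fix applied. It assumes everything else stays as posted. Both branches read only the file attributes (attributes.size is the file size in bytes) and never the payload, so the non-repeatable stream reaches set-payload unread. Logging the byte size instead of a row count is a substitution made for this sketch: you cannot know the row count without reading the stream, and reading it consumes it.

```xml
<!-- Sketch: neither branch touches the payload, so the stream stays unread -->
<choice doc:name="is the file empty ?">
    <when expression="attributes.size == 0">
        <logger level="ERROR" doc:name="Payload is empty " message="Co-relationId : #[correlationId] Empty payload from file!" category="send.empData" />
        <raise-error doc:name="Raise error on empty file " type="EMP:FILE_NOT_PRESENT_OR_EMPTY" description="Emp File either empty or not received or error in reading file content" />
    </when>
    <otherwise>
        <!-- sizeOf(payload) removed: it would iterate, and so consume, the CSV stream.
             attributes.size is file metadata and is safe to log. -->
        <logger level="INFO" doc:name="Payload not empty" message="Co-relationId : #[correlationId] File not empty, size in bytes: #[attributes.size]" category="send.empData" />
    </otherwise>
</choice>
```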
Regarding your question about how many times the REST service is called: this basically uses Transfer-Encoding: chunked. The data is sent in a single request. However, the payload is not fully generated before the request starts, so it is sent in chunks until the request is finished.
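The sketch below makes the streaming explicit in the request. It assumes the same HTTP_Request_config, path and headers as in the question. As far as I understand the HTTP connector, requestStreamingMode has three settings:

- ALWAYS always sends the body with Transfer-Encoding: chunked.
- AUTO streams when the payload is a stream of unknown length, as the deferred set-payload output is.
- NEVER (used in the first version of the question) reads the whole payload first so it can send a Content-Length header. For a large file, that means holding all of it in memory.

The non-repeatable-stream child of http:request only configures how the response is streamed, so it is left out of this sketch.

```xml
<!-- Sketch: one POST whose body is streamed in chunks as the deferred JSON is produced -->
<http:request method="POST" doc:name="Submit products to XYZ"
              config-ref="HTTP_Request_config" path="/emps"
              requestStreamingMode="ALWAYS">
    <http:headers ><![CDATA[#[output application/java
---
{
    "Host" : p('http_emp.host'),
    "Content-Type" : "application/json"
}]]]></http:headers>
    <http:response-validator >
        <http:success-status-code-validator values="200" />
    </http:response-validator>
</http:request>
```

With AUTO, the edited flow should already behave the same way for a deferred payload. ALWAYS just removes the dependency on how the connector classifies the payload.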