Reading and parsing a large CSV file using a non-repeatable stream

Posted on 2025-01-30 13:03:24

Thanks to Harshank Bansal, Ryan Hoegg and aled for their suggestions and comments on an earlier question.

I am using Mule 4.4 Community Edition, on premises.

Based on those comments, I am now reading the large CSV file as a non-repeatable stream.
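For readers less familiar with the distinction: a non-repeatable stream can be read exactly once, while a repeatable stream keeps the content around so it can be read again. The plain-Java sketch below (not Mule code; the class and method names are hypothetical) shows the repeatable idea by buffering the bytes in memory and handing each consumer a fresh cursor, loosely comparable to Mule's in-memory repeatable streaming.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of a "repeatable" stream: the source is read once into
// memory and every consumer gets its own cursor starting at the first byte.
class RepeatableSketch {

    private final byte[] buffered;

    RepeatableSketch(InputStream source) {
        // Buffers the entire source: fine for a demo, costly for a very large file.
        this.buffered = readBytes(source);
    }

    static byte[] readBytes(InputStream in) {
        try {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Each call returns a new stream positioned at the start of the buffered content.
    InputStream openCursor() {
        return new ByteArrayInputStream(buffered);
    }

    static String readAll(InputStream in) {
        return new String(readBytes(in), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] data = "101|John Saunders\n102|Jim White\n".getBytes(StandardCharsets.UTF_8);
        RepeatableSketch repeatable = new RepeatableSketch(new ByteArrayInputStream(data));
        // Two independent consumers each see the full content.
        System.out.println(readAll(repeatable.openCursor()).equals(readAll(repeatable.openCursor()))); // true
    }
}
```

Buffering everything is exactly the memory cost a non-repeatable stream avoids for a very large file; the trade-off is that only one component may read it.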

The file is a CSV file, but pipe-separated:

101|John Saunders|19|M|Physics|Chemistry|Mechanics
102|Jim White|17|M|Languages|Art|Pottery
...
...
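With header=false and separator=|, DataWeave names the fields of each row column_0, column_1, and so on, which is what the set-payload mapping in the flow relies on. As a rough plain-Java illustration of that per-row mapping (hypothetical class and method names; CSV quoting and escaping are ignored):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch: label the fields of one pipe-separated line the way a
// header-less CSV reader does (column_0, column_1, ...). No quoting/escaping support.
class PipeRow {

    static Map<String, String> parse(String line) {
        // Pattern.quote escapes '|' (a regex metacharacter); limit -1 keeps trailing empty fields.
        String[] fields = line.split(Pattern.quote("|"), -1);
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < fields.length; i++) {
            row.put("column_" + i, fields[i]);
        }
        return row;
    }

    // Mirrors the set-payload mapping: id from column_0, name from column_1.
    static Map<String, String> toEmployee(Map<String, String> row) {
        Map<String, String> employee = new LinkedHashMap<>();
        employee.put("id", row.get("column_0"));
        employee.put("name", row.get("column_1"));
        return employee;
    }

    public static void main(String[] args) {
        Map<String, String> row = parse("101|John Saunders|19|M|Physics|Chemistry|Mechanics");
        System.out.println(row.size() + " columns");  // 7 columns
        System.out.println(toEmployee(row));          // {id=101, name=John Saunders}
    }
}
```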

Once the file is read, I check whether it is empty using attributes.size == 0.
At this point I use Set Payload with deferred=true.
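Note that attributes.size comes from the file's metadata, so this check does not read the stream, whereas evaluating something like sizeOf(payload) would. A plain-Java analogue, assuming a local file and using Files.size (the class name is hypothetical):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch: an empty-file check that uses only file-system metadata,
// similar in spirit to testing attributes.size instead of the payload itself.
class EmptyFileCheck {

    static boolean isEmpty(Path file) {
        try {
            return Files.size(file) == 0; // reads metadata, never opens the content
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("emp", ".unl");
        Files.writeString(file, "101|John Saunders\n102|Jim White\n");
        try (BufferedReader reader = Files.newBufferedReader(file)) {
            System.out.println("empty? " + isEmpty(file));          // empty? false
            System.out.println("rows:  " + reader.lines().count()); // rows:  2 (reader untouched by the check)
        } finally {
            Files.delete(file);
        }
    }
}
```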

In debug mode, after this component the payload shows up as java.io.PipedInputStream (which is fine; this is simply an observation).
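A PipedInputStream implies that some writer is filling the other end of a pipe while the reader drains it, which fits the idea that deferred=true produces the JSON as it is consumed rather than up front (an inference from the observed type, not a description of Mule's internals). A minimal Java sketch of that producer/consumer pattern, with hypothetical names:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical sketch of lazily produced output: a producer thread writes JSON into a
// PipedOutputStream while the consumer reads it from the connected PipedInputStream.
class DeferredJson {

    static InputStream stream(List<String[]> rows) {
        try {
            PipedInputStream in = new PipedInputStream();
            PipedOutputStream out = new PipedOutputStream(in);
            Thread producer = new Thread(() -> {
                try (out) { // closing the writer signals end-of-stream to the reader
                    out.write('[');
                    for (int i = 0; i < rows.size(); i++) {
                        String[] r = rows.get(i);
                        // No JSON escaping: illustration only.
                        String item = (i > 0 ? "," : "")
                                + "{\"id\":\"" + r[0] + "\",\"name\":\"" + r[1] + "\"}";
                        out.write(item.getBytes(StandardCharsets.UTF_8));
                    }
                    out.write(']');
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            producer.start();
            return in; // content may not have been written yet when this returns
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String readAll(InputStream in) {
        try {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InputStream payload = stream(List.of(
                new String[] {"101", "John Saunders"},
                new String[] {"102", "Jim White"}));
        System.out.println(payload.getClass().getName()); // java.io.PipedInputStream
        System.out.println(readAll(payload));
    }
}
```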

I then pass this payload to the HTTP Request component, which calls a REST endpoint. For now I have mocked this REST endpoint and am logging the payload it receives.

However, the payload received by this REST endpoint is an empty array.

Do I need to set any attribute on the HTTP Request? I can see properties such as Request Streaming Mode; do I need to configure anything there?

Another question: how many times is the REST service expected to be called? Will it be called multiple times, or will there be only a single request?

Here is the complete code of what I have tried:

    <flow name="get:employee">

        <set-variable value='#[now() as String { format: "ddMMuu" }]' doc:name="Set todays date as ddmmyy" doc:id="37de829b-bc84-44f6-88d9-e66c52fabe83" variableName="today"/>

        <file:read doc:name="Read emp file"
            config-ref="File_Config"
            path="/emp_large.unl"
            outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
            <non-repeatable-stream />
        </file:read>

        <try doc:name="Try">
            <choice doc:name="is the file empty ?">
                <when expression="attributes.size == 0">
                    <logger level="ERROR" doc:name="Payload is empty " message="Co-relationId  : #[correlationId]  Empty payload from file: #[vars.fileName] !" category="load.empData" />
                    <raise-error doc:name="Raise error on empty file " type="EMP:FILE_NOT_PRESENT_OR_EMPTY" description="Emp File  either empty or not received or error in reading file content" />
                </when>
                <otherwise>
                </otherwise>
            </choice>
            <set-payload value='#[output application/json deferred=true
    ---
    payload map (value,index)->{
        "id": value.column_0,
        "name": value.column_1
    }]' doc:name="Set Payload" />
            <http:request method="POST" doc:name="Submit products to Rest API" config-ref="HTTP_Request_config" path="/emps" requestStreamingMode="NEVER">
                <non-repeatable-stream />
                <http:headers><![CDATA[#[output application/java
    ---
    {
        "Host" : p('http_emp.host'),
        "Content-Type" : "application/json"
    }]]]></http:headers>
                <http:response-validator>
                    <http:success-status-code-validator values="200" />
                </http:response-validator>
            </http:request>
            <logger level="INFO" doc:name="Logger" category="load.empData" message="Co-relationId  : #[correlationId]  Successfully published emp data to XYZ , response received is : #[payload]"/>
            <error-handler>
                <on-error-continue>
                    <logger level="ERROR" doc:name="Failure log" message="Co-relationId  : #[correlationId] error encountered after reading  file #[vars.fileName]  , caused by : #[error.detailedDescription]" category="load.empData"/>
                </on-error-continue>
            </error-handler>
        </try>
        <logger level="INFO" doc:name="Logger" message="Co-relationId  : #[correlationId]  processing emp data END:" category="load.empData"/>
    </flow>

Attaching a screenshot of the flow for easier visualisation:
[Screenshot of the flow]

Edit 1: Pasting the complete code below:

    <flow name="get:employee" doc:id="feffbaae-2873-4248-a043-d51697083b75">
        <logger level="INFO" doc:name="Logger" doc:id="10c9e0bb-7f18-42b8-9378-1225cb546641" message="Co-relationId  : #[correlationId]  processing emp data START:" category="send.empData"/>

        <set-variable value='#[now() as String { format: "ddMMuu" }]' doc:name="Set todays date as ddmmyy" doc:id="37de829b-bc84-44f6-88d9-e66c52fabe83" variableName="today"/>

        <logger level="INFO" doc:name="Logger" doc:id="0f182868-404c-491e-acab-88832b73d73e" category="send.empData" message="Co-relationId  : #[correlationId]  After successfully moving file "/>
        <until-successful maxRetries="${sftp.retry.attempts}" doc:name="Until Successful" doc:id="cec72a48-72b7-436d-a024-fb3986fb3432" millisBetweenRetries="${sftp.retry.frequency}">
            <file:read doc:name="Read emp file" doc:id="e77633d5-5f4f-43a9-862b-9d6076308c2a"
                config-ref="File_Config"
                path="C:\emp_large.unl"
                outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
                <non-repeatable-stream />
            </file:read>
        </until-successful>
        <logger level="INFO" doc:name="Logger" doc:id="438f9b64-8d66-4999-ac72-cdd7ade3cd0f" category="send.empData" message="Co-relationId  : #[correlationId]  After successfully Reading file "/>
        <try doc:name="Try" doc:id="c934788a-28db-4e49-a7cd-ee8eaff026ae">
            <choice doc:name="is the file empty ?" doc:id="69c57e57-354c-4a01-9810-35fc4228d5d9">
                <when expression="attributes.size == 0">
                    <logger level="ERROR" doc:name="Payload is empty " doc:id="08decd27-c8dc-41e7-8c1d-f395f405b248" message="Co-relationId  : #[correlationId]  Empty payload from file!" category="send.empData" />
                    <raise-error doc:name="Raise error on empty file " doc:id="ad6e2e62-09f2-4e4a-bc2c-99d7ce2c95c8" type="EMP:FILE_NOT_PRESENT_OR_EMPTY" description="Emp File  either empty or not received or error in reading file content" />
                </when>
                <otherwise>
                    <logger level="INFO" doc:name="Payload not empty" doc:id="606550aa-c59e-4ff4-ad15-026a8616845a" message="Co-relationId  : #[correlationId]  File not empty and contains #[sizeOf(payload)] rows" category="send.empData" />
                </otherwise>
            </choice>
            <set-payload value='#[output application/json deferred=true
    ---
    {
        "clientId": "abcd",
        "employees": payload map (value,index)->{
            "id": value.column_0,
            "name": value.column_1
        }
    }]' doc:name="Set Payload" doc:id="5dba41e5-df65-42e3-8899-0c4abf8f8c16" />
            <http:request method="POST" doc:name="Submit products to XYZ" doc:id="575d1c2f-d20f-4174-b7e6-ad0074ea7eb9" config-ref="HTTP_Request_config" path="/emps" requestStreamingMode="AUTO" sendBodyMode="AUTO">
                <non-repeatable-stream />
                <http:headers><![CDATA[#[output application/java
    ---
    {
        "Host" : p('http_emp.host'),
        "Content-Type" : "application/json"
    }]]]></http:headers>
                <http:response-validator>
                    <http:success-status-code-validator values="200" />
                </http:response-validator>
            </http:request>
            <logger level="INFO" doc:name="Logger" doc:id="d3988977-bdda-4eac-bf88-508201582a78" category="send.empData" message="Co-relationId  : #[correlationId]  Successfully published product data to XYZ , response received is : #[payload]"/>
            <error-handler>
                <on-error-continue enableNotifications="false" logException="false" doc:name="On Error Continue - Move emp file from processing to error folder" doc:id="a02560ca-77fb-443f-88b0-fcc27185ea7c">
                    <logger level="ERROR" doc:name="Failure log" doc:id="c4a10961-d853-4fc4-87d7-9b5c55750a7c" message="Co-relationId  : #[correlationId] error encountered after reading  file  , caused by : #[error.detailedDescription]" category="send.empData"/>
                    <until-successful maxRetries="${sftp.retry.attempts}" doc:name="Until Successful" doc:id="d8b3e71a-02fc-4c19-be7f-3851288b85a0" millisBetweenRetries="${sftp.retry.frequency}">
                        <sftp:move doc:name="Move file to error" doc:id="22b353e3-ec17-4df5-aa20-4055e0a6cf3b" config-ref="SFTP_Config" sourcePath="#[p('sftp.outputProcessingDir') ++ '/' ++ vars.fileName]" targetPath="${sftp.outputErrorDir}" createParentDirectories="false" overwrite="true"/>
                    </until-successful>
                    <logger level="INFO" doc:name="Logger" doc:id="dcc9dd6b-0734-486d-b0c1-a9fc3fe64348" category="send.empData" message="Co-relationId  : #[correlationId]  successfully moved  file  to error folder"/>
                </on-error-continue>
            </error-handler>
        </try>
        <logger level="INFO" doc:name="Logger" doc:id="20453f8a-45ad-4e23-98d5-a03a66509b5d" message="Co-relationId  : #[correlationId]  processing emp data END:" category="send.empData"/>
        <error-handler>
            <on-error-continue enableNotifications="false" logException="false" doc:name="On Error Continue" doc:id="3fa169c0-92ad-431b-9440-a21448db4bce" type="RETRY_EXHAUSTED">
                <logger level="ERROR" doc:name="Logger" doc:id="bf68b097-9b02-4770-b0f3-9972bc91a97f" message="Co-relationId  : #[correlationId] while processing emp file Error is : #[error.suppressedErrors[0].errorType] caused by #[error.suppressedErrors[0].detailedDescription]" category="send.empData"/>
            </on-error-continue>
        </error-handler>
    </flow>


Answers (1)

橘和柠 2025-02-06 13:03:24


That is another downside of using a non-repeatable stream. It happens because the breakpoint you put in place consumes the InputStream, and an input stream can only be consumed once. Therefore your set-payload receives an empty payload when it executes. Having said that, I believe the debugger should be smart enough to detect that the payload is a non-repeatable stream and not consume it.
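The mechanism can be reproduced outside Mule. In this plain-Java sketch, a hypothetical inspect helper stands in for anything that reads the payload to display it; afterwards the transform (mapIds, also hypothetical) finds nothing left and returns an empty list, which corresponds to the empty array the endpoint received. This shows the general effect only, not how Anypoint Studio's debugger is implemented.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: a reader that "inspects" a one-shot stream leaves nothing
// for the next reader.
class OneShotStream {

    static BufferedReader open(String content) {
        return new BufferedReader(new StringReader(content));
    }

    // Stand-in for a debugger/inspector: reads every line to display it.
    static String inspect(BufferedReader payload) {
        return payload.lines().collect(Collectors.joining("\n"));
    }

    // Stand-in for the transform: maps each remaining row to its id (column_0).
    static List<String> mapIds(BufferedReader payload) {
        return payload.lines()
                .map(line -> line.split("\\|", -1)[0])
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String file = "101|John Saunders\n102|Jim White\n";

        System.out.println(mapIds(open(file)));  // [101, 102]

        BufferedReader inspected = open(file);
        inspect(inspected);                      // the "breakpoint" drains the stream
        System.out.println(mapIds(inspected));   // []
    }
}
```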

Try disabling the breakpoint; if everything else is set correctly, the request should be sent to the HTTP endpoint. If you want to see the payload, you can try logging a few rows after the set-payload once and then remove the logger. (I know it does not sound appealing, but currently it is the only option I can think of with the current version of Anypoint Studio.)
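Outside Mule, one way to look at the first few rows of a one-shot stream without losing them is to read ahead within a bounded buffer and rewind, for example with BufferedReader.mark and reset. This is a plain-Java sketch of that idea, not a Mule logger feature; the peek helper and the 8 KB limit are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: look at the first rows of a stream without consuming them,
// by marking the reader, reading ahead, and resetting back to the mark.
class PeekRows {

    // Hypothetical read-ahead budget; reset() fails if the peeked rows exceed it.
    static final int READ_AHEAD_LIMIT = 8 * 1024;

    static List<String> peek(BufferedReader reader, int maxRows) {
        try {
            reader.mark(READ_AHEAD_LIMIT);
            List<String> rows = new ArrayList<>();
            String line;
            while (rows.size() < maxRows && (line = reader.readLine()) != null) {
                rows.add(line);
            }
            reader.reset(); // rewind so the next consumer still sees every row
            return rows;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new StringReader("101|John Saunders\n102|Jim White\n"));
        System.out.println("peeked: " + peek(reader, 1));                      // peeked: [101|John Saunders]
        System.out.println("rows still available: " + reader.lines().count()); // rows still available: 2
    }
}
```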

EDIT: Another reason you are sending an empty array is that you are consuming the InputStream while logging sizeOf(payload). Therefore your set-payload gets an empty stream to consume.
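Counting rows has to walk the whole stream, so once sizeOf(payload) has been evaluated there is nothing left for set-payload. In the flow, the simplest fix is to drop sizeOf(payload) from the logger; if a count is still needed, it can be derived in the same pass that produces the output. A plain-Java sketch contrasting the two orders (method names are hypothetical):

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

// Hypothetical sketch: counting a one-shot stream before mapping it versus
// counting while mapping. 'count' is just a mutable holder for the row count.
class CountThenMap {

    static String id(String line) {
        return line.split("\\|", -1)[0];
    }

    // Broken order: count() reads every row, so the mapping step sees an exhausted reader.
    static List<String> countThenMap(BufferedReader payload, AtomicLong count) {
        count.set(payload.lines().count());
        return payload.lines().map(CountThenMap::id).collect(Collectors.toList());
    }

    // Single pass: each row is counted and mapped as it is read.
    static List<String> mapAndCount(BufferedReader payload, AtomicLong count) {
        List<String> ids = new ArrayList<>();
        payload.lines().forEach(line -> {
            count.incrementAndGet();
            ids.add(id(line));
        });
        return ids;
    }

    public static void main(String[] args) {
        String file = "101|John Saunders\n102|Jim White\n";
        AtomicLong count = new AtomicLong();
        System.out.println(countThenMap(new BufferedReader(new StringReader(file)), count) + " rows=" + count); // [] rows=2
        count.set(0);
        System.out.println(mapAndCount(new BufferedReader(new StringReader(file)), count) + " rows=" + count);  // [101, 102] rows=2
    }
}
```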

As for your question:

how many times is the REST service expected to be called ? will it get called multiple times or only a single request ?

This basically uses Transfer-Encoding: chunked. The body is sent in a single request, but since the payload is not fully generated before the request is sent, it is transmitted in chunks until the request is finished.
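Chunked transfer coding frames one request body as a sequence of size-prefixed chunks terminated by a zero-length chunk, so the sender does not need the total length up front. Below is a minimal sketch of that framing (HTTP/1.1 chunked coding without extensions or trailers; the class name and chunk size are hypothetical), plus a comparison using the JDK's java.net.http: a body publisher backed by an InputStream reports an unknown (negative) length, while a byte-array publisher reports a fixed one.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of HTTP/1.1 chunked framing for a single request body:
// "<hex size>\r\n<data>\r\n" per chunk, then the terminating "0\r\n\r\n".
class ChunkedSketch {

    static byte[] encode(byte[] body, int chunkSize) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int start = 0; start < body.length; start += chunkSize) {
            int len = Math.min(chunkSize, body.length - start);
            out.writeBytes((Integer.toHexString(len) + "\r\n").getBytes(StandardCharsets.US_ASCII));
            out.write(body, start, len);
            out.writeBytes("\r\n".getBytes(StandardCharsets.US_ASCII));
        }
        out.writeBytes("0\r\n\r\n".getBytes(StandardCharsets.US_ASCII)); // last-chunk marker
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] json = "[{\"id\":\"101\"},{\"id\":\"102\"}]".getBytes(StandardCharsets.UTF_8);
        // One request body, framed as several chunks.
        System.out.print(new String(encode(json, 16), StandardCharsets.UTF_8));

        // JDK comparison: a stream-backed body has no known length up front.
        HttpRequest.BodyPublisher streamed =
                HttpRequest.BodyPublishers.ofInputStream(() -> new ByteArrayInputStream(json));
        HttpRequest.BodyPublisher buffered = HttpRequest.BodyPublishers.ofByteArray(json);
        System.out.println("streamed length: " + streamed.contentLength()); // negative means unknown
        System.out.println("buffered length: " + buffered.contentLength());
    }
}
```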
