Clojure/Java: Most efficient method for minimizing bandwidth consumption when performing complex operations on a stream of Amazon S3 data

I'm performing streaming reads of an object using BufferedReader.

I need to do two things with this object:

  1. Pass it to a SuperCSV CSV reader
  2. Obtain the raw lines and keep them in a (Clojure) lazy sequence

Currently, I have to use two different BufferedReaders: one as an argument to the SuperCSV CSV reader class and one to initialize the lazy sequence of raw lines. I'm effectively downloading the S3 object twice, which is expensive ($) and slow.
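For concreteness, this is roughly what the double download looks like (a minimal sketch, assuming the AWS SDK for Java v1 and hypothetical bucket/key names):

(import '[com.amazonaws.services.s3 AmazonS3ClientBuilder]
        '[java.io BufferedReader InputStreamReader])

(def s3 (AmazonS3ClientBuilder/defaultClient))

(defn s3-reader
  "Opens a fresh BufferedReader over the S3 object; every call starts
   a brand-new download."
  [bucket obj-key]
  (-> (.getObject s3 bucket obj-key)
      .getObjectContent
      InputStreamReader.
      BufferedReader.))

;; Two independent readers means two full downloads of the same object:
(def csv-rdr   (s3-reader "my-bucket" "data.csv"))   ; handed to SuperCSV
(def lines-rdr (s3-reader "my-bucket" "data.csv"))   ; source of the raw-line seq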

One of my colleagues pointed out that something analogous to a Unix "tee" command is what I'm looking for. A BufferedReader that could somehow be "split", download a chunk of data, and pass a copy to both the lazy sequence and csv reader functionality would be useful.
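For what it's worth, a tee-style Reader can be sketched with proxy: it delegates reads to the source and copies every chunk it reads into a Writer. This is an illustration under those assumptions, not a tested implementation:

(import '[java.io Reader Writer])

(defn tee-reader
  "Returns a Reader that reads from source and copies everything it
   reads into sink, analogous to the Unix tee command."
  [^Reader source ^Writer sink]
  (proxy [Reader] []
    (read [cbuf off len]
      (let [n (.read source cbuf off len)]
        (when (pos? n)
          (.write sink cbuf off n))   ; echo the chunk just read
        n))
    (close []
      (.close source)
      (.close sink))))

With a java.io.PipedWriter as the sink and a matching PipedReader feeding the lazy line seq, one download could serve both consumers; note that piped writes block once the pipe's buffer fills, so the two consumers would have to run on separate threads and keep pace with each other.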

I'm also currently investigating whether it would be possible to wrap the lazy sequence in a BufferedReader and pass that to super csv. I've had some Java heap space issues when passing very large lazy sequences to multiple consumers, so I'm kind of worried about employing this solution.

Another solution is just downloading the file locally and then opening two streams on this file. This eliminates the original motivation behind streaming: allowing processing of the file to begin as soon as data starts arriving.
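For reference, that fallback is only a few lines (a sketch; open-s3-stream is a hypothetical helper returning the object's InputStream):

(require '[clojure.java.io :as io])

;; Download once to a temp file, then open two independent readers over it.
(let [tmp (java.io.File/createTempFile "s3-object" ".csv")]
  (.deleteOnExit tmp)
  (with-open [in (open-s3-stream "my-bucket" "data.csv")]  ; hypothetical helper
    (io/copy in tmp))
  (with-open [csv-rdr  (io/reader tmp)
              line-rdr (io/reader tmp)]
    ;; csv-rdr goes to the SuperCSV reader, line-rdr to line-seq;
    ;; both read the local copy, not the network.
    (doall (line-seq line-rdr))))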

The final solution, and one that I'd consider only if nothing else works, is implementing my own CSV reader that returns both parsed CSV and the original unparsed line. If you've used a very solid CSV reader that can return both a Java Hash of parsed CSV data and the original unparsed line, please let me know!

Thanks!

Comments (2)

自由如风 2024-09-23 08:15:05

I'd be inclined to go with creating a seq of lines from the network, and then hand that over to however many processes need to work on that seq; persistent data structures are cool that way. In the case of needing to turn a seq of strings into a Reader that you can hand off to the SuperCSV api, this seems to work:

(import '[java.io Reader StringReader])

(defn concat-reader
  "Returns a Reader that reads from a sequence of strings."
  [lines]
  ;; srs holds the remaining StringReaders, one per string, created lazily.
  (let [srs (atom (map #(StringReader. %) lines))]
    (proxy [Reader] []
      (read
        ([]
          (if-let [sr (first @srs)]
            (let [c (.read sr)]
              ;; -1 means the current reader is drained; advance to the
              ;; next one (swap! returns nil once the seq is exhausted).
              (if (and (neg? c) (swap! srs next))
                (.read this)
                c))
            -1))                          ; no readers left: end of stream
        ([cbuf]
          (.read this cbuf 0 (count cbuf)))
        ([cbuf off len]
          (if-let [sr (first @srs)]
            (let [actual (.read sr cbuf off len)]
              (if (and (neg? actual) (swap! srs next))
                (.read this cbuf off len)
                actual))
            -1)))
      (close []))))

E.g.

user=> (def r (concat-reader ["foo" "bar"]))
#'user/r
user=> (def cbuf (char-array 2))
#'user/cbuf
user=> (.read r cbuf)
2
user=> (seq cbuf)
(\f \o)
user=> (char (.read r))
\o
user=> (char (.read r))
\b
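
To connect this back to the question, a hypothetical wiring (assuming Super CSV 2.x, and that s3-buffered-reader is the single reader over the S3 stream): line-seq strips newline characters, so they have to be re-added before the characters reach the CSV parser.

(import '[org.supercsv.io CsvListReader]
        '[org.supercsv.prefs CsvPreference])

;; raw-lines is the one shared lazy seq of lines, read once from S3.
(def raw-lines (line-seq s3-buffered-reader))   ; s3-buffered-reader: assumed

;; Feed the same lines, with newlines restored, to SuperCSV.
(def csv-reader
  (CsvListReader. (concat-reader (map #(str % "\n") raw-lines))
                  CsvPreference/STANDARD_PREFERENCE))

(.read csv-reader)   ; => first row as a java.util.List of Strings, or nil

One caveat from the question still applies: anything that holds the head of raw-lines while another consumer walks it keeps every realized line reachable, which is exactly the heap pressure the asker saw.
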
自演自醉 2024-09-23 08:15:05

The solution was to use a single BufferedReader for all accesses, calling reset() on it every time it is passed to functionality that needs to read from the beginning.
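
Presumably the pattern looks something like this (a sketch; the StringReader stands in for the S3 stream, and the mark limit of 1,000,000 is an arbitrary assumption):

(import '[java.io BufferedReader StringReader])

(def rdr (BufferedReader. (StringReader. "a,b\n1,2\n")))  ; stand-in for the S3 stream

;; mark's readAheadLimit must cover everything read before reset, so the
;; BufferedReader buffers that much in memory -- for a large S3 object
;; this trades the second download for heap.
(.mark rdr 1000000)                       ; remember the start of the stream
(def raw-lines (doall (line-seq rdr)))    ; first pass: keep the raw lines
(.reset rdr)                              ; rewind to the mark
;; second pass: hand rdr to the SuperCSV reader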
