The question is a simple one and I am surprised it did not pop up immediately when I searched for it.
I have a CSV file, a potentially really large one, that needs to be processed. Each line should be handed off to a processor until all rows are processed. For reading the CSV file I'll be using OpenCSV, which essentially provides a readNext() method that gives me the next row. Once no more rows are available, all processors should terminate.
For this I created a really simple Groovy script, defined a synchronized readNext() method (reading the next line is not really time-consuming, so the lock is cheap) and then created a few threads that each read the next line and process it. It works fine, but...
Shouldn't there be a built-in solution that I could just use? It's not GPars' collection processing, because that always assumes an existing collection in memory. I cannot afford to read the whole file into memory and then process it; that would lead to out-of-memory errors.
So... does anyone have a nice template for processing a CSV file "line by line" using a couple of worker threads?
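
For reference, here is roughly what my current script looks like. It's a minimal sketch: the file name is made up and processRow() stands in for the real per-row work.

    import au.com.bytecode.opencsv.CSVReader

    def csv = new CSVReader(new FileReader('data.csv'))

    // only one thread may advance the reader at a time; reading a single
    // line is cheap, so the lock is not a bottleneck
    def readNextSync = { synchronized (csv) { csv.readNext() } }

    // stand-in for the real per-row work
    def processRow = { String[] row -> println row.join('|') }

    def threads = (1..4).collect {
        Thread.start {
            String[] row
            while ((row = readNextSync()) != null) {
                processRow(row)
            }
        }
    }
    threads*.join()
    csv.close()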
Concurrently accessing a file might not be a good idea, and GPars' fork/join processing is only meant for in-memory data (collections). My suggestion would be to read the file sequentially into a list. When the list reaches a certain size, process the entries in the list concurrently using GPars, clear the list, and then move on with reading lines.
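
A minimal sketch of that approach, assuming OpenCSV for the reading; the batch size and processRow() are placeholders:

    import au.com.bytecode.opencsv.CSVReader
    import groovyx.gpars.GParsPool

    // stand-in for the real per-row work
    def processRow = { String[] row -> println row.join('|') }

    def csv = new CSVReader(new FileReader('data.csv'))
    def batch = []
    String[] row
    while ((row = csv.readNext()) != null) {
        batch << row
        if (batch.size() == 1000) {    // batch size chosen arbitrarily
            GParsPool.withPool(4) { batch.eachParallel(processRow) }
            batch.clear()
        }
    }
    if (batch) {                       // process the left-over rows
        GParsPool.withPool(4) { batch.eachParallel(processRow) }
    }
    csv.close()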
This might be a good problem for actors. A synchronous reader actor could hand off CSV lines to parallel processor actors. For example:
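A minimal sketch of that idea, assuming GPars' DefaultActor; processRow() stands in for the real work and the string 'EOF' serves as a poison pill:

    import au.com.bytecode.opencsv.CSVReader
    import groovyx.gpars.actor.DefaultActor

    // worker: processes one row at a time until it receives the 'EOF' poison pill
    class RowProcessor extends DefaultActor {
        void act() {
            loop {
                react { msg ->
                    if (msg == 'EOF') terminate()
                    else processRow(msg)
                }
            }
        }
        // stand-in for the real per-row work
        void processRow(row) { println row.join('|') }
    }

    // reader: pulls rows sequentially and deals them out round-robin
    class RowReader extends DefaultActor {
        CSVReader csv
        List<RowProcessor> workers
        private int next = 0

        void act() {
            String[] row
            while ((row = csv.readNext()) != null) {
                workers[next++ % workers.size()] << row
            }
            workers.each { it << 'EOF' }   // tell every worker to shut down
            terminate()
        }
    }

    def workers = (1..4).collect { new RowProcessor().start() }
    def reader = new RowReader(csv: new CSVReader(new FileReader('data.csv')),
                               workers: workers).start()
    ([reader] + workers)*.join()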
I'm just wrapping up an implementation of exactly this kind of problem in Grails (you don't specify whether you're using Grails, plain Hibernate, plain JDBC or something else).
There isn't anything out of the box that I'm aware of. You could look at integrating with Spring Batch, but the last time I looked at it, it felt very heavy to me (and not very groovy).
If you're using plain JDBC, doing what Christoph recommends probably is the easiest thing to do (read in N rows and use GPars to spin through those rows concurrently).
If you're using Grails or Hibernate and want your worker threads to have access to the Spring context for dependency injection, things get a bit more complicated.
The way I solved it is using the Grails Redis plugin (disclaimer: I'm the author) and the Jesque plugin (http://grails.org/plugin/jesque), a Java implementation of Resque.
The Jesque plugin lets you create "Job" classes that have a "process" method with arbitrary parameters that are used to process work enqueued on a Jesque queue. You can spin up as many workers as you want.
I have a file upload that an admin user can post a file to; it saves the file to disk and enqueues a job for the ProducerJob that I've created. That ProducerJob spins through the file and, for each line, enqueues a message for a ConsumerJob to pick up. The message is simply a map of the values read from the CSV file.
The ConsumerJob takes those values, creates the appropriate domain object for its line, and saves it to the database.
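
A rough sketch of what those two jobs could look like. The jesqueService and its enqueue signature, the queue name, and the ImportedRecord domain class are all assumptions here; check the plugin docs for the exact API:

    import au.com.bytecode.opencsv.CSVReader

    class ProducerJob {
        def jesqueService   // assumed to be injectable from the Jesque plugin

        void process(String filePath) {
            def csv = new CSVReader(new FileReader(filePath))
            try {
                String[] header = csv.readNext()
                String[] row
                while ((row = csv.readNext()) != null) {
                    // one message per line: a map of column name -> value
                    def values = [header, row].transpose().collectEntries { it }
                    jesqueService.enqueue('csvImport', 'ConsumerJob', values)
                }
            } finally {
                csv.close()
            }
        }
    }

    class ConsumerJob {
        void process(Map values) {
            // build and save the domain object for this row
            // (ImportedRecord is a made-up domain class)
            new ImportedRecord(values).save(failOnError: true)
        }
    }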
We were already using Redis in production, so using it as a queueing mechanism made sense. We had an old synchronous load that ran through file loads serially. I'm currently using one producer worker and four consumer workers, and loading things this way is over 100x faster than the old load was (with much better progress feedback to the end user).
I agree with the original question that there is probably room for something like this to be packaged up, as this is a relatively common task.
UPDATE: I put up a blog post with a simple example doing imports with Redis + Jesque.