ConnectionIO 事务中的并行操作

发布于 2025-01-20 18:50:38 字数 593 浏览 4 评论 0原文

因此,我有一个程序,其中我从数据库中获取文件路径列表,在文件系统上删除这些文件,最后从数据库中删除文件路径。我将所有操作都放入交易中,以确保将路径从数据库中删除,如果FF在文件系统中删除了所有文件。

不幸的是

val result = for {
deletePath <- (fr""" select path from files""").query[String].stream //Stream[doobie.ConnectionIO,String]
_ <- Stream.eval(AsyncConnectionIO.liftIO(File(deletePath).delete()) //Stream[doobie.ConnectionIO,Unit]
_ <- Stream.eval(sql"delete from files where path = ${deletePath}".withUniqueGeneratedKeys) 
}

result.compile.drain.transact(transactor)

,该文件系统分布得出,这意味着单个操作很慢,但它允许一次多个操作。

所以我的问题是,如何在此处平行化文件系统删除操作?

So I have a program in which I get a list of file paths from a database, delete those files on the filesystem and finally delete the file paths from the database. I put all operations inside a transaction to ensure that the paths would be deleted from the database iff all of the files are deleted in the filesystem.

Something like this

val result = for {
deletePath <- (fr""" select path from files""").query[String].stream //Stream[doobie.ConnectionIO,String]
_ <- Stream.eval(AsyncConnectionIO.liftIO(File(deletePath).delete()) //Stream[doobie.ConnectionIO,Unit]
_ <- Stream.eval(sql"delete from files where path = ${deletePath}".withUniqueGeneratedKeys) 
}

result.compile.drain.transact(transactor)

Unfortunately, the file system is distributed which means individual operation is slow but it allows multiple operations at once.

So my question is, how do I parallelize the filesystem deletion operation here?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

醉城メ夜风 2025-01-27 18:50:38

是的,你可以。只需使用适当的组合符而不是 for 语法即可。

val result =
  (fr""" select path from files""")
    .query[String]
    .stream
    .parEvalMapUnordered(maxConcurrent = 64) { deletePath =>
      AsyncConnectionIO.liftIO(File(deletePath).delete()) >>
      sql"delete from files where path = ${deletePath}".withUniqueGeneratedKeys
    }

result.compile.drain.transact(transactor)

请记住将 maxConcurrent 参数更改为对您的用例有意义的参数。


(我无法测试代码,因此可能有一些拼写错误)

Yeah, you can. Just use appropriate combinators instead of the for syntax.

val result =
  (fr""" select path from files""")
    .query[String]
    .stream
    .parEvalMapUnordered(maxConcurrent = 64) { deletePath =>
      AsyncConnectionIO.liftIO(File(deletePath).delete()) >>
      sql"delete from files where path = ${deletePath}".withUniqueGeneratedKeys
    }

result.compile.drain.transact(transactor)

Remember to change the maxConcurrent parameter to something that makes sense for your use case.


(I couldn't test the code so it may have some typos)

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文