要求在 Ruffus 管道中运行函数之前创建一组文件

发布于 2024-08-25 08:44:40 字数 536 浏览 5 评论 0原文

我正在使用 ruffus 编写管道。我有一个被并行调用多次的函数,它创建了多个文件。我想创建一个函数“combineFiles()”,在创建所有这些文件后调用该函数。由于它们在集群上并行运行,因此它们不会一起完成。我编写了一个函数“getFilenames()”,它返回需要创建的文件名集,但是如何使combineFiles()等待它们在那里呢?

我尝试了以下操作:

@pipelineFunction
@files(getFilenames)
def combineFiles(filenames):
  # I should only be called if every file in the list 'filenames' exists

我也尝试了装饰器:

@merge(getFilenames)

但这也不起作用。在创建 getFilenames 给出的文件之前,combineFiles 仍然会被错误地调用。我怎样才能使combineFiles以这些文件存在为条件?

谢谢。

I'm using ruffus to write a pipeline. I have a function that gets called in parallel many times and it creates several files. I'd like to make a function "combineFiles()" that gets called after all those files have been made. Since they run in parallel on a cluster, they will not all finish together. I wrote a function 'getFilenames()' that returns the set of filenames that need to be created, but how can I make combineFiles() wait for them to be there?

I tried the following:

@pipelineFunction
@files(getFilenames)
def combineFiles(filenames):
  # I should only be called if every file in the list 'filenames' exists

I've also tried the decorator:

@merge(getFilenames)

but this does not work either. combineFiles still gets errorneously called before the files given by getFilenames are made. How can I make combineFiles conditional on those files being there?

thanks.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

冷情妓 2024-09-01 08:44:45

我是 Ruffus 的开发者。我不确定我完全理解你想要做什么,但这里是:

等待需要不同时间完成的作业才能运行管道的下一阶段正是 Ruffus 的目的,所以这希望很简单。

第一个问题是您是否知道预先创建哪些文件,即在管道运行之前?让我们首先假设您这样做。

from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

让我们编写一个虚拟函数,每次调用它时都会创建一个文件。在 Ruffus 中,任何输入和输出文件名分别包含在前两个参数中。我们没有输入文件名,因此我们的函数调用应如下所示:

create_file(None, "one.file")
create_file(None, "two.file")
create_file(None, "three.file")

create_file 的定义如下所示:

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    open(output_file_name, "w").write("dummy file")

这些文件中的每一个都将在对 create_file 的 3 次单独调用中创建。如果您愿意,这些可以并行运行。

pipeline_run([create_file], multiprocess = 5)

现在合并文件。 “@Merge”装饰器确实就是为此而设置的。我们只需将它链接到前面的函数:

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    output_file = open(output_file_name, "w")
    for i in input_file_names:
        output_file.write(open(i).read())

只有当所有文件从对 create_file() 的三个调用中准备就绪时,才会调用 merge_file。

整个代码如下:

from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

from random import randint
from time import sleep

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    # simulate create file process of indeterminate complexity
    sleep(randint(1,5))
    open(output_file_name, "w").write("dummy file")

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    output_file = open(output_file_name, "w")
    for i in input_file_names:
        output_file.write(open(i).read())


pipeline_run([merge_file], multiprocess = 5)

这是结果:

>>> pipeline_run([merge_file], multiprocess = 5)

    Job = [None -> two.file] completed
    Job = [None -> three.file] completed
    Job = [None -> one.file] completed
Completed Task = create_file
    Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file

I am the developer of Ruffus. I am not sure I entirely understand what you are trying to do but here goes:

Waiting for jobs which take a different amount of time to finish in order to run the next stage of your pipeline is exactly what Ruffus is about so this hopefully is straightforward.

The first question is do you know which files are being created up front, i.e. before the pipeline is run? Lets start by assuming you do.

from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

Let us write a dummy function which creates a file each time it is called. In Ruffus, any input and output file names are contained in the first two parameters respectively. We have no input file name, so our function calls should look like this:

create_file(None, "one.file")
create_file(None, "two.file")
create_file(None, "three.file")

The definition of create_file would look like this:

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    open(output_file_name, "w").write("dummy file")

Each of these files would be created in 3 separate calls to create_file. These can be run in parallel if you wish.

pipeline_run([create_file], multiprocess = 5)

Now to combine the files. The "@Merge" decorator is indeed set up precisely for this. We just need to link it up to the previous function :

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    output_file = open(output_file_name, "w")
    for i in input_file_names:
        output_file.write(open(i).read())

This will only call merge_file when all the files are ready from the three calls to create_file().

The entire code is as follows:

from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

from random import randint
from time import sleep

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    # simulate create file process of indeterminate complexity
    sleep(randint(1,5))
    open(output_file_name, "w").write("dummy file")

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    output_file = open(output_file_name, "w")
    for i in input_file_names:
        output_file.write(open(i).read())


pipeline_run([merge_file], multiprocess = 5)

And this is the result:

>>> pipeline_run([merge_file], multiprocess = 5)

    Job = [None -> two.file] completed
    Job = [None -> three.file] completed
    Job = [None -> one.file] completed
Completed Task = create_file
    Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文