要求在 Ruffus 管道中运行函数之前创建一组文件
我正在使用 ruffus 编写管道。我有一个被并行调用多次的函数,它创建了多个文件。我想创建一个函数“combineFiles()”,在创建所有这些文件后调用该函数。由于它们在集群上并行运行,因此它们不会一起完成。我编写了一个函数“getFilenames()”,它返回需要创建的文件名集,但是如何使combineFiles()等待它们在那里呢?
我尝试了以下操作:
@pipelineFunction
@files(getFilenames)
def combineFiles(filenames):
# I should only be called if every file in the list 'filenames' exists
我也尝试了装饰器:
@merge(getFilenames)
但这也不起作用。在创建 getFilenames 给出的文件之前,combineFiles 仍然会被错误地调用。我怎样才能使combineFiles以这些文件存在为条件?
谢谢。
I'm using ruffus to write a pipeline. I have a function that gets called in parallel many times and it creates several files. I'd like to make a function "combineFiles()" that gets called after all those files have been made. Since they run in parallel on a cluster, they will not all finish together. I wrote a function 'getFilenames()' that returns the set of filenames that need to be created, but how can I make combineFiles() wait for them to be there?
I tried the following:
@pipelineFunction
@files(getFilenames)
def combineFiles(filenames):
# I should only be called if every file in the list 'filenames' exists
I've also tried the decorator:
@merge(getFilenames)
but this does not work either. combineFiles still gets errorneously called before the files given by getFilenames are made. How can I make combineFiles conditional on those files being there?
thanks.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
我是 Ruffus 的开发者。我不确定我完全理解你想要做什么,但这里是:
等待需要不同时间完成的作业才能运行管道的下一阶段正是 Ruffus 的目的,所以这希望很简单。
第一个问题是您是否知道预先创建哪些文件,即在管道运行之前?让我们首先假设您这样做。
让我们编写一个虚拟函数,每次调用它时都会创建一个文件。在 Ruffus 中,任何输入和输出文件名分别包含在前两个参数中。我们没有输入文件名,因此我们的函数调用应如下所示:
create_file 的定义如下所示:
这些文件中的每一个都将在对 create_file 的 3 次单独调用中创建。如果您愿意,这些可以并行运行。
现在合并文件。 “@Merge”装饰器确实就是为此而设置的。我们只需将它链接到前面的函数:
只有当所有文件从对 create_file() 的三个调用中准备就绪时,才会调用 merge_file。
整个代码如下:
这是结果:
I am the developer of Ruffus. I am not sure I entirely understand what you are trying to do but here goes:
Waiting for jobs which take a different amount of time to finish in order to run the next stage of your pipeline is exactly what Ruffus is about so this hopefully is straightforward.
The first question is do you know which files are being created up front, i.e. before the pipeline is run? Lets start by assuming you do.
Let us write a dummy function which creates a file each time it is called. In Ruffus, any input and output file names are contained in the first two parameters respectively. We have no input file name, so our function calls should look like this:
The definition of create_file would look like this:
Each of these files would be created in 3 separate calls to create_file. These can be run in parallel if you wish.
Now to combine the files. The "@Merge" decorator is indeed set up precisely for this. We just need to link it up to the previous function :
This will only call merge_file when all the files are ready from the three calls to create_file().
The entire code is as follows:
And this is the result: