Using non-blocking IO to operate on a collection of files and report aggregate results

Posted on 2024-10-19 11:35:58

I would like to perform some arbitrarily expensive work on an arbitrarily large set of files. I would like to report progress in real time and then display results after all files have been processed. If there are no files that match my expression, I'd like to throw an error.

Imagine writing a test framework that loads up all of your test files, executes them (in no particular order), reports on progress in real-time, and then displays aggregate results after all tests have been completed.

Writing this code in a blocking language (like Ruby, for example) is extremely straightforward.

As it turns out, I'm having trouble performing this seemingly simple task in Node while also truly taking advantage of asynchronous, event-based IO.

My first design was to perform each step serially.

  1. Load up all of the files, creating a collection of files to process
  2. Process each file in the collection
  3. Report the results when all files have been processed

This approach does work, but doesn't seem quite right to me since it causes the more computationally expensive portion of my program to wait for all of the file IO to complete. Isn't this the kind of waiting that Node was designed to avoid?
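For reference, the three serial steps above could be sketched roughly as follows. This is only an illustration under assumptions: `collectFiles`, `runSerially` and the caller-supplied `processFile` are hypothetical names, and the synchronous `fs.readdirSync` call is used purely to make the "wait for all file IO first" behaviour explicit.

```javascript
const fs = require('fs');
const path = require('path');

// Step 1: collect every file under `dir` whose name matches `expression`.
function collectFiles(dir, expression) {
  let found = [];
  for (const entry of fs.readdirSync(dir, { withFileTypes: true })) {
    const full = path.join(dir, entry.name);
    if (entry.isDirectory()) {
      found = found.concat(collectFiles(full, expression));
    } else if (expression.test(entry.name)) {
      found.push(full);
    }
  }
  return found;
}

// Steps 2 and 3: process each file, then hand back all results at once.
// `processFile` is a hypothetical synchronous worker supplied by the caller.
function runSerially(dir, expression, processFile) {
  const files = collectFiles(dir, expression);
  if (files.length === 0) {
    throw new Error('No files matched ' + expression + ' under ' + dir);
  }
  return files.map(function (file) {
    const result = processFile(file);
    console.log('processed ' + file); // progress report per file
    return result;
  });
}
```

Nothing is processed here until the whole tree has been listed, which is exactly the waiting the question objects to; in exchange, the "no files found" check becomes a single length test.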

My second design was to process each file as it was asynchronously found on disk. For the sake of argument, let's imagine a method that looks something like:

function eachFileMatching(path, expression, callback) {
  // recursively, asynchronously traverse the file system,
  // calling callback every time a file name matches expression.
}

And a consumer of this method that looks something like this:

eachFileMatching('test/', /_test\.js$/, function(err, testFile) {
  // read and process the content of testFile
});

While this design feels like a very 'node' way of working with IO, it suffers from 2 major problems (at least in my presumably erroneous implementation):

  1. I have no idea when all of the files have been processed, so I don't know when to assemble and publish results.
  2. Because the file reads are nonblocking and recursive, I'm struggling to work out how to know that no files were found.

I'm hoping that I'm simply doing something wrong, and that there is some reasonably simple strategy that other folks use to make the second approach work.

Even though this example uses a test framework, I have a variety of other projects that bump up against this exact same problem, and I imagine anyone writing a reasonably sophisticated application that accesses the file system in node would too.

Comments (3)

清秋悲枫 2024-10-26 11:35:58

What do you mean by "read and process the content of testFile"?

I don't understand why you have no idea when all of the files are processed. Are you not using Streams? A stream has several events, not just data. If you handle the end event then you will know when each file has finished.

For instance, you might have a list of filenames, set up the processing for each file, and then when you get an end event, delete the filename from the list. When the list is empty you are done. Or create a FileName object that contains the name and a completion status. When you get an end event, change the status and decrement a filename counter as well. When the counter gets to zero you are done, or if you are not confident you could scan all the FileName objects to make sure that their status is completed.
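A minimal sketch of the counter idea, assuming the files are read with `fs.createReadStream`. The names `processAll`, `processChunk` and `onAllDone` are made up for this illustration; a failed stream is counted as finished so the counter still reaches zero.

```javascript
const fs = require('fs');

// Read every file in `fileNames` as a stream and call `onAllDone(errors)`
// once each stream has either ended or failed.
function processAll(fileNames, processChunk, onAllDone) {
  let remaining = fileNames.length;
  const errors = [];
  if (remaining === 0) return onAllDone(errors);

  fileNames.forEach(function (name) {
    let settled = false; // make sure each file is counted only once

    function settle(err) {
      if (settled) return;
      settled = true;
      if (err) errors.push(err);
      remaining -= 1;
      if (remaining === 0) onAllDone(errors);
    }

    const stream = fs.createReadStream(name);
    stream.on('data', function (chunk) { processChunk(name, chunk); });
    stream.on('end', function () { settle(null); });
    stream.on('error', settle);
  });
}
```

Here the list of names has to be known before calling `processAll`; if files are still being discovered, the counter has to be incremented at discovery time instead.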

You might also have a timer that checks the counter periodically, and if it doesn't change for some period of time, report that the processing might be stuck on the FileName objects whose status is not completed.
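The timer idea could look something like the sketch below. Everything here is an assumption for illustration: `createTracker`, `stallMs` and `onStall` are hypothetical names, and "stuck" simply means the number of pending files did not change between two consecutive checks.

```javascript
// Track pending file names and report the ones still outstanding whenever
// no file has completed since the previous check.
function createTracker(fileNames, stallMs, onStall) {
  const pending = new Set(fileNames);
  let lastCount = pending.size;

  const timer = setInterval(function () {
    if (pending.size === lastCount) {
      onStall(Array.from(pending)); // nothing finished since the last check
    }
    lastCount = pending.size;
  }, stallMs);

  // Nothing to wait for: stop the timer right away.
  if (pending.size === 0) clearInterval(timer);

  return {
    // Call this from each file's `end` handler.
    complete: function (name) {
      pending.delete(name);
      if (pending.size === 0) clearInterval(timer);
    },
    remaining: function () { return pending.size; }
  };
}
```

The interval is cleared as soon as the last file completes, so the tracker does not keep the process alive after the work is done.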

... I just came across this scenario in another question, and the accepted answer (plus the github link) explains it well. Check out "for loop over event driven code?"

无风消散 2024-10-26 11:35:58

As it turns out, the smallest working solution that I've been able to build is much more complicated than I hoped.

Following is code that works for me. It can probably be cleaned up or made slightly more readable here and there, but I'm not interested in feedback like that.

If there is a significantly different way to solve this problem, that is simpler and/or more efficient, I'm very interested in hearing it. It really surprises me that the solution to this seemingly simple requirement would require such a large amount of code, but perhaps that's why someone invented blocking io?

The complexity is really in the desire to meet all of the following requirements:

  • Handle files as they are found
  • Know when the search is complete
  • Know if no files are found

Here's the code:

/**
 * Call fileHandler with the file name and file Stat for each file found inside
 * of the provided directory.
 *
 * Call the optionally provided completeHandler with an error argument (always
 * null here), an array of files (mingled with directories) and an array of
 * Stat objects (one for each of the found files).
 *
 * Following is an example of a simple usage:
 *
 *   eachFileOrDirectory('test/', function(err, file, stat) {
 *     if (err) throw err;
 *     if (!stat.isDirectory()) {
 *       console.log(">> Found file: " + file);
 *     }
 *   });
 *
 * Following is an example that waits for all files and directories to be 
 * scanned and then uses the entire result to do something:
 *
 *   eachFileOrDirectory('test/', null, function(err, files, stats) {
 *     if (err) throw err;
 *     var len = files.length;
 *     for (var i = 0; i < len; i++) {
 *       if (!stats[i].isDirectory()) {
 *         console.log(">> Found file: " + files[i]);
 *       }
 *     }
 *   });
 */
var fs = require('fs');

var eachFileOrDirectory = function(directory, fileHandler, completeHandler) {
  var filesToCheck = 0;
  var checkedFiles = [];
  var checkedStats = [];

  directory = (directory) ? directory : './';

  var fullFilePath = function(dir, file) {
    return dir.replace(/\/$/, '') + '/' + file;
  };

  var checkComplete = function() {
    if (filesToCheck == 0 && completeHandler) {
      completeHandler(null, checkedFiles, checkedStats);
    }
  };

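  var onFileOrDirectory = function(fileOrDirectory) {
    filesToCheck++;
    fs.stat(fileOrDirectory, function(err, stat) {
      filesToCheck--;
      if (err) {
        // Report the error, but still run the completion check so that
        // completeHandler is not skipped when the last pending call fails.
        if (fileHandler) fileHandler(err);
        return checkComplete();
      }
      checkedFiles.push(fileOrDirectory);
      checkedStats.push(stat);
      // fileHandler may be null when only completeHandler is provided.
      if (fileHandler) fileHandler(null, fileOrDirectory, stat);
      if (stat.isDirectory()) {
        onDirectory(fileOrDirectory);
      }
      checkComplete();
    });
  };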

  var onDirectory = function(dir) {
    filesToCheck++;
    fs.readdir(dir, function(err, files) {
      filesToCheck--;
      if (err) {
        // Report the error, but still run the completion check.
        if (fileHandler) fileHandler(err);
        return checkComplete();
      }
      files.forEach(function(file) {
        onFileOrDirectory(fullFilePath(dir, file));
      });
      checkComplete();
    });
  };

  onFileOrDirectory(directory);
};
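For comparison, the same three requirements can be sketched with promises instead of a hand-maintained counter. This is not the code from this answer, just an alternative under assumptions: it relies on a Node version that provides `fs.promises` and the `withFileTypes` option of `readdir`, and `eachFile` and `onFile` are hypothetical names.

```javascript
const fs = require('fs');
const path = require('path');

// Walk `dir` recursively. `onFile` is called (and awaited) as soon as each
// non-directory entry is found; the returned promise resolves with the number
// of such entries once the whole tree has been visited.
async function eachFile(dir, onFile) {
  const entries = await fs.promises.readdir(dir, { withFileTypes: true });
  const counts = await Promise.all(entries.map(async function (entry) {
    const full = path.join(dir, entry.name);
    if (entry.isDirectory()) return eachFile(full, onFile);
    await onFile(full);
    return 1;
  }));
  return counts.reduce(function (sum, n) { return sum + n; }, 0);
}
```

Files are handled as they are found, the resolved promise marks the end of the search, and a resolved count of 0 means nothing was found, so the caller can throw at that point.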
何其悲哀 2024-10-26 11:35:58

There are two ways of doing this. The first, which you would probably call serial, goes something like:

var files = [];
doFile(files, oncomplete);

function doFile(files, oncomplete) {
  if (files.length === 0) return oncomplete();
  var f = files.pop();
  processFile(f, function(err) {
    // Handle error if any
    doFile(files, oncomplete); // Recurse
  });
};

function processFile(file, callback) {
  // Do whatever you want to do and once 
  // done call the callback
  ...
  callback();
};

The second way, let's call it parallel, is similar and goes something like:

var files = [];
doFiles(files, oncomplete);

function doFiles(files, oncomplete) {
  var exp = files.length;
  var done = 0;
  // With no files the loop below never runs, so complete immediately.
  if (exp === 0) return oncomplete();
  for (var i = 0; i < exp; i++) {
    processFile(files[i], function(err) {
      // Handle errors (but still need to increment counter)
      if (++done === exp) return oncomplete();
    });
  }
};

function processFile(file, callback) {
  // Do whatever you want to do and once 
  // done call the callback
  ...
  callback();
};

Now it may seem obvious that you should use the second approach, but you'll find that for IO-intensive operations you don't really get any performance gains from parallelising. One disadvantage of the first approach is that the recursion can blow out your stack if processFile calls its callback synchronously.
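The stack concern only arises when the callback is invoked synchronously; a callback fired from real async IO starts on a fresh stack. A minimal sketch of one way to sidestep it is to defer each step with `setImmediate`. The function name `doFilesSerially` and passing `processFile` as a parameter are choices made for this illustration only.

```javascript
// Serial processing that defers each step with setImmediate, so the stack
// does not grow even when `processFile` calls back synchronously.
function doFilesSerially(files, processFile, oncomplete) {
  const queue = files.slice(); // leave the caller's array untouched

  function next() {
    if (queue.length === 0) return oncomplete();
    const f = queue.pop();
    processFile(f, function (err) {
      // Handle error if any, then schedule the next file.
      setImmediate(next);
    });
  }

  next();
}
```

The cost is one trip through the event loop per file, which is usually negligible next to the file IO itself.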

Tnx

Guido
