Processing incoming xml files in parallel

Posted 2025-01-03 09:46:20


I need to process incoming xml files (they will be created by another application directly in a specific folder) and I need to do it fast.

There can be up to 200 000 files per day, and my current assumption is to use .NET 4 and the TPL.

My current service concept is:

In a loop I want to check the folder for new files; if I find any, I will put them into a queue, which will be processed by another loop that takes files from the queue and creates a new task (thread) for each of them. The number of simultaneous tasks should be configurable.
The first part is easy, but creating two main loops with a queue between them is something new for me.

And the question:
How do I create two loops (one for checking the folder and adding files, a second for taking files from the queue and processing them in parallel), and add a queue to communicate between them?

For the first part (folder checking) the suggested solution is to use FileSystemWatcher. Now the second part needs to be discussed (maybe some kind of task scheduler).


Comments (5)

罗罗贝儿 2025-01-10 09:46:20


Sounds like the missing piece in your puzzle is a BlockingCollection:

FileSystemWatcher watcher;
BlockingCollection<string> bc;
private readonly object _lock = new object();
Task[] tasks;

void PrepareWatcher()
{
    watcher = new FileSystemWatcher(@"c:\");
    watcher.Created += (s, e) =>
    {
        lock (_lock) // Prevents a race condition when stopping
        {
            if (!bc.IsAddingCompleted)
                bc.Add(e.FullPath);
        }
    };
}

void StartProcessing(int taskCount)
{
    tasks = new Task[taskCount];
    bc = new BlockingCollection<string>();

    for (int i = 0; i < taskCount; i++)
        tasks[i] = Task.Factory.StartNew(() =>
        {
            foreach (var x in bc.GetConsumingEnumerable())
                ProcessXml(x);
        }, TaskCreationOptions.LongRunning);

    watcher.EnableRaisingEvents = true;
}

void ProcessXml(string path)
{
    //Do your processing here...
    //Note many events will be raised multiple times, see:
    //http://weblogs.asp.net/ashben/archive/2003/10/14/31773.aspx
}

void StopProcessing()
{
    watcher.EnableRaisingEvents = false;

    lock (_lock) //The line above doesn't guarantee no more events will fire,
                 //and Add() and CompleteAdding() can't be called concurrently
        bc.CompleteAdding();

    Task.WaitAll(tasks);
    foreach (var task in tasks)
        task.Dispose();
    bc.Dispose();
    tasks = null;
}
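To show how these pieces might fit together, here is a hedged usage sketch; hosting them in a Windows service with OnStart/OnStop is an assumption about the hosting model, and the worker count of 4 is an arbitrary example:

```csharp
// Hypothetical service wiring for the methods above; a taskCount of 4 is
// just an example value - make it configurable in practice.
protected override void OnStart(string[] args)
{
    PrepareWatcher();    // subscribe to Created events
    StartProcessing(4);  // create the queue and 4 consumer tasks, then enable the watcher
}

protected override void OnStop()
{
    StopProcessing();    // stop the watcher, drain the queue, wait for consumers
}
```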
情仇皆在手 2025-01-10 09:46:20


I am quite surprised that no one has yet asked, but considering what you're trying to achieve is some kind of messaging between two applications, have you considered using WCF?

诺曦 2025-01-10 09:46:20


You may not need loops, and I'm not sure parallel processing is necessary either; that would only be useful if you want to process a batch of new files.
A FileSystemWatcher on the folder where new files will appear will give you an event to add a file to the queue.

Add an event for item added to queue, to trigger a thread to process an individual file.

If you knock up a simple class (file, state, detected time, etc.), you'd have a detection thread adding to the queue, a thread pool to process the items and, on success, remove them from the queue.

You might find this previous question on thread-safe "lists" in .NET 4 useful:

Thread-safe List<T> property

Particularly if you want to process all new files since X.

Note that if you aren't going to use FileSystemWatcher and just get files from the folder, a Processed folder to move them to, and maybe a Failed folder as well, would be a good idea. Reading 200,000 filenames back in just to check whether you've already processed them would remove any benefit of processing them in parallel.

Even if you do use the watcher, I'd recommend it. Simply moving a file back into To Process (or after an edit, in the case of failures) will trigger it to be reprocessed. Another advantage: say you are processing into a database, it all goes wrong, and your last backup was at X. You restore and then simply move all the files you did process back into the "toprocess" folder.
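A minimal sketch of that move-on-completion idea; the folder paths and the ProcessXml call are assumptions for illustration:

```csharp
using System;
using System.IO;

class FileMover
{
    // Hypothetical folder names - adjust to your layout.
    const string ProcessedDir = @"c:\Processed";
    const string FailedDir = @"c:\Failed";

    static void ProcessAndMove(string file)
    {
        try
        {
            ProcessXml(file);  // assumed per-file processing routine
            File.Move(file, Path.Combine(ProcessedDir, Path.GetFileName(file)));
        }
        catch (Exception)
        {
            // Park the file for inspection (or a later retry by moving it back).
            File.Move(file, Path.Combine(FailedDir, Path.GetFileName(file)));
        }
    }

    static void ProcessXml(string path) { /* ... */ }
}
```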

You can also do test runs with known input and check the db's state before and after.

Further to comment.

The ThreadPool used by Task has a thread limit, but that's shared across all foreground or background tasks in your app.

After comment.

If you want to limit the number of concurrent tasks...

A starter for ten that you can easily improve upon, for tuning and boosting.

In your class that manages kicking off tasks from the file queue, something like

private object _canRunLock;
private int _maxTasks;
private int _activeTasks;

public MyTaskManager(int argMaxTasks)
{
  _maxTasks = argMaxTasks;
  _canRunLock = new object();
  _activeTasks = 0;
}


public bool CanRunTask(MyTask argTask)
{
  lock(_canRunLock)
  {
    if (_activeTasks < _maxTasks)
    {
      ExecuteTask(argTask);
      _activeTasks++;
      return true;
    }
  }
  return false;
}

public void TaskCompleted()
{
  lock(_canRunLock)
  {
    if (_activeTasks > 0)
    {
      _activeTasks--;
    }
    else
    {
      throw new WTFException("Okay how did this happen?");
    }
  }
}

Simple and safe (I think). You could have another property to pause or disable it as well. You might want to make the above a singleton ( :( ), or at least bear in mind what happens if you run more than one....

The best advice I can give is to start simple, open and decoupled, and then complicate as necessary; it would be easy to start optimising prematurely here. It's a good idea not to have a load of threads all waiting on, say, the file system or a backend, but I doubt the number of processors is ever going to be the bottleneck, so your maxTasks is a bit thumb-in-the-air.
Some sort of self-tuning between a lower and an upper limit might be a good thing, as opposed to one fixed number.
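One possible alternative to the hand-rolled counter, sketched under the assumption that .NET 4's SemaphoreSlim is acceptable: let the semaphore do the counting, which removes the explicit lock entirely. The class and member names here are illustrative, not from the answer above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class ThrottledRunner
{
    private readonly SemaphoreSlim _slots;

    public ThrottledRunner(int maxTasks)
    {
        // maxTasks plays the role of _maxTasks in the sketch above.
        _slots = new SemaphoreSlim(maxTasks, maxTasks);
    }

    public Task Run(Action work)
    {
        _slots.Wait();  // blocks the caller while maxTasks items are in flight
        return Task.Factory.StartNew(work)
                   .ContinueWith(t => _slots.Release());  // free the slot when done
    }
}
```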

指尖上得阳光 2025-01-10 09:46:20


I think you can detect new files arriving with FileSystemWatcher. There is an article at http://www.codeproject.com/Articles/25443/Watching-Folder-Activity-in-C-NET.

FileSystemWatcher saves you from polling the folder in a loop.
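A minimal sketch of what that looks like; the folder path and the "*.xml" filter are assumptions for illustration:

```csharp
using System;
using System.IO;

class WatcherDemo
{
    static void Main()
    {
        var watcher = new FileSystemWatcher(@"c:\incoming", "*.xml");
        watcher.Created += (s, e) => Console.WriteLine("New file: " + e.FullPath);
        watcher.EnableRaisingEvents = true;
        Console.ReadLine();  // keep the process alive while watching
    }
}
```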

Hope this helps.

失而复得 2025-01-10 09:46:20


IMO what you want is something like a cron job. A version of the algorithm can be:

for every job (called periodically via cron/scheduler) run

   //
   // your program
   //
   if job_is_running {
      // Still busy...
      // don't process anything and just return back
      return
   }

   // Create your array
   //
   Array a = new Array()
   for each file in folder {
      a.append(file)
   }

   // Process each file
   //
   for each item in a {
     process_item(item);

     // Move it (or delete)
     //
     remove_from_input_folder(item)
   }

Now, you can call remove_from_input_folder() before processing, to avoid double processing if the system crashes.

I had to do something like that for a phone company a while ago and this was the most comfortable solution we got :)

Update: The parallel bit

Looping through the files to build the array is theoretically negligible compared to the actual processing. Therefore, you can convert the second loop into a worker-based parallel variant easily enough.

HTH
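In .NET 4 terms, the second loop of the pseudocode above could be sketched with Parallel.ForEach; the folder path, the degree of parallelism, and the ProcessItem routine are assumptions:

```csharp
using System.IO;
using System.Threading.Tasks;

class BatchProcessor
{
    static void ProcessBatch()
    {
        var files = Directory.GetFiles(@"c:\incoming", "*.xml");
        Parallel.ForEach(files,
            new ParallelOptions { MaxDegreeOfParallelism = 4 },  // configurable worker count
            file =>
            {
                ProcessItem(file);   // assumed per-file processing
                File.Delete(file);   // or move it, as in the pseudocode
            });
    }

    static void ProcessItem(string path) { /* ... */ }
}
```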
