Parallelizing a while loop with OpenMP

Posted on 2024-12-06 08:02:29

I have a very large data file in which each record spans 4 lines. I have written a very simple C program to analyze files of this type and print out some useful information. The basic idea of the program is this:

int main()
{
  char buffer[BUFFER_SIZE];
  while(fgets(buffer, BUFFER_SIZE, stdin))
  {
    fgets(buffer, BUFFER_SIZE, stdin);
    do_some_simple_processing_on_the_second_line_of_the_record(buffer);
    fgets(buffer, BUFFER_SIZE, stdin);
    fgets(buffer, BUFFER_SIZE, stdin);
  }
  print_out_result();
}

This of course leaves out some details (sanity/error checking, etc.), but those are not relevant to the question.

The program works fine, but the data files I'm working with are huge. I figured I would try to speed up the program by parallelizing the loop with OpenMP. After a bit of searching, though, it appears that OpenMP can only handle for loops where the number of iterations is known beforehand. Since I don't know the size of the files beforehand, and even simple commands like wc -l take a long time to run, how can I parallelize this program?

Comments (3)

痴骨ら 2024-12-13 08:02:29

As thiton mentioned, this code could be I/O-bound. However, these days many computers have SSDs or high-throughput RAID disks. In such cases, you can get a speedup from parallelization. Moreover, if the computation is not trivial, parallelization wins: even if the I/O is effectively serialized due to saturated bandwidth, you can still get a speedup by distributing the computation across multiple cores.


Back to the question itself: you can parallelize this loop with OpenMP. With stdin, I have no idea how to parallelize it, because stdin has to be read sequentially and there is no prior information about where it ends. However, if you're working with a regular file, you can do it.

Here is my code using omp parallel. It uses some Win32 API and MSVC CRT functions:

void test_io2()
{
  const static int BUFFER_SIZE = 1024;
  const static int CONCURRENCY = 4;

  uint64_t local_checksums[CONCURRENCY];
  uint64_t local_reads[CONCURRENCY];

  DWORD start = GetTickCount();

  omp_set_num_threads(CONCURRENCY);

  #pragma omp parallel
  {
    int tid = omp_get_thread_num();

    FILE* file = fopen("huge_file.dat", "rb");
    _fseeki64(file, 0, SEEK_END);
    uint64_t total_size = _ftelli64(file);

    uint64_t my_start_pos = total_size/CONCURRENCY * tid;
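    // The last thread reads through the end of the file, so the remainder of
    // the integer division is not silently skipped.
    uint64_t my_end_pos   = (tid == CONCURRENCY - 1) ? total_size : total_size/CONCURRENCY * (tid + 1);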
    uint64_t my_read_size = my_end_pos - my_start_pos;
    _fseeki64(file, my_start_pos, SEEK_SET);

    char* buffer = new char[BUFFER_SIZE];

    uint64_t local_checksum = 0;
    uint64_t local_read = 0;
    size_t read_bytes;
    while ((read_bytes = fread(buffer, 1, min(my_read_size, BUFFER_SIZE), file)) != 0 &&
      my_read_size != 0)
    {
      local_read += read_bytes;
      my_read_size -= read_bytes;
      for (size_t i = 0; i < read_bytes; ++i)
        local_checksum += (buffer[i]);
    }

    local_checksums[tid] = local_checksum;
    local_reads[tid]     = local_read;

    delete[] buffer;  // release the per-thread read buffer
    fclose(file);
  }

  uint64_t checksum = 0;
  uint64_t total_read = 0;
  for (int i = 0; i < CONCURRENCY; ++i)
    checksum += local_checksums[i], total_read += local_reads[i];

  std::cout << checksum << std::endl
    << total_read << std::endl
    << double(GetTickCount() - start)/1000. << std::endl;
}

This code looks a bit dirty because I needed to distribute the file precisely among the threads. However, the code is fairly straightforward. One thing to keep in mind is that you need a per-thread file pointer. You can't simply share a file pointer, because its internal data structure may not be thread-safe. This code could also be parallelized with parallel for, but I think this approach is more natural.
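For reference, the parallel for variant mentioned above might look roughly like the sketch below. This is not the code that was measured; it is a portable rewrite using std::ifstream instead of the Win32/MSVC calls, and the names split_into_chunks, checksum_range and parallel_checksum are made up for illustration. It also sums bytes as unsigned char, which differs from the plain char used above. Only pragmas are used, so it still compiles (and runs sequentially) when OpenMP is not enabled.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <fstream>
#include <string>
#include <vector>

// Byte range [begin, end) assigned to one chunk (hypothetical helper type).
struct ChunkRange { std::uint64_t begin; std::uint64_t end; };

// Split total_size bytes into num_chunks contiguous ranges.
// The last range absorbs the remainder of the integer division.
std::vector<ChunkRange> split_into_chunks(std::uint64_t total_size, int num_chunks) {
    assert(num_chunks > 0);
    std::vector<ChunkRange> ranges;
    const std::uint64_t base = total_size / static_cast<std::uint64_t>(num_chunks);
    for (int i = 0; i < num_chunks; ++i) {
        const std::uint64_t begin = base * static_cast<std::uint64_t>(i);
        const std::uint64_t end =
            (i == num_chunks - 1) ? total_size : base * static_cast<std::uint64_t>(i + 1);
        ranges.push_back({begin, end});
    }
    return ranges;
}

// Sum the bytes of one range, using a stream private to the caller.
std::uint64_t checksum_range(const std::string& path, ChunkRange r) {
    std::ifstream in(path, std::ios::binary);
    if (!in) return 0;
    in.seekg(static_cast<std::streamoff>(r.begin));
    std::vector<char> buffer(1024);
    std::uint64_t sum = 0;
    std::uint64_t remaining = r.end - r.begin;
    while (remaining > 0) {
        const std::streamsize want = static_cast<std::streamsize>(
            std::min<std::uint64_t>(remaining, buffer.size()));
        in.read(buffer.data(), want);
        const std::streamsize got = in.gcount();
        if (got <= 0) break;  // unexpected end of file or read error
        for (std::streamsize i = 0; i < got; ++i)
            sum += static_cast<unsigned char>(buffer[static_cast<std::size_t>(i)]);
        remaining -= static_cast<std::uint64_t>(got);
    }
    return sum;
}

// One loop iteration per chunk; each iteration opens its own stream.
std::uint64_t parallel_checksum(const std::string& path, int num_chunks) {
    std::ifstream probe(path, std::ios::binary | std::ios::ate);
    if (!probe) return 0;
    const std::streamoff size = probe.tellg();
    if (size < 0) return 0;
    const std::vector<ChunkRange> ranges =
        split_into_chunks(static_cast<std::uint64_t>(size), num_chunks);
    std::uint64_t checksum = 0;
    #pragma omp parallel for reduction(+:checksum)
    for (int i = 0; i < num_chunks; ++i)
        checksum += checksum_range(path, ranges[static_cast<std::size_t>(i)]);
    return checksum;
}
```

The reduction clause replaces the per-thread result arrays, and because every iteration opens its own stream, no file pointer is shared between threads. Note that the chunk boundaries are plain byte offsets; they are not aligned to line or record boundaries.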


Simple experimental results

I tested this code by reading a 10GB file on both an HDD (WD Green 2TB) and an SSD (Intel 120GB).

With the HDD, as expected, no speedup was obtained; a slowdown was even observed. This clearly shows that this code is I/O-bound. It has virtually no computation, just I/O.

However, with the SSD, I got a speedup of 1.2 with 4 cores. Yes, the speedup is small, but you can still get one with an SSD. And if the computation becomes a bit heavier (I just added a very short busy-waiting loop), the speedup becomes significant: I was able to get a speedup of 2.5.


In sum, I recommend that you try to parallelize this code.

Also, if the computation is not trivial, I would recommend pipelining. The code above simply divides the file into a few big chunks, which results in poor cache efficiency. Pipeline parallelization may yield better cache utilization. Try TBB for pipeline parallelization; it provides a simple pipeline construct.
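As a rough illustration of the reader/worker idea, here is a sketch that uses OpenMP tasks (OpenMP 3.0 or later) rather than TBB's pipeline construct, so it is a swapped-in technique, not the TBB API. One thread reads 4-line records sequentially, which also works for stdin, and hands batches of second lines to tasks. The names process_second_line and process_records are hypothetical, and the "processing" is just a stand-in that counts characters.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <istream>
#include <sstream>   // std::istringstream, handy for trying this on in-memory text
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the real per-record work: length of the second line.
std::uint64_t process_second_line(const std::string& line) { return line.size(); }

// A single thread reads records in order; batches of second lines become tasks
// that any thread in the team may execute. Incomplete trailing records are dropped.
std::uint64_t process_records(std::istream& in, std::size_t batch_size) {
    assert(batch_size > 0);
    std::uint64_t total = 0;
    #pragma omp parallel
    #pragma omp single
    {
        std::string l1, l2, l3, l4;
        std::vector<std::string> batch;
        bool more = true;
        while (more) {
            more = std::getline(in, l1) && std::getline(in, l2) &&
                   std::getline(in, l3) && std::getline(in, l4);
            if (more) batch.push_back(l2);
            if (!batch.empty() && (batch.size() == batch_size || !more)) {
                std::vector<std::string> work = std::move(batch);
                batch.clear();  // leave batch in a known empty state after the move
                // firstprivate gives the task its own copy of the batch.
                #pragma omp task firstprivate(work) shared(total)
                {
                    std::uint64_t partial = 0;
                    for (const std::string& s : work) partial += process_second_line(s);
                    #pragma omp atomic
                    total += partial;
                }
            }
        }
    }  // implicit barrier: all outstanding tasks have finished here
    return total;
}
```

Called as process_records(std::cin, 1024), this keeps the read strictly sequential while the per-record work is spread over the team. Whether it helps depends on how expensive the processing is relative to the I/O, as in the measurements above. Without OpenMP enabled the pragmas are ignored and the function runs sequentially.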

丶视觉 2024-12-13 08:02:29

Have you checked that your process is actually CPU-bound and not I/O-bound? Your code looks very much like I/O-bound code, which would gain nothing from parallelization.

像你 2024-12-13 08:02:29

In response to "minding", I don't think your code actually optimizes anything here. The "#pragma omp parallel" statement is commonly misunderstood: it only spawns the threads, and without the "for" keyword every thread simply executes whatever code follows. So your code would actually duplicate the computation on each thread. In response to Daniel, you are right: OpenMP can't optimize a while loop, and the only way to optimize it is to restructure the code so that the number of iterations is known in advance (for example, by running through the while loop once with a counter). Sorry for posting another answer, as I can't comment yet, but hopefully this clears up the common misunderstandings.
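One possible reading of that restructuring, as a sketch only: make a first sequential pass that stores the second line of every record, so the record count is known, then run a parallel for over the stored lines. The names analyze_line, collect_second_lines and analyze_all are hypothetical, and the approach assumes the collected lines fit in memory, which may not hold for very large files.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <istream>
#include <sstream>   // std::istringstream, handy for trying this on in-memory text
#include <string>
#include <vector>

// Hypothetical stand-in for the real per-record work: length of the line.
std::uint64_t analyze_line(const std::string& line) { return line.size(); }

// Pass 1 (sequential): keep the second line of each complete 4-line record.
std::vector<std::string> collect_second_lines(std::istream& in) {
    std::vector<std::string> lines;
    std::string l1, l2, l3, l4;
    while (std::getline(in, l1) && std::getline(in, l2) &&
           std::getline(in, l3) && std::getline(in, l4))
        lines.push_back(l2);
    return lines;
}

// Pass 2 (parallel): the iteration count is now known, so parallel for applies.
std::uint64_t analyze_all(const std::vector<std::string>& lines) {
    const std::ptrdiff_t n = static_cast<std::ptrdiff_t>(lines.size());
    assert(n >= 0);  // the size must fit in the signed loop index
    std::uint64_t total = 0;
    #pragma omp parallel for reduction(+:total)
    for (std::ptrdiff_t i = 0; i < n; ++i)
        total += analyze_line(lines[static_cast<std::size_t>(i)]);
    return total;
}
```

The first pass is still limited by sequential I/O, so this only pays off when the per-record processing is expensive compared with reading the file.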
