Standard terminology for a threaded I/O reorder buffer?

Published 2024-09-03 12:52:02

I have a case where many threads all concurrently generate data that is ultimately written to one long, serial file stream. I need to somehow serialize these writes so that the stream gets written in the right order.

ie, I have an input queue of 2048 jobs j0..jn, each of which produces a chunk of data oi. The jobs run in parallel on, say, eight threads, but the output blocks have to appear in the stream in the same order as the corresponding input blocks — the output file has to be in the order o0 o1 o2 ...

The solution to this is pretty self evident: I need some kind of buffer that accumulates and writes the output blocks in the correct order, similar to a CPU reorder buffer in Tomasulo's algorithm, or to the way that TCP reassembles out-of-order packets before passing them to the application layer.

Before I go code it, I'd like to do a quick literature search to see if there are any papers that have solved this problem in a particularly clever or efficient way, since I have severe realtime and memory constraints. I can't seem to find any papers describing this though; a Scholar search on every permutation of [threads, concurrent, reorder buffer, reassembly, io, serialize] hasn't yielded anything useful. I feel like I must just not be searching the right terms.

Is there a common academic name or keyword for this kind of pattern that I can search on?

Comments (5)

牵你的手,一向走下去 2024-09-10 12:52:03

The Enterprise Integration Patterns book calls this a Resequencer (p282/web).

雨夜星沙 2024-09-10 12:52:03

Actually, you shouldn't need to accumulate the chunks. Most operating systems and languages provide a random-access file abstraction that would allow each thread to independently write its output data to the correct position in the file without affecting the output data from any of the other threads.

Or are you writing to a truly serial output file, like a socket?
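Assuming fixed-size chunks (so each job's file offset is known up front) and a POSIX system, this random-access approach might be sketched as follows; the names and sizes are hypothetical, and error handling is omitted for brevity. `pwrite()` takes an explicit offset, so it is safe to call concurrently on a single file descriptor:

```cpp
#include <fcntl.h>
#include <unistd.h>
#include <string>
#include <thread>
#include <vector>

// Hypothetical sketch: fixed-size chunks let every job compute its own file
// offset, so threads can write independently with no ordering coordination.
constexpr size_t kChunkSize = 4;

std::string write_in_parallel(const std::vector<std::string>& chunks,
                              const char* path) {
    int fd = open(path, O_CREAT | O_TRUNC | O_RDWR, 0644);
    std::vector<std::thread> workers;
    for (size_t i = 0; i < chunks.size(); ++i) {
        workers.emplace_back([fd, i, &chunks] {
            // The offset depends only on the job index, never on other jobs.
            pwrite(fd, chunks[i].data(), chunks[i].size(),
                   static_cast<off_t>(i * kChunkSize));
        });
    }
    for (auto& t : workers) t.join();

    // Read the file back to show the chunks landed in input order.
    std::string result(chunks.size() * kChunkSize, '\0');
    pread(fd, result.data(), result.size(), 0);
    close(fd);
    return result;
}
```

This sidesteps the reorder buffer entirely, but only when chunk sizes (and hence offsets) are known before the jobs run.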

雪花飘飘的天空 2024-09-10 12:52:03

I wouldn't use a reorderable buffer at all, personally. I'd create one 'job' object per job, and, depending on your environment, either use message passing or mutexes to receive completed data from each job in order. If the next job isn't done, your 'writer' process waits until it is.
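A minimal sketch of this writer-waits scheme, using a condition variable (the class and function names are hypothetical, not from the answer): workers deposit finished chunks keyed by job index, and a single writer consumes them strictly in index order, blocking whenever the next-expected chunk is not done yet.

```cpp
#include <condition_variable>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

class OrderedWriter {
public:
    void deposit(std::size_t index, std::string chunk) {
        std::lock_guard<std::mutex> lk(m_);
        done_[index] = std::move(chunk);
        cv_.notify_all();
    }

    // Blocks until chunk `index` has been deposited, then hands it over.
    std::string take(std::size_t index) {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [&] { return done_.count(index) > 0; });
        std::string chunk = std::move(done_[index]);
        done_.erase(index);
        return chunk;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::map<std::size_t, std::string> done_;
};

// Demo: four workers finish in arbitrary order; output is still o0 o1 o2 o3.
std::string run_demo() {
    OrderedWriter w;
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < 4; ++i)
        workers.emplace_back([&w, i] { w.deposit(i, "o" + std::to_string(i)); });
    std::string out;
    for (std::size_t i = 0; i < 4; ++i) out += w.take(i);  // strictly in order
    for (auto& t : workers) t.join();
    return out;
}
```

Note that the `std::map` here is effectively the accumulation buffer in disguise: chunks that finish early still sit in memory until their turn comes.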

岁吢 2024-09-10 12:52:03

I would use a ringbuffer with the same length as the number of threads you are using. The ringbuffer would also have the same number of mutexes.

The ringbuffer must also know the id of the last chunk it has written to the file. That chunk corresponds to index 0 of the ringbuffer.

On adding to the ringbuffer, you check whether you can write, i.e. whether index 0 is set; if so, you can write more than one chunk at a time to the file.

If index 0 is not set, simply block the current thread and wait. -- You could also have a ringbuffer 2-3 times the length of your number of threads and lock only when appropriate, i.e. when enough jobs to fill the buffer have been launched.

Don't forget to update the last chunk written, though ;)

You could also use double buffering when writing to the file.
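A single-threaded sketch of the slot bookkeeping this answer describes (locking and the per-slot mutexes are omitted, and the names are hypothetical): a fixed ring of slots indexed by chunk id modulo capacity, plus the id of the next chunk the file expects.

```cpp
#include <array>
#include <cstddef>
#include <optional>
#include <string>

template <std::size_t Capacity>
class RingReorderBuffer {
public:
    // Returns false when the slot for this id is still occupied (ring full).
    bool put(std::size_t id, std::string chunk) {
        auto& slot = slots_[id % Capacity];
        if (slot) return false;
        slot = std::move(chunk);
        return true;
    }

    // Drains every chunk that is now contiguous with next_ into `out`
    // (standing in for the file); possibly several chunks per call.
    void flush(std::string& out) {
        while (slots_[next_ % Capacity]) {
            out += std::move(*slots_[next_ % Capacity]);
            slots_[next_ % Capacity].reset();
            ++next_;  // the "last chunk written" bookkeeping
        }
    }

private:
    std::array<std::optional<std::string>, Capacity> slots_{};
    std::size_t next_ = 0;  // id of the next chunk the file expects
};
```

The fixed capacity is what gives this variant its bounded-memory property: a chunk that arrives too far ahead of `next_` is simply refused until the ring drains.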

情域 2024-09-10 12:52:03

Have the output queue contain futures rather than the actual data. When you retrieve an item from the input queue, immediately post the corresponding future onto the output queue (taking care to ensure that this preserves the order --- see below). When the worker thread has processed the item it can then set the value on the future. The output thread can read each future from the queue, and block until that future is ready. If later ones become ready early this doesn't affect the output thread at all, provided the futures are in order.

There are two ways to ensure that the futures on the output queue are in the correct order. The first is to use a single mutex for reading from the input queue and writing to the output queue. Each thread locks the mutex, takes an item from the input queue, posts the future to the output queue and releases the mutex.

The second is to have a single master thread that reads from the input queue, posts the future on the output queue and then hands the item off to a worker thread to execute.

In C++ with a single mutex protecting the queues this would look like:

#include <thread>
#include <mutex>
#include <queue>
#include <future>

struct work_data{};
struct result_data{};

std::mutex queue_mutex;
std::queue<work_data> input_queue;
std::queue<std::future<result_data> > output_queue;

result_data process(work_data const&); // do the actual work

void worker_thread()
{
    for(;;) // substitute an appropriate termination condition
    {
        std::promise<result_data> p;
        work_data data;
        {
            std::lock_guard<std::mutex> lk(queue_mutex);
            if(input_queue.empty())
            {
                continue; // busy-wait; a condition variable would avoid spinning
            }
            data=input_queue.front();
            input_queue.pop();
            std::promise<result_data> item_promise;
            output_queue.push(item_promise.get_future());
            p=std::move(item_promise);
        }
        p.set_value(process(data));
    }
}

void write(result_data const&); // write the result to the output stream

void output_thread()
{
    for(;;) // or whatever termination condition
    {
        std::future<result_data> f;
        {
            std::lock_guard<std::mutex> lk(queue_mutex);
            if(output_queue.empty())
            {
                continue; // busy-wait; a condition variable would avoid spinning
            }
            f=std::move(output_queue.front());
            output_queue.pop();
        }
        write(f.get());
    }
}