块设备驱动程序中的生产者消费者实现?

发布于 2024-11-24 17:41:44 字数 2883 浏览 7 评论 0原文

我正在尝试在我的块级驱动程序中实现类似于生产者-消费者的情况(在 Linux 内核版本 2.6.39.1 上)。我的块驱动程序的 make_request_fn 从用户级应用程序接收 struct bio 流。收到这些 BIO 后,它们就会排队。接下来,我创建一个新的struct bio,它将保存排队的 BIO 中存在的所有信息。仅当较低级别驱动程序的 request_queue 中有可用的 struct request 槽时,才会提交这个新的“merged_bio”。与此同时,我的块驱动程序将继续从用户级应用程序接收 BIO 并将它们排队(密集负载情况)。现在,为了减少延迟,我想确保当 BIO 排队时,由最旧的 BIO 组成的批次会出队,从而最大限度地减少队列中的空闲时间。我很困惑如何实现这种流水线情况(每个请求延迟较低),其中我的驱动程序的 make_request_fn 是 BIO 的生产者和调用submit_bio() 的函数是这些 BIO 的消费者。可能的方法包括:

  1. Tasklet——一旦 request_queue 中的 struct request 槽变为空闲,让 tasklet 持续消耗队列中的 BIO。 。这种方法不起作用,因为微线程处理函数不是原子的,因为它调用 submit_bio(),而后者又调用 schedule()

  2. 工作队列——它们的延迟可能比微线程更高,因为工作队列的处理函数可以休眠。这可能会影响我的驱动程序的性能,因为 BIO 提交给较低级别​​驱动程序的时间可能比 make_request_fn 实际排队的时间晚得多。我不确定这对性能有多大影响(我的驱动程序实现了快速日志记录设备。) 工作队列的另一个问题是如何以及何时安排工作队列?一旦请求槽可用,就必须提交merged_bio。因此,必须有某种“信号”机制,一旦struct request可用,它就会调度工作队列。我不明白如何在没有信令或 poll() 机制的情况下连续监视 request_queue 以获得空闲槽,然后显式调度工作队列。是的,我们不能从先前完成的 BIO 的回调函数中调用 submit_bio()

  3. 内核线程——我正在考虑将其作为第三个选项。我对内核线程不太了解,但我计划这样做:我的驱动程序的 make_request_fn 将继续将 BIO 排队。将创建一个新的内核线程,struct request槽可用时(否则不会)持续消耗队列中的BIO。因此,每次调度该内核线程时,它都会检查是否有空的请求槽,然后从队列中消耗一批 BIO 并调用 submit_bio()

  4. 更智能的东西??

stackoverflow 的成员可以帮助我选择一种智能且高效的方法来实现此场景吗?谢谢你!

更新:我尝试了工作队列方法,但它只是导致我的内核崩溃(/var/log/messages 包含乱码文本,所以我没有任何日志可以共享)。以下是我如何实现工作队列:

工作队列将使用的数据结构:

struct my_work_struct {
   struct work_struct wk;
   pitdev_t *pd;    /* Pointer to my block device */
   int subdev_index;    /* Indexes the disk that is currently in picture -- pd->subdev[subdev_index] */
};

struct pitdev_struct {
/* Driver related data */
struct my_work_struct *work;
} *pd;

typedef struct pitdev_struct pitdev_t;

初始化我的工作项:

/* Allocate memory for both pd and pd->work */

INIT_WORK(&pd->work->wk, my_work_fn);

pd->work->pd = pd;
pd->work->subdev_index = 0;

我的工作功能定义:

void my_work_fn(struct work_struct *work)
{
   struct my_work_struct *temp = container_of(work, struct my_work_struct, wk);
   pitdev_t *pd = temp->pd;
   int sub_index = temp->subdev_index;

   /* Create a BIO and submit it*/
   submit_bio(WRITE, merged_bio);
}

merged_bio->bi_end_io 中,我安排我的工作item:

schedule_work(&pd->work->wk);

这样做是为了在上一个 BIO 成功传输后立即安排下一个要提交的 BIO。对 submit_bio() 的第一次调用是在不使用工作队列的情况下完成的。第一次调用 submit_bio() 没有任何问题;当我使用工作项调用 submit_bio() 时,系统崩溃了。

有什么想法吗?

I'm trying to implement a producer-consumer like situation in my block level driver (on linux kernel version 2.6.39.1). My block driver's make_request_fn receives a stream of struct bio from a userlevel application. On receiving these BIOs, they are queued up. Next, I create a new struct bio which will hold all the information present in the queued BIOs. This new "merged_bio" will be submitted only if there is a struct request slot available in the lower level driver's request_queue. In the meantime, my block driver will continue to receive BIOs from the userlevel application and will queue them up (a dense load situation). Now, to reduce latency, I want to ensure that as and when BIOs are queued up, a batch comprising of the oldest BIOs are dequeued thereby minimizing idling time in the queue. I am confused as to how I should achieve this pipelined situation (with low per-request latency) wherein my driver's make_request_fn is the producer of BIOs and a function calling submit_bio() is the consumer of these BIOs. Possible approaches include:

  1. Tasklets -- Have a tasklet continuously consume BIOs from the queue once a struct request slot becomes free in the request_queue. This approach won't work because the tasklet handler function is not atomic as it calls submit_bio(), which in turn has a call to schedule().

  2. Work queues -- They can have higher latency than tasklets since the handler function for the work queue can sleep. This could affect my driver's performance since the BIOs may be submitted to the lower level driver at a much later time than when it was actually enqueued by the make_request_fn. I'm unsure about how large the impact would be on the performance (my driver implements a fast logging device.)
    Another issue with work queues is that how and when to schedule the work queue? The merged_bio has to be submitted once the request slot becomes available. So, there has to be some kind of "signaling" mechanism which would schedule the work queue as soon as a struct request becomes available. I don't see how the request_queue can be monitored continuously for a free slot without a signaling or poll()ing mechanism and then explicitly scheduling the work queue. And yes, we cannot call submit_bio() from the callback function of the previously completed BIO.

  3. Kernel threads -- I was considering this as a 3rd option. I don't have much idea about kernel threads, but here is how I was planning to got about it: My driver's make_request_fn would continue to enqueue BIOs. A new kernel thread would be created which would continuously consume BIOs from the queue only when a struct request slot becomes available (and not otherwise). So, each time this kernel thread is scheduled, it would check for an empty request slot and then consume a batch of BIOs from the queue and call submit_bio().

  4. Something more smarter??

Can members of stackoverflow help me choose a smart and efficient way to implement this scenario? Thank you!

UPDATE: I tried the workqueue method but it just resulted in my kernel crashing (/var/log/messages contained garbled text, so I don't have any logs to share). Here is how I went about implementing the workqueue:

The data structure which will be used by the work queue:

struct my_work_struct {
   struct work_struct wk;
   pitdev_t *pd;    /* Pointer to my block device */
   int subdev_index;    /* Indexes the disk that is currently in picture -- pd->subdev[subdev_index] */
};

struct pitdev_struct {
/* Driver related data */
struct my_work_struct *work;
} *pd;

typedef struct pitdev_struct pitdev_t;

Initialize my work item:

/* Allocate memory for both pd and pd->work */

INIT_WORK(&pd->work->wk, my_work_fn);

pd->work->pd = pd;
pd->work->subdev_index = 0;

My work function definition:

void my_work_fn(struct work_struct *work)
{
   struct my_work_struct *temp = container_of(work, struct my_work_struct, wk);
   pitdev_t *pd = temp->pd;
   int sub_index = temp->subdev_index;

   /* Create a BIO and submit it*/
   submit_bio(WRITE, merged_bio);
}

In merged_bio->bi_end_io, I schedule my work item:

schedule_work(&pd->work->wk);

This is done so that the next BIO to be submitted is scheduled soon after the previous BIO has been successfully transferred. The first call to submit_bio() is done without using the work queue. This first call to submit_bio() is taking place without any problems; when called I call submit_bio() using a work item, the system crashes.

Any ideas?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文