ffmpeg(-mt) and TBB

Published 2024-11-08 07:17:59 · 403 characters · 7 views · 0 comments


I just started using the latest build of ffmpeg, into which ffmpeg-mt has been merged.

However, since my application uses TBB (Intel Threading Building Blocks), the ffmpeg-mt implementation, with its own thread creation and synchronization, does not quite fit, as it could potentially block my TBB tasks executing the decode functions. It would also thrash the cache unnecessarily.

I was looking around in pthread.c, which seems to implement the interface ffmpeg uses to enable multithreading.

My question is whether it would be possible to create a tbb.c that implements the same functions, but using TBB tasks instead of explicit threads.

I am not experienced with C, but my guess is that it would not be possible to easily compile TBB (which is C++) into ffmpeg. So maybe overriding the ffmpeg function pointers at run time would be the way to go?

I would appreciate any suggestions or comments regarding implementing TBB into the ffmpeg threading API.


Comments (2)

南风起 2024-11-15 07:17:59


So I figured out how to do it by reading through the ffmpeg code.

Basically, all you have to do is include the code below and use tbb_avcodec_open/tbb_avcodec_close instead of ffmpeg's avcodec_open/avcodec_close.

This will use TBB tasks to execute decoding in parallel.

// Author Robert Nagy

#include "tbb_avcodec.h"

#include <functional>          // std::function

#include <tbb/task.h>
#include <tbb/atomic.h>
#include <tbb/parallel_for.h>  // tbb::parallel_for

extern "C" 
{
    #define __STDC_CONSTANT_MACROS
    #define __STDC_LIMIT_MACROS
    #include <libavformat/avformat.h>
}

int task_execute(AVCodecContext* s, std::function<int(void* arg, int arg_size, int jobnr, int threadnr)>&& func, void* arg, int* ret, int count, int size)
{   
    tbb::atomic<int> counter;
    counter = 0;

    // Execute s->thread_count number of tasks in parallel.
    tbb::parallel_for(0, s->thread_count, 1, [&](int threadnr) 
    {
        while(true)
        {
            int jobnr = counter++;
            if(jobnr >= count)
                break;

            int r = func(arg, size, jobnr, threadnr);
            if (ret)
                ret[jobnr] = r;
        }
    });

    return 0;
}

int thread_execute(AVCodecContext* s, int (*func)(AVCodecContext *c2, void *arg2), void* arg, int* ret, int count, int size)
{
    return task_execute(s, [&](void* arg, int arg_size, int jobnr, int threadnr) -> int
    {
        return func(s, reinterpret_cast<uint8_t*>(arg) + jobnr*size);
    }, arg, ret, count, size);
}

int thread_execute2(AVCodecContext* s, int (*func)(AVCodecContext* c2, void* arg2, int, int), void* arg, int* ret, int count)
{
    return task_execute(s, [&](void* arg, int arg_size, int jobnr, int threadnr) -> int
    {
        return func(s, arg, jobnr, threadnr);
    }, arg, ret, count, 0);
}

void thread_init(AVCodecContext* s)
{
    static const size_t MAX_THREADS = 16; // See mpegvideo.h
    static int dummy_opaque;

    s->active_thread_type = FF_THREAD_SLICE;
    s->thread_opaque      = &dummy_opaque; 
    s->execute            = thread_execute;
    s->execute2           = thread_execute2;
    s->thread_count       = MAX_THREADS; // We are using a task-scheduler, so use as many "threads/tasks" as possible.
}

void thread_free(AVCodecContext* s)
{
    s->thread_opaque = nullptr;
}

int tbb_avcodec_open(AVCodecContext* avctx, AVCodec* codec)
{
    avctx->thread_count = 1;
    if((codec->capabilities & CODEC_CAP_SLICE_THREADS) && (avctx->thread_type & FF_THREAD_SLICE))
        thread_init(avctx);
    // ff_thread_init will not be executed since thread_opaque != nullptr || thread_count == 1.
    return avcodec_open(avctx, codec); 
}

int tbb_avcodec_close(AVCodecContext* avctx)
{
    thread_free(avctx);
    // ff_thread_free will not be executed since thread_opaque == nullptr.
    return avcodec_close(avctx); 
}
疏忽 2024-11-15 07:17:59


Re-posting here my response to you at the TBB forum, for the sake of anyone on SO who may be interested.

Your code in the answer above looks good to me; it's a clever way to use TBB in a context that was designed with native threads in mind. I wonder if it can be made even more TBBish, so to speak. I have some ideas which you can try if you have the time and inclination.

The following two items may be of interest if there is a desire/need to control the number of threads.

  • In thread_init, create a heap-allocated tbb::task_scheduler_init (TSI) object, and initialize it with as many threads as desired (not necessarily MAX_THREADS). Keep the address of this object in s->thread_opaque if possible/allowed; if not, a possible solution is a global map from AVCodecContext* to the address of the corresponding task_scheduler_init.
  • Correspondingly, in thread_free, obtain and remove the TSI object.

Independently of the above, another potential change is in how tbb::parallel_for is called. Instead of using it merely to create enough threads, couldn't it be used for its direct purpose, like below?

int task_execute(AVCodecContext* s,
                 std::function<int(void*, int, int, int)>&& func,
                 void* arg, int* ret, int count, int size)
{
    tbb::atomic<int> counter;
    counter = 0;

    // Execute 'count' jobs in parallel, letting TBB split the range.
    tbb::parallel_for(tbb::blocked_range<int>(0, count, 2),
                      [&](const tbb::blocked_range<int>& range)
    {
        int threadnr = counter++;
        for(int jobnr = range.begin(); jobnr != range.end(); ++jobnr)
        {
            int r = func(arg, size, jobnr, threadnr);
            if (ret)
                ret[jobnr] = r;
        }
        --counter;
    });

    return 0;
}

This can perform better if count is significantly greater than thread_count, because a) more parallel slack means TBB works more efficiently (which you apparently know), and b) the overhead of the centralized atomic counter is spread over more iterations. Note that I selected the grain size of 2 for blocked_range; this is because the counter is both incremented and decremented inside the loop body, and so at least two iterations per task (and correspondingly, count>=2*thread_count) are necessary to "match" your variant.
