使用英特尔 TBB 的并发 C++ 代码出现阻塞/冻结

发布于 2024-10-25 01:00:37 字数 4345 浏览 3 评论 0原文

我尝试使用英特尔 TBB 编写一个流水线(pipeline)版本的双调排序(Bitonic Sort),包含文件读取、排序、文件写入三个阶段,如下所示。代码会在文件写入过滤器中的自旋等待 while(!outQueue.try_pop(line)); 处冻结。有人可以解释为什么会这样吗?

更新:我做了一些进一步的测试,发现 try_pop 所调用的 internal_try_pop(位于头文件 _concurrent_queue_internal.h 中)里有一个 compare_and_swap 操作,对于这一次 try_pop 调用它会一直失败。以下是我从 internal_try_pop 中提取的值:

head counter(k)15
tail counter1605177747
item ticket(tk)15
k after head CAS 15
(k=tk)15,15---break!!

我认为尾部计数器值是垃圾。对于这种情况,我能想到的唯一原因是排序器添加到队列的值可能会被它隐式修改,从而使其不可用。

有什么想法吗?

谢谢 :)

#include <iostream>
#include <sstream>
#include <string>
#include <algorithm>
#include <fstream>
#include "tbb\parallel_for.h"
#include "tbb\blocked_range.h"
#include "tbb\pipeline.h"
#include "tbb\concurrent_queue.h"

using namespace tbb;
using namespace std;

// Filter that writes lines to the output file.
class FileWriterFilter: public tbb::filter {
string outPath;
public:
FileWriterFilter(string outPath );
/*override*/void* operator()( void* item );
};

FileWriterFilter::FileWriterFilter( string outPath ) :
tbb::filter(/*is_serial=*/true),
outPath(outPath)
{
}

void* FileWriterFilter::operator()( void* item ) {

concurrent_queue<string>& outQueue = *static_cast<concurrent_queue<string>*>(item);
string line;
while(!outQueue.try_pop(line));

ofstream myfile(outPath);
if (myfile.is_open())
{
    myfile <<line<<endl;
}
//myfile.close();
return NULL;

 }

 // Serial pipeline input stage: reads the input file one line per invocation
 // and publishes each line through an internal concurrent queue.
 class FileReaderFilter: public tbb::filter {
public:
    FileReaderFilter(string inPath);

private:
    ifstream ifs;                         // input file, opened by the constructor
    tbb::concurrent_queue<string> queue;  // lines waiting for the next stage
    /*override*/ void* operator()(void*);
};

// Registers the filter as serial and opens inPath for reading.
FileReaderFilter::FileReaderFilter(string inPath ) :
    filter(/*is_serial=*/true),
    ifs(inPath)
{
}

// Pushes the next line of the file onto the queue and returns the queue's
// address, or NULL once no further line can be read.
void* FileReaderFilter::operator()(void*) {
    string temp;

    if (!getline(ifs, temp))
    {
        // A NULL from the first filter tells the pipeline that the input is
        // exhausted. Returning &queue here (as before) kept the pipeline
        // running and left the next stage spinning on an empty queue.
        return NULL;
    }

    queue.push(temp);
    return &queue;
}

class BitonicSort: public tbb::filter{

public:
    BitonicSort();
/*override*/void* operator()( void* item );

size_t *a;

private : static const  bool ASCENDING=true, DESCENDING=false;

public :void sort(size_t *b,int n)
{
    a=b;
    bitonicSort(0,n,ASCENDING);

}

private: void bitonicSort(int lo,int n,bool dir)
{
    if(n>1)
    {
        int m=n/2;
        bitonicSort(lo,m,ASCENDING);

        bitonicSort(lo+m,m,DESCENDING);
        bitonicMerge(lo,n,dir);


    }

}

private : void bitonicMerge(int lo,int n,bool dir)
     {
         if(n>1)
         {
             int m=n/2;
             for(int i=lo;i<lo+m;i++)
             {
                 compare(i,i+m,dir);

             }
             bitonicMerge(lo,m,dir);
             bitonicMerge(lo+m,m,dir);
         }
     }

private : void compare(int i,int j, bool dir)
          {
              if(dir==a[i]>a[j])
              {
                  exchange(i,j);

              }

          }

private : void exchange(int i,int j)
          {
            /*  cout<<a[i]<<" "<<a[j]<<endl;*/
              int t=a[i];
              a[i]=a[j];
              a[j]=t;
              /*cout<<a[i]<<" "<<a[j]<<endl<<endl;*/
          }

   private :string convertInt(int number)
   {
   stringstream ss;//create a stringstream
   ss << number;//add number to the stream
   return ss.str();//return a string with the contents of the stream
}
 };


 BitonicSort::BitonicSort() :
tbb::filter(/*serial=*/false)
 {}

 /*override*/void* BitonicSort::operator()( void* item ) {

int num_elem=2048;
size_t *max = new size_t[num_elem];
concurrent_queue<string>& queue = *static_cast<concurrent_queue<string>*>(item);
concurrent_queue<string> outQueue;
string line;
while(!queue.try_pop(line));  
istringstream iss(line);
int i=0;
do
{
    string sub;
    iss >> sub;
    max[i]=atoi(sub.c_str());;
    i++;
} while (iss);

sort(max,num_elem);

string out;

for(int i=0;i<num_elem;i++)
{
    out.append(convertInt(max[i]).append(" "));
}

outQueue.push(out);

return &outQueue;
 }


 // Builds the read -> sort -> write pipeline, runs it and waits for a key press.
 int main() {
    tbb::pipeline pipeline;

    // Stages are declared in pipeline order and registered afterwards.
    FileReaderFilter reader("sample.txt");
    BitonicSort sorter;
    FileWriterFilter writer("test.txt");

    pipeline.add_filter(reader);
    pipeline.add_filter(sorter);
    pipeline.add_filter(writer);

    pipeline.run(3);

    // Detach the filters from the pipeline before they go out of scope.
    pipeline.clear();

    system("PAUSE");
    return 0;
 }

I tried writing a pipelined version of Bitonic Sort with File Read, Sort and File Write stages using Intel TBB, as shown below. The code freezes in the spin loop at while(!outQueue.try_pop(line)); in the FileWriter filter. Can someone explain why this might be?

update:
I did some further testing and discovered that internal_try_pop, which is called by try_pop and lives in the header file _concurrent_queue_internal.h, has a compare_and_swap operation that fails forever for this particular try_pop. The following are the values I extracted from internal_try_pop:

head counter(k)15
tail counter1605177747
item ticket(tk)15
k after head CAS 15
(k=tk)15,15---break!!

I think the tail counter value is garbage. The only reason I can think of for this situation is that the value added to the queue by the sorter may be implicitly modified by it, thus making it unavailable.

Any ideas?

Thanks :)

#include <iostream>
#include <sstream>
#include <string>
#include <algorithm>
#include <fstream>
#include "tbb\parallel_for.h"
#include "tbb\blocked_range.h"
#include "tbb\pipeline.h"
#include "tbb\concurrent_queue.h"

using namespace tbb;
using namespace std;

// Filter that writes lines to the output file.
class FileWriterFilter: public tbb::filter {
string outPath;
public:
FileWriterFilter(string outPath );
/*override*/void* operator()( void* item );
};

FileWriterFilter::FileWriterFilter( string outPath ) :
tbb::filter(/*is_serial=*/true),
outPath(outPath)
{
}

void* FileWriterFilter::operator()( void* item ) {

concurrent_queue<string>& outQueue = *static_cast<concurrent_queue<string>*>(item);
string line;
while(!outQueue.try_pop(line));

ofstream myfile(outPath);
if (myfile.is_open())
{
    myfile <<line<<endl;
}
//myfile.close();
return NULL;

 }

 // Serial pipeline input stage: reads the input file one line per invocation
 // and publishes each line through an internal concurrent queue.
 class FileReaderFilter: public tbb::filter {
public:
    FileReaderFilter(string inPath);

private:
    ifstream ifs;                         // input file, opened by the constructor
    tbb::concurrent_queue<string> queue;  // lines waiting for the next stage
    /*override*/ void* operator()(void*);
};

// Registers the filter as serial and opens inPath for reading.
FileReaderFilter::FileReaderFilter(string inPath ) :
    filter(/*is_serial=*/true),
    ifs(inPath)
{
}

// Pushes the next line of the file onto the queue and returns the queue's
// address, or NULL once no further line can be read.
void* FileReaderFilter::operator()(void*) {
    string temp;

    if (!getline(ifs, temp))
    {
        // A NULL from the first filter tells the pipeline that the input is
        // exhausted. Returning &queue here (as before) kept the pipeline
        // running and left the next stage spinning on an empty queue.
        return NULL;
    }

    queue.push(temp);
    return &queue;
}

class BitonicSort: public tbb::filter{

public:
    BitonicSort();
/*override*/void* operator()( void* item );

size_t *a;

private : static const  bool ASCENDING=true, DESCENDING=false;

public :void sort(size_t *b,int n)
{
    a=b;
    bitonicSort(0,n,ASCENDING);

}

private: void bitonicSort(int lo,int n,bool dir)
{
    if(n>1)
    {
        int m=n/2;
        bitonicSort(lo,m,ASCENDING);

        bitonicSort(lo+m,m,DESCENDING);
        bitonicMerge(lo,n,dir);


    }

}

private : void bitonicMerge(int lo,int n,bool dir)
     {
         if(n>1)
         {
             int m=n/2;
             for(int i=lo;i<lo+m;i++)
             {
                 compare(i,i+m,dir);

             }
             bitonicMerge(lo,m,dir);
             bitonicMerge(lo+m,m,dir);
         }
     }

private : void compare(int i,int j, bool dir)
          {
              if(dir==a[i]>a[j])
              {
                  exchange(i,j);

              }

          }

private : void exchange(int i,int j)
          {
            /*  cout<<a[i]<<" "<<a[j]<<endl;*/
              int t=a[i];
              a[i]=a[j];
              a[j]=t;
              /*cout<<a[i]<<" "<<a[j]<<endl<<endl;*/
          }

   private :string convertInt(int number)
   {
   stringstream ss;//create a stringstream
   ss << number;//add number to the stream
   return ss.str();//return a string with the contents of the stream
}
 };


 BitonicSort::BitonicSort() :
tbb::filter(/*serial=*/false)
 {}

 /*override*/void* BitonicSort::operator()( void* item ) {

int num_elem=2048;
size_t *max = new size_t[num_elem];
concurrent_queue<string>& queue = *static_cast<concurrent_queue<string>*>(item);
concurrent_queue<string> outQueue;
string line;
while(!queue.try_pop(line));  
istringstream iss(line);
int i=0;
do
{
    string sub;
    iss >> sub;
    max[i]=atoi(sub.c_str());;
    i++;
} while (iss);

sort(max,num_elem);

string out;

for(int i=0;i<num_elem;i++)
{
    out.append(convertInt(max[i]).append(" "));
}

outQueue.push(out);

return &outQueue;
 }


 // Builds the read -> sort -> write pipeline, runs it and waits for a key press.
 int main() {
    tbb::pipeline pipeline;

    // Stages are declared in pipeline order and registered afterwards.
    FileReaderFilter reader("sample.txt");
    BitonicSort sorter;
    FileWriterFilter writer("test.txt");

    pipeline.add_filter(reader);
    pipeline.add_filter(sorter);
    pipeline.add_filter(writer);

    pipeline.run(3);

    // Detach the filters from the pipeline before they go out of scope.
    pipeline.clear();

    system("PAUSE");
    return 0;
 }

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1)

月亮坠入山谷 2024-11-01 01:00:37

我找到了!这是一个非常低级的错误。我把第二个 concurrent_queue 声明成了流水线排序过滤器 operator() 方法中的局部变量。因此每次执行 operator() 时队列都会被重新创建,并在方法返回时销毁,导致传给写入过滤器的指针失效。该队列必须是排序过滤器的成员变量,这样一切就正常了。文件写入器中还有另一个错误,也已在下面的代码中修正。

   #include <string>
   #include <algorithm>
   #include <fstream>
   #include "tbb\parallel_for.h"
   #include "tbb\blocked_range.h"
   #include "tbb\pipeline.h"
   #include "tbb\concurrent_queue.h"
   #include "tbb\task_scheduler_init.h"
   #include "tbb\tbb_thread.h"
   #include "tbb\task.h"
   #include <iostream>
   #include <sstream>

  using namespace tbb;
  using namespace std;


 // Serial pipeline output stage: pops one line from the queue handed over by
 // the previous stage and writes it, newline-terminated, to an open FILE*.
 class FileWriterFilter: public tbb::filter {

public:
    int count;  // number of lines written so far
    FileWriterFilter(FILE* outFile);

private:
    FILE* outFile;  // destination stream; this class never closes it

    /*override*/void* operator()( void* item );
};

// Initialisers are listed in member declaration order (count, then outFile).
FileWriterFilter::FileWriterFilter(FILE* outFile) :
    tbb::filter(/*is_serial=*/true),
    count(0),
    outFile(outFile)
{
}

// item: pointer to the concurrent_queue<string> returned by the previous
// stage. Always returns NULL.
/*override*/void* FileWriterFilter::operator()( void* item ) {

    tbb::concurrent_queue<string> &outQueue = *static_cast<tbb::concurrent_queue<string>*>(item);
    string outLine;

    // Yield between attempts until an element can be popped.
    while(!outQueue.try_pop(outLine))
        this_tbb_thread::yield();

    // Write the text as data. Passing it to fprintf as the format string would
    // treat any '%' in the line as a conversion specifier.
    fputs(outLine.c_str(), outFile);
    fputc('\n', outFile);

    count++;
    if(count==10000){
        cout<<"over"<<endl;
    }
    return NULL;
}


// Serial pipeline input stage: reads the input file one line per invocation,
// up to 10000 lines, and publishes each line through an internal queue.
class FileReaderFilter: public tbb::filter {
public:
    // const char* accepts string literals as well as existing char* callers.
    FileReaderFilter(const char* inPath);

private:
    int count;                            // number of operator() invocations
    ifstream ifs;                         // input file, opened by the constructor
    tbb::concurrent_queue<string> queue;  // lines waiting for the next stage
    /*override*/ void* operator()(void*);
};

// Initialisers are listed in member declaration order (count, then ifs).
FileReaderFilter::FileReaderFilter(const char* inPath ) :
    filter(/*is_serial=*/true),
    count(0),
    ifs(inPath)
{
}

// Pushes the next line onto the queue and returns the queue's address.
// Returns NULL after 10000 invocations or once no further line can be read.
/*override*/void* FileReaderFilter::operator()(void*) {

    const int max_lines = 10000;
    string temp;

    count++;
    // Stop on a failed read as well: returning &queue without having pushed
    // anything left the next stage waiting forever on an empty queue.
    if (count > max_lines || !getline(ifs, temp)) {
        return NULL;
    }

    queue.push(temp);
    return &queue;
}


// TBB task that merges the bitonic sequence a[lo..lo+n) into direction dir:
// it compare-exchanges the two halves element-wise, then merges each half in
// a child task.
class bitonicMerger : public tbb::task{
    int lo;      // first index of the range
    int n;       // number of elements in the range
    bool dir;    // true = ascending, false = descending
    size_t* a;   // array being sorted; not owned by the task

public:
    bitonicMerger(int lo_,int n_,bool dir_,size_t* a_): lo(lo_), n(n_),dir(dir_), a(a_) {}

    // Performs one merge step and waits for both child merges. Returns NULL.
    task* execute() {
        if(n>1)
        {
            int m=n/2;
            for(int i=lo;i<lo+m;i++)
            {
                compare(i,i+m,dir);
            }

            tbb::task_list list;
            list.push_back( *new( allocate_child() ) bitonicMerger(lo,m,dir,a) );
            list.push_back( *new( allocate_child() ) bitonicMerger(lo+m,m,dir,a) );
            // Two children plus one, the same total the original counter reached.
            set_ref_count(3);
            spawn_and_wait_for_all(list);
        }
        return NULL;
    }

private:
    // Swaps a[i] and a[j] when their order contradicts dir.
    void compare(int i,int j, bool dir)
    {
        if(dir==(a[i]>a[j]))
        {
            exchange(i,j);
        }
    }

    // Swaps two elements. The temporary is size_t; the previous int temporary
    // narrowed values that do not fit in an int.
    void exchange(int i,int j)
    {
        size_t t=a[i];
        a[i]=a[j];
        a[j]=t;
    }

};


 // TBB task that bitonic-sorts a[lo..lo+n) into direction dir: both halves are
 // sorted in opposite directions by child tasks, then merged by a bitonicMerger.
 class bitonicSorter : public tbb::task{
    int lo;
    int n;
    bool dir;
    size_t* a;

    static const bool ASCENDING=true, DESCENDING=false;

public:
    bitonicSorter(int lo_,int n_,bool dir_,size_t* a_): lo(lo_), n(n_),dir(dir_), a(a_) {}

    // Sorts the range and returns NULL; ranges of fewer than two elements are
    // left untouched.
    task* execute() {
        if(n<=1)
        {
            return NULL;
        }

        const int half=n/2;

        // Phase 1: sort the lower half ascending and the upper half descending.
        tbb::task_list halves;
        halves.push_back( *new( allocate_child() ) bitonicSorter(lo,half,ASCENDING,a) );
        halves.push_back( *new( allocate_child() ) bitonicSorter(lo+half,half,DESCENDING,a) );
        set_ref_count(3);   // two children + 1
        spawn_and_wait_for_all(halves);

        // Phase 2: merge the whole range into the requested direction.
        tbb::task_list merge;
        merge.push_back( *new( allocate_child() ) bitonicMerger(lo,n,dir,a) );
        set_ref_count(2);   // one child + 1
        spawn_and_wait_for_all(merge);

        return NULL;
    }

 };




 class TBitonicSort : public tbb::filter{


public:
    TBitonicSort();
/*override*/void* operator()( void* item );


size_t *a;

private : static const  bool ASCENDING=true, DESCENDING=false;
private : tbb::concurrent_queue<string> outQueue;

public :void sort(size_t *b,int n)
{
    a=b;        
    bitonicSorter& tt = *new(tbb::task::allocate_root())   bitonicSorter(0,n,ASCENDING,a);
    tbb::task::spawn_root_and_wait(tt); 
}




};

// Returns the decimal text representation of number.
std::string convertInt(int number)
{
    std::ostringstream text;
    text << number;
    return text.str();
}




// Registers the filter as serial.
TBitonicSort::TBitonicSort() :
    filter(/*is_serial=*/true)
{}

// item: pointer to the concurrent_queue<string> filled by the previous stage.
// Pops one line, parses up to 2048 whitespace-separated integers from it,
// sorts them and pushes them back out as one space-separated line.
// Returns the address of this filter's output queue.
/*override*/void* TBitonicSort::operator()( void* item ) {

    const int num_elem=2048;
    // Zero-initialised automatic buffer. The previous new[] was never freed
    // (16 KB leaked per line) and left unfilled slots indeterminate.
    size_t values[num_elem] = {};

    tbb::concurrent_queue<string>& queue = *static_cast<tbb::concurrent_queue<string>*>(item);

    string line;

    // Yield between attempts until a line can be popped.
    while(!queue.try_pop(line))
        this_tbb_thread::yield();

    istringstream iss(line);
    string token;
    int filled=0;
    // Bounded parse: the previous do/while stored one extra element after the
    // last token, writing index 2048 for a full 2048-number line.
    while(filled<num_elem && iss >> token)
    {
        values[filled]=atoi(token.c_str());
        filled++;
    }

    this->sort(values,num_elem);
    // sort() stored the buffer's address in `a`; clear it because the buffer
    // ceases to exist when this call returns.
    a=NULL;

    string out;
    for(int i=0;i<num_elem;i++)
    {
        // Values originate from atoi(), so converting back to int is lossless.
        out.append(convertInt(static_cast<int>(values[i])).append(" "));
    }

    outQueue.push(out);

    return &outQueue;
}

// Runs the read -> sort -> write pipeline once over "sample.txt", writing to
// "test.txt", and prints the elapsed wall-clock time of pipeline.run().
//
// threads: value passed to pipeline.run() (TBB documents that argument as the
//          maximum number of live tokens); 1 is reported as the serial run.
// Returns 0 in every case, including when the output file cannot be opened.
int run_pipe(int threads)
{
    FILE* output_file = fopen("test.txt","w");
    if( !output_file ) {
        perror( "test.txt" );
        return 0;
    }
    // A modifiable array instead of `char* = "literal"`: binding a string
    // literal to char* is ill-formed since C++11, and the array still converts
    // to the char* the reader's constructor takes.
    char input_file[] = "sample.txt";

    tbb::pipeline pipeline;

    FileReaderFilter reader(input_file);
    pipeline.add_filter(reader);

    TBitonicSort sorter;
    pipeline.add_filter(sorter);

    FileWriterFilter writer(output_file);
    pipeline.add_filter(writer);

    tbb::tick_count t0 = tbb::tick_count::now();
    pipeline.run(threads);
    tbb::tick_count t1 = tbb::tick_count::now();

    fclose( output_file );
    pipeline.clear();

    if(threads==1){
        printf("serial run   time = %g\n", (t1-t0).seconds());
    }
    else{
        printf("parallel run time = %g\n", (t1-t0).seconds());
    }

    return 0;
}

// Runs the pipeline once with 1 and once with 3 as the run_pipe() argument,
// then waits for a key press.
int main() {
    const int thread_counts[] = {1, 3};
    const int runs = sizeof(thread_counts) / sizeof(thread_counts[0]);

    for (int run = 0; run != runs; ++run)
    {
        run_pipe(thread_counts[run]);
    }

    system("PAUSE");
    return 0;
}

I found it! It was quite a trivial error. I had declared the second concurrent_queue as a local variable in the operator() method of the pipeline's sorter filter. Thus, every time operator() executes, the queue is created anew and destroyed when the method returns, invalidating the pointer sent to the writer filter. The queue has to be a member variable of the sorter filter; then everything works fine. There was another bug in the file writer, which has also been fixed in the following code.

   #include <string>
   #include <algorithm>
   #include <fstream>
   #include "tbb\parallel_for.h"
   #include "tbb\blocked_range.h"
   #include "tbb\pipeline.h"
   #include "tbb\concurrent_queue.h"
   #include "tbb\task_scheduler_init.h"
   #include "tbb\tbb_thread.h"
   #include "tbb\task.h"
   #include <iostream>
   #include <sstream>

  using namespace tbb;
  using namespace std;


 // Serial pipeline output stage: pops one line from the queue handed over by
 // the previous stage and writes it, newline-terminated, to an open FILE*.
 class FileWriterFilter: public tbb::filter {

public:
    int count;  // number of lines written so far
    FileWriterFilter(FILE* outFile);

private:
    FILE* outFile;  // destination stream; this class never closes it

    /*override*/void* operator()( void* item );
};

// Initialisers are listed in member declaration order (count, then outFile).
FileWriterFilter::FileWriterFilter(FILE* outFile) :
    tbb::filter(/*is_serial=*/true),
    count(0),
    outFile(outFile)
{
}

// item: pointer to the concurrent_queue<string> returned by the previous
// stage. Always returns NULL.
/*override*/void* FileWriterFilter::operator()( void* item ) {

    tbb::concurrent_queue<string> &outQueue = *static_cast<tbb::concurrent_queue<string>*>(item);
    string outLine;

    // Yield between attempts until an element can be popped.
    while(!outQueue.try_pop(outLine))
        this_tbb_thread::yield();

    // Write the text as data. Passing it to fprintf as the format string would
    // treat any '%' in the line as a conversion specifier.
    fputs(outLine.c_str(), outFile);
    fputc('\n', outFile);

    count++;
    if(count==10000){
        cout<<"over"<<endl;
    }
    return NULL;
}


// Serial pipeline input stage: reads the input file one line per invocation,
// up to 10000 lines, and publishes each line through an internal queue.
class FileReaderFilter: public tbb::filter {
public:
    // const char* accepts string literals as well as existing char* callers.
    FileReaderFilter(const char* inPath);

private:
    int count;                            // number of operator() invocations
    ifstream ifs;                         // input file, opened by the constructor
    tbb::concurrent_queue<string> queue;  // lines waiting for the next stage
    /*override*/ void* operator()(void*);
};

// Initialisers are listed in member declaration order (count, then ifs).
FileReaderFilter::FileReaderFilter(const char* inPath ) :
    filter(/*is_serial=*/true),
    count(0),
    ifs(inPath)
{
}

// Pushes the next line onto the queue and returns the queue's address.
// Returns NULL after 10000 invocations or once no further line can be read.
/*override*/void* FileReaderFilter::operator()(void*) {

    const int max_lines = 10000;
    string temp;

    count++;
    // Stop on a failed read as well: returning &queue without having pushed
    // anything left the next stage waiting forever on an empty queue.
    if (count > max_lines || !getline(ifs, temp)) {
        return NULL;
    }

    queue.push(temp);
    return &queue;
}


// TBB task that merges the bitonic sequence a[lo..lo+n) into direction dir:
// it compare-exchanges the two halves element-wise, then merges each half in
// a child task.
class bitonicMerger : public tbb::task{
    int lo;      // first index of the range
    int n;       // number of elements in the range
    bool dir;    // true = ascending, false = descending
    size_t* a;   // array being sorted; not owned by the task

public:
    bitonicMerger(int lo_,int n_,bool dir_,size_t* a_): lo(lo_), n(n_),dir(dir_), a(a_) {}

    // Performs one merge step and waits for both child merges. Returns NULL.
    task* execute() {
        if(n>1)
        {
            int m=n/2;
            for(int i=lo;i<lo+m;i++)
            {
                compare(i,i+m,dir);
            }

            tbb::task_list list;
            list.push_back( *new( allocate_child() ) bitonicMerger(lo,m,dir,a) );
            list.push_back( *new( allocate_child() ) bitonicMerger(lo+m,m,dir,a) );
            // Two children plus one, the same total the original counter reached.
            set_ref_count(3);
            spawn_and_wait_for_all(list);
        }
        return NULL;
    }

private:
    // Swaps a[i] and a[j] when their order contradicts dir.
    void compare(int i,int j, bool dir)
    {
        if(dir==(a[i]>a[j]))
        {
            exchange(i,j);
        }
    }

    // Swaps two elements. The temporary is size_t; the previous int temporary
    // narrowed values that do not fit in an int.
    void exchange(int i,int j)
    {
        size_t t=a[i];
        a[i]=a[j];
        a[j]=t;
    }

};


 // TBB task that bitonic-sorts a[lo..lo+n) into direction dir: both halves are
 // sorted in opposite directions by child tasks, then merged by a bitonicMerger.
 class bitonicSorter : public tbb::task{
    int lo;
    int n;
    bool dir;
    size_t* a;

    static const bool ASCENDING=true, DESCENDING=false;

public:
    bitonicSorter(int lo_,int n_,bool dir_,size_t* a_): lo(lo_), n(n_),dir(dir_), a(a_) {}

    // Sorts the range and returns NULL; ranges of fewer than two elements are
    // left untouched.
    task* execute() {
        if(n<=1)
        {
            return NULL;
        }

        const int half=n/2;

        // Phase 1: sort the lower half ascending and the upper half descending.
        tbb::task_list halves;
        halves.push_back( *new( allocate_child() ) bitonicSorter(lo,half,ASCENDING,a) );
        halves.push_back( *new( allocate_child() ) bitonicSorter(lo+half,half,DESCENDING,a) );
        set_ref_count(3);   // two children + 1
        spawn_and_wait_for_all(halves);

        // Phase 2: merge the whole range into the requested direction.
        tbb::task_list merge;
        merge.push_back( *new( allocate_child() ) bitonicMerger(lo,n,dir,a) );
        set_ref_count(2);   // one child + 1
        spawn_and_wait_for_all(merge);

        return NULL;
    }

 };




 class TBitonicSort : public tbb::filter{


public:
    TBitonicSort();
/*override*/void* operator()( void* item );


size_t *a;

private : static const  bool ASCENDING=true, DESCENDING=false;
private : tbb::concurrent_queue<string> outQueue;

public :void sort(size_t *b,int n)
{
    a=b;        
    bitonicSorter& tt = *new(tbb::task::allocate_root())   bitonicSorter(0,n,ASCENDING,a);
    tbb::task::spawn_root_and_wait(tt); 
}




};

// Returns the decimal text representation of number.
std::string convertInt(int number)
{
    std::ostringstream text;
    text << number;
    return text.str();
}




// Registers the filter as serial.
TBitonicSort::TBitonicSort() :
    filter(/*is_serial=*/true)
{}

// item: pointer to the concurrent_queue<string> filled by the previous stage.
// Pops one line, parses up to 2048 whitespace-separated integers from it,
// sorts them and pushes them back out as one space-separated line.
// Returns the address of this filter's output queue.
/*override*/void* TBitonicSort::operator()( void* item ) {

    const int num_elem=2048;
    // Zero-initialised automatic buffer. The previous new[] was never freed
    // (16 KB leaked per line) and left unfilled slots indeterminate.
    size_t values[num_elem] = {};

    tbb::concurrent_queue<string>& queue = *static_cast<tbb::concurrent_queue<string>*>(item);

    string line;

    // Yield between attempts until a line can be popped.
    while(!queue.try_pop(line))
        this_tbb_thread::yield();

    istringstream iss(line);
    string token;
    int filled=0;
    // Bounded parse: the previous do/while stored one extra element after the
    // last token, writing index 2048 for a full 2048-number line.
    while(filled<num_elem && iss >> token)
    {
        values[filled]=atoi(token.c_str());
        filled++;
    }

    this->sort(values,num_elem);
    // sort() stored the buffer's address in `a`; clear it because the buffer
    // ceases to exist when this call returns.
    a=NULL;

    string out;
    for(int i=0;i<num_elem;i++)
    {
        // Values originate from atoi(), so converting back to int is lossless.
        out.append(convertInt(static_cast<int>(values[i])).append(" "));
    }

    outQueue.push(out);

    return &outQueue;
}

// Runs the read -> sort -> write pipeline once over "sample.txt", writing to
// "test.txt", and prints the elapsed wall-clock time of pipeline.run().
//
// threads: value passed to pipeline.run() (TBB documents that argument as the
//          maximum number of live tokens); 1 is reported as the serial run.
// Returns 0 in every case, including when the output file cannot be opened.
int run_pipe(int threads)
{
    FILE* output_file = fopen("test.txt","w");
    if( !output_file ) {
        perror( "test.txt" );
        return 0;
    }
    // A modifiable array instead of `char* = "literal"`: binding a string
    // literal to char* is ill-formed since C++11, and the array still converts
    // to the char* the reader's constructor takes.
    char input_file[] = "sample.txt";

    tbb::pipeline pipeline;

    FileReaderFilter reader(input_file);
    pipeline.add_filter(reader);

    TBitonicSort sorter;
    pipeline.add_filter(sorter);

    FileWriterFilter writer(output_file);
    pipeline.add_filter(writer);

    tbb::tick_count t0 = tbb::tick_count::now();
    pipeline.run(threads);
    tbb::tick_count t1 = tbb::tick_count::now();

    fclose( output_file );
    pipeline.clear();

    if(threads==1){
        printf("serial run   time = %g\n", (t1-t0).seconds());
    }
    else{
        printf("parallel run time = %g\n", (t1-t0).seconds());
    }

    return 0;
}

// Runs the pipeline once with 1 and once with 3 as the run_pipe() argument,
// then waits for a key press.
int main() {
    const int thread_counts[] = {1, 3};
    const int runs = sizeof(thread_counts) / sizeof(thread_counts[0]);

    for (int run = 0; run != runs; ++run)
    {
        run_pipe(thread_counts[run]);
    }

    system("PAUSE");
    return 0;
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文