循环线程访问 pthread 互斥体

发布于 2024-08-23 08:27:38 字数 4695 浏览 5 评论 0原文

我正在构建这个应用程序,其中我有一个由线程表示的客户端,在循环中运行(直到它收到终止指令),试图访问服务器中数据的关键部分。

当第一个客户端连接到服务器时,他拥有该互斥锁的锁。所有后续连接均被搁置。这是它的“正常”部分。

但是,当第一个线程解锁时,循环将其带回到开头,并且它应该再次竞争锁。但假设他仍然持有锁并在几乎无限循环中执行关键部分(不是无限循环,因为我们可以终止线程,将锁交给其他线程)。

为了恢复这一切......当第一个客户端连接时,它永远拥有锁。其他线程保持在等待队列中,直到第一个线程终止。这是一些代码:

服务器:

int main(int argc, char * argv[])
{
    int servSock;     
    unsigned short servPort;  
    unsigned int clntLen;

    rcvBuf = (char *)malloc((MAXLINE)*sizeof(char));
    pthreads = (fifo_t*)malloc(sizeof(fifo_t));

    struct sockaddr_in servAddr; 
    struct sockaddr_in clntAddr; 

    pthread_attr_t custom_sched_attr;   

    int fifo_max_prio, fifo_min_prio, fifo_mid_prio;   
    struct sched_param fifo_param;    

    pthread_attr_init(&custom_sched_attr);   
    pthread_attr_setinheritsched(&custom_sched_attr, PTHREAD_EXPLICIT_SCHED);   
    pthread_attr_setschedpolicy(&custom_sched_attr, SCHED_FIFO);

    fifo_max_prio = sched_get_priority_max(SCHED_FIFO);   
    fifo_min_prio = sched_get_priority_min(SCHED_FIFO);   
    fifo_mid_prio = (fifo_min_prio + fifo_max_prio)/2;   

    fifo_param.sched_priority = fifo_mid_prio;  

    pthread_attr_setschedparam(&custom_sched_attr, &fifo_param);

    if(argc !=2 ){
        fprintf(stderr,"Usage: %s <Server Port>\n",argv[0]);
        exit(1);
    }

    fifo_init(pthreads);

    db  = db_open("DB", O_RDWR, 0666);
    servPort = atoi(argv[1]);

    if((servSock = socket(AF_INET,SOCK_STREAM,0)) < 0){ 
        perror("Error with Socket()");
        exit(1);
    }

    memset(&servAddr,0,sizeof(servAddr));
    servAddr.sin_family = AF_INET;
    servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servAddr.sin_port = htons(servPort);

    if(bind(servSock,(struct sockaddr*)&servAddr,sizeof(servAddr)) < 0){
        perror("Error with bind()");
        exit(1);
    }

    if(listen(servSock,NUM_THREADS) < 0){
        perror("Error with listen()");
        exit(1);
    }

    for(;;)
    { 
        printf("A estabelecer ligaçao!\n");
        clntLen = sizeof(clntAddr);

        if((clntSock = accept(servSock,(struct sockaddr*)&clntAddr,&clntLen)) < 0)
        {
            perror("Error with accept()");
            exit(0);
        }

        printf("Ligação estabelecida!\n");

        pthread_create(&thread,&custom_sched_attr,&HandleTcpClient,(void *)clntSock);

        printf("Continua a execução nesta thread: %d\n",(int)pthread_self());
   }

   exit(0);
}

void * HandleTcpClient(void * data){

   insert(pthreads,(int)pthread_self());

   int sock = (int)data;

   char * key = (char *)malloc(FIELD * sizeof(char));
   char * dados = (char *)malloc(FIELD * sizeof(char));
   char * vendDev = (char *)malloc(FIELD * sizeof(char));
   char * str = (char*)malloc(MAXLINE * sizeof(char));

   memset(key,0,sizeof(key));
   memset(dados,0,sizeof(dados));
   memset(vendDev,0,sizeof(vendDev));
   memset(str,0,sizeof(str));

   while(1)
   {
   start:

       printf("Sem Lock: %d com socket: %d\n",(int)pthread_self(),sock);

       pthread_mutex_lock(&mutexdb);

       printf("Com Lock: %d com socket: %d\n",(int)pthread_self(),sock);
       int i = 0;
       char op;

       if((recvMsgSize = recv((sock),rcvBuf,MAXLINE,0)) &lt; 0)
       {
           perror("Erro na recepção!\n");
           exit(-1);
       }

       rcvBuf[recvMsgSize]='\0';
       str = (char *)rcvBuf;

       op = *str++;

       while(*str!='|' && *str!=0){
           key[i]=*str;
           str++;
           i++;
       }

       key[++i]='\0';
       str++;

       if((int)op==2 || (int)op==3) strcpy(vendDev,str);  // obter o numero de produtos vendidos/devolvidos
       else strcpy(dados,str);

       if(op == 4 || op == 6)
       {
           db_operate(&db,op,key,dados,sock); 
           pthread_mutex_unlock(&mutexdb);
           printf("Unlock: %d com socket: %d\n",(int)pthread_self(),sock);
           goto start;
       }
       else
           if(op == 7)
           {
               extract(pthreads);
               pthread_mutex_unlock(&mutexdb);
               close(sock);
               pthread_exit(0);
               break;
           }
           else
               if(op == 1 || op == 2 || op == 3 || op == 5)
               {
                   db_search(&db,op,key,vendDev,sock);
                   pthread_mutex_unlock(&mutexdb);
                   printf("Unlock: %d com socket: %d\n",(int)pthread_self(),sock);
                   goto start;
               }
   }

谁能告诉我我在这里做错了什么?我如何实现一个策略,当第一个线程解锁时,下一个线程获得它,并且所述第一个线程返回到等待队列,就像在 FIFO 列表中一样?

谢谢 ;)

im building this application in which i have a client represented by a thread, running in loop (until it receives the instruction to terminate) trying to access a critical section of data in the server.

When the first client connects to the server, he owns the lock to that mutex. all the subsequent connections are put to a hold. This is the "normal" part of it.

But then, when the first thread unlocks, the loop brings it back to the begining and it should be contending for the lock again. But it's assumed that he still holds the lock and executes the critical section in an almost infinite loop (not infinite because we can terminate the thread, giving the lock away for the other threads).

To resume it all...when the first client connects, it owns the lock forever. The other threads remain in a waiting line until the first thread terminates. Here's some code:

The Server:

int main(int argc, char * argv[])
{
    int servSock;     
    unsigned short servPort;  
    unsigned int clntLen;

    rcvBuf = (char *)malloc((MAXLINE)*sizeof(char));
    pthreads = (fifo_t*)malloc(sizeof(fifo_t));

    struct sockaddr_in servAddr; 
    struct sockaddr_in clntAddr; 

    pthread_attr_t custom_sched_attr;   

    int fifo_max_prio, fifo_min_prio, fifo_mid_prio;   
    struct sched_param fifo_param;    

    pthread_attr_init(&custom_sched_attr);   
    pthread_attr_setinheritsched(&custom_sched_attr, PTHREAD_EXPLICIT_SCHED);   
    pthread_attr_setschedpolicy(&custom_sched_attr, SCHED_FIFO);

    fifo_max_prio = sched_get_priority_max(SCHED_FIFO);   
    fifo_min_prio = sched_get_priority_min(SCHED_FIFO);   
    fifo_mid_prio = (fifo_min_prio + fifo_max_prio)/2;   

    fifo_param.sched_priority = fifo_mid_prio;  

    pthread_attr_setschedparam(&custom_sched_attr, &fifo_param);

    if(argc !=2 ){
        fprintf(stderr,"Usage: %s <Server Port>\n",argv[0]);
        exit(1);
    }

    fifo_init(pthreads);

    db  = db_open("DB", O_RDWR, 0666);
    servPort = atoi(argv[1]);

    if((servSock = socket(AF_INET,SOCK_STREAM,0)) < 0){ 
        perror("Error with Socket()");
        exit(1);
    }

    memset(&servAddr,0,sizeof(servAddr));
    servAddr.sin_family = AF_INET;
    servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servAddr.sin_port = htons(servPort);

    if(bind(servSock,(struct sockaddr*)&servAddr,sizeof(servAddr)) < 0){
        perror("Error with bind()");
        exit(1);
    }

    if(listen(servSock,NUM_THREADS) < 0){
        perror("Error with listen()");
        exit(1);
    }

    for(;;)
    { 
        printf("A estabelecer ligaçao!\n");
        clntLen = sizeof(clntAddr);

        if((clntSock = accept(servSock,(struct sockaddr*)&clntAddr,&clntLen)) < 0)
        {
            perror("Error with accept()");
            exit(0);
        }

        printf("Ligação estabelecida!\n");

        pthread_create(&thread,&custom_sched_attr,&HandleTcpClient,(void *)clntSock);

        printf("Continua a execução nesta thread: %d\n",(int)pthread_self());
   }

   exit(0);
}

void * HandleTcpClient(void * data){

   insert(pthreads,(int)pthread_self());

   int sock = (int)data;

   char * key = (char *)malloc(FIELD * sizeof(char));
   char * dados = (char *)malloc(FIELD * sizeof(char));
   char * vendDev = (char *)malloc(FIELD * sizeof(char));
   char * str = (char*)malloc(MAXLINE * sizeof(char));

   memset(key,0,sizeof(key));
   memset(dados,0,sizeof(dados));
   memset(vendDev,0,sizeof(vendDev));
   memset(str,0,sizeof(str));

   while(1)
   {
   start:

       printf("Sem Lock: %d com socket: %d\n",(int)pthread_self(),sock);

       pthread_mutex_lock(&mutexdb);

       printf("Com Lock: %d com socket: %d\n",(int)pthread_self(),sock);
       int i = 0;
       char op;

       if((recvMsgSize = recv((sock),rcvBuf,MAXLINE,0)) < 0)
       {
           perror("Erro na recepção!\n");
           exit(-1);
       }

       rcvBuf[recvMsgSize]='\0';
       str = (char *)rcvBuf;

       op = *str++;

       while(*str!='|' && *str!=0){
           key[i]=*str;
           str++;
           i++;
       }

       key[++i]='\0';
       str++;

       if((int)op==2 || (int)op==3) strcpy(vendDev,str);  // obter o numero de produtos vendidos/devolvidos
       else strcpy(dados,str);

       if(op == 4 || op == 6)
       {
           db_operate(&db,op,key,dados,sock); 
           pthread_mutex_unlock(&mutexdb);
           printf("Unlock: %d com socket: %d\n",(int)pthread_self(),sock);
           goto start;
       }
       else
           if(op == 7)
           {
               extract(pthreads);
               pthread_mutex_unlock(&mutexdb);
               close(sock);
               pthread_exit(0);
               break;
           }
           else
               if(op == 1 || op == 2 || op == 3 || op == 5)
               {
                   db_search(&db,op,key,vendDev,sock);
                   pthread_mutex_unlock(&mutexdb);
                   printf("Unlock: %d com socket: %d\n",(int)pthread_self(),sock);
                   goto start;
               }
   }

Can anyone tell me what am i doing wrong here? How can i implement a policy where when the first thread does the unlock the next thread gains it and the said first thread returns to the waiting line, like in a FIFO list?

Thanks ;)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

我ぃ本無心為│何有愛 2024-08-30 08:27:38

发布的代码存在许多问题:

  • 线程函数分配内存但从不释放它。
  • 您将互斥锁放在阻塞套接字读取上 - 这从来都不是一个好主意。互斥体应该保护线程之间共享的资源 - 在您的情况下是数据库。套接字不是共享的,因此不需要保护。更糟糕的是,等待套接字并持有互斥体会阻止其他线程访问数据库。
  • 在设计方面 - 互斥体属于数据库,而不是套接字读取器。我建议
    将互斥锁添加到数据库结构中,在数据库设置时对其进行初始化,并将锁定/解锁调用隐藏在数据库访问函数中。确定最小的关键部分并对其进行保护。这为您提供了更好的并发性。
  • 执行每个套接字线程仅适用于非常少量的连接。大规模设计几乎总是涉及非阻塞套接字和 select/poll/epoll/kqueue 技巧。

我不太明白问题的 FIFO 部分,但我希望上述几点能够引导您走向正确的方向。

There are number of issues with the code as posted:

  • Thread function allocates memory but never releases it.
  • You hold the mutex over blocking socket read - it's never a good idea. Mutex should protect resource shared between threads - the database in your case. Socket is not shared so it doesn't need protection. Worse - waiting on the socket and holding the mutex prevents other threads from accessing the database.
  • On the design side - the mutex belongs with the database, not the socket reader. I suggest
    adding the mutex to the db structure, initializing it at database setup time, and hiding lock/unlock calls inside db access functions. Identify the smallest critical section possible and protect that. This gives you better concurrency.

  • Doing thread-per-socket only works for very small number of connections. Large-scale design pretty much always involves non-blocking sockets and select/poll/epoll/kqueue tricks.

I didn't really understand the FIFO part of the question, but I hope the points above will guide you into right direction.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文