循环线程访问 pthread 互斥体
我正在构建这个应用程序,其中我有一个由线程表示的客户端,在循环中运行(直到它收到终止指令),试图访问服务器中数据的关键部分。
当第一个客户端连接到服务器时,他拥有该互斥锁的锁。所有后续连接均被搁置。这是它的“正常”部分。
但是,当第一个线程解锁时,循环将其带回到开头,并且它应该再次竞争锁。但假设他仍然持有锁并在几乎无限循环中执行关键部分(不是无限循环,因为我们可以终止线程,将锁交给其他线程)。
为了恢复这一切......当第一个客户端连接时,它永远拥有锁。其他线程保持在等待队列中,直到第一个线程终止。这是一些代码:
服务器:
int main(int argc, char * argv[])
{
int servSock;
unsigned short servPort;
unsigned int clntLen;
rcvBuf = (char *)malloc((MAXLINE)*sizeof(char));
pthreads = (fifo_t*)malloc(sizeof(fifo_t));
struct sockaddr_in servAddr;
struct sockaddr_in clntAddr;
pthread_attr_t custom_sched_attr;
int fifo_max_prio, fifo_min_prio, fifo_mid_prio;
struct sched_param fifo_param;
pthread_attr_init(&custom_sched_attr);
pthread_attr_setinheritsched(&custom_sched_attr, PTHREAD_EXPLICIT_SCHED);
pthread_attr_setschedpolicy(&custom_sched_attr, SCHED_FIFO);
fifo_max_prio = sched_get_priority_max(SCHED_FIFO);
fifo_min_prio = sched_get_priority_min(SCHED_FIFO);
fifo_mid_prio = (fifo_min_prio + fifo_max_prio)/2;
fifo_param.sched_priority = fifo_mid_prio;
pthread_attr_setschedparam(&custom_sched_attr, &fifo_param);
if(argc !=2 ){
fprintf(stderr,"Usage: %s <Server Port>\n",argv[0]);
exit(1);
}
fifo_init(pthreads);
db = db_open("DB", O_RDWR, 0666);
servPort = atoi(argv[1]);
if((servSock = socket(AF_INET,SOCK_STREAM,0)) < 0){
perror("Error with Socket()");
exit(1);
}
memset(&servAddr,0,sizeof(servAddr));
servAddr.sin_family = AF_INET;
servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
servAddr.sin_port = htons(servPort);
if(bind(servSock,(struct sockaddr*)&servAddr,sizeof(servAddr)) < 0){
perror("Error with bind()");
exit(1);
}
if(listen(servSock,NUM_THREADS) < 0){
perror("Error with listen()");
exit(1);
}
for(;;)
{
printf("A estabelecer ligaçao!\n");
clntLen = sizeof(clntAddr);
if((clntSock = accept(servSock,(struct sockaddr*)&clntAddr,&clntLen)) < 0)
{
perror("Error with accept()");
exit(0);
}
printf("Ligação estabelecida!\n");
pthread_create(&thread,&custom_sched_attr,&HandleTcpClient,(void *)clntSock);
printf("Continua a execução nesta thread: %d\n",(int)pthread_self());
}
exit(0);
}
void * HandleTcpClient(void * data){
insert(pthreads,(int)pthread_self());
int sock = (int)data;
char * key = (char *)malloc(FIELD * sizeof(char));
char * dados = (char *)malloc(FIELD * sizeof(char));
char * vendDev = (char *)malloc(FIELD * sizeof(char));
char * str = (char*)malloc(MAXLINE * sizeof(char));
memset(key,0,sizeof(key));
memset(dados,0,sizeof(dados));
memset(vendDev,0,sizeof(vendDev));
memset(str,0,sizeof(str));
while(1)
{
start:
printf("Sem Lock: %d com socket: %d\n",(int)pthread_self(),sock);
pthread_mutex_lock(&mutexdb);
printf("Com Lock: %d com socket: %d\n",(int)pthread_self(),sock);
int i = 0;
char op;
if((recvMsgSize = recv((sock),rcvBuf,MAXLINE,0)) < 0)
{
perror("Erro na recepção!\n");
exit(-1);
}
rcvBuf[recvMsgSize]='\0';
str = (char *)rcvBuf;
op = *str++;
while(*str!='|' && *str!=0){
key[i]=*str;
str++;
i++;
}
key[++i]='\0';
str++;
if((int)op==2 || (int)op==3) strcpy(vendDev,str); // obter o numero de produtos vendidos/devolvidos
else strcpy(dados,str);
if(op == 4 || op == 6)
{
db_operate(&db,op,key,dados,sock);
pthread_mutex_unlock(&mutexdb);
printf("Unlock: %d com socket: %d\n",(int)pthread_self(),sock);
goto start;
}
else
if(op == 7)
{
extract(pthreads);
pthread_mutex_unlock(&mutexdb);
close(sock);
pthread_exit(0);
break;
}
else
if(op == 1 || op == 2 || op == 3 || op == 5)
{
db_search(&db,op,key,vendDev,sock);
pthread_mutex_unlock(&mutexdb);
printf("Unlock: %d com socket: %d\n",(int)pthread_self(),sock);
goto start;
}
}
谁能告诉我我在这里做错了什么?我如何实现一个策略,当第一个线程解锁时,下一个线程获得它,并且所述第一个线程返回到等待队列,就像在 FIFO 列表中一样?
谢谢 ;)
im building this application in which i have a client represented by a thread, running in loop (until it receives the instruction to terminate) trying to access a critical section of data in the server.
When the first client connects to the server, he owns the lock to that mutex. all the subsequent connections are put to a hold. This is the "normal" part of it.
But then, when the first thread unlocks, the loop brings it back to the begining and it should be contending for the lock again. But it's assumed that he still holds the lock and executes the critical section in an almost infinite loop (not infinite because we can terminate the thread, giving the lock away for the other threads).
To resume it all...when the first client connects, it owns the lock forever. The other threads remain in a waiting line until the first thread terminates. Here's some code:
The Server:
int main(int argc, char * argv[])
{
int servSock;
unsigned short servPort;
unsigned int clntLen;
rcvBuf = (char *)malloc((MAXLINE)*sizeof(char));
pthreads = (fifo_t*)malloc(sizeof(fifo_t));
struct sockaddr_in servAddr;
struct sockaddr_in clntAddr;
pthread_attr_t custom_sched_attr;
int fifo_max_prio, fifo_min_prio, fifo_mid_prio;
struct sched_param fifo_param;
pthread_attr_init(&custom_sched_attr);
pthread_attr_setinheritsched(&custom_sched_attr, PTHREAD_EXPLICIT_SCHED);
pthread_attr_setschedpolicy(&custom_sched_attr, SCHED_FIFO);
fifo_max_prio = sched_get_priority_max(SCHED_FIFO);
fifo_min_prio = sched_get_priority_min(SCHED_FIFO);
fifo_mid_prio = (fifo_min_prio + fifo_max_prio)/2;
fifo_param.sched_priority = fifo_mid_prio;
pthread_attr_setschedparam(&custom_sched_attr, &fifo_param);
if(argc !=2 ){
fprintf(stderr,"Usage: %s <Server Port>\n",argv[0]);
exit(1);
}
fifo_init(pthreads);
db = db_open("DB", O_RDWR, 0666);
servPort = atoi(argv[1]);
if((servSock = socket(AF_INET,SOCK_STREAM,0)) < 0){
perror("Error with Socket()");
exit(1);
}
memset(&servAddr,0,sizeof(servAddr));
servAddr.sin_family = AF_INET;
servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
servAddr.sin_port = htons(servPort);
if(bind(servSock,(struct sockaddr*)&servAddr,sizeof(servAddr)) < 0){
perror("Error with bind()");
exit(1);
}
if(listen(servSock,NUM_THREADS) < 0){
perror("Error with listen()");
exit(1);
}
for(;;)
{
printf("A estabelecer ligaçao!\n");
clntLen = sizeof(clntAddr);
if((clntSock = accept(servSock,(struct sockaddr*)&clntAddr,&clntLen)) < 0)
{
perror("Error with accept()");
exit(0);
}
printf("Ligação estabelecida!\n");
pthread_create(&thread,&custom_sched_attr,&HandleTcpClient,(void *)clntSock);
printf("Continua a execução nesta thread: %d\n",(int)pthread_self());
}
exit(0);
}
void * HandleTcpClient(void * data){
insert(pthreads,(int)pthread_self());
int sock = (int)data;
char * key = (char *)malloc(FIELD * sizeof(char));
char * dados = (char *)malloc(FIELD * sizeof(char));
char * vendDev = (char *)malloc(FIELD * sizeof(char));
char * str = (char*)malloc(MAXLINE * sizeof(char));
memset(key,0,sizeof(key));
memset(dados,0,sizeof(dados));
memset(vendDev,0,sizeof(vendDev));
memset(str,0,sizeof(str));
while(1)
{
start:
printf("Sem Lock: %d com socket: %d\n",(int)pthread_self(),sock);
pthread_mutex_lock(&mutexdb);
printf("Com Lock: %d com socket: %d\n",(int)pthread_self(),sock);
int i = 0;
char op;
if((recvMsgSize = recv((sock),rcvBuf,MAXLINE,0)) < 0)
{
perror("Erro na recepção!\n");
exit(-1);
}
rcvBuf[recvMsgSize]='\0';
str = (char *)rcvBuf;
op = *str++;
while(*str!='|' && *str!=0){
key[i]=*str;
str++;
i++;
}
key[++i]='\0';
str++;
if((int)op==2 || (int)op==3) strcpy(vendDev,str); // obter o numero de produtos vendidos/devolvidos
else strcpy(dados,str);
if(op == 4 || op == 6)
{
db_operate(&db,op,key,dados,sock);
pthread_mutex_unlock(&mutexdb);
printf("Unlock: %d com socket: %d\n",(int)pthread_self(),sock);
goto start;
}
else
if(op == 7)
{
extract(pthreads);
pthread_mutex_unlock(&mutexdb);
close(sock);
pthread_exit(0);
break;
}
else
if(op == 1 || op == 2 || op == 3 || op == 5)
{
db_search(&db,op,key,vendDev,sock);
pthread_mutex_unlock(&mutexdb);
printf("Unlock: %d com socket: %d\n",(int)pthread_self(),sock);
goto start;
}
}
Can anyone tell me what am i doing wrong here? How can i implement a policy where when the first thread does the unlock the next thread gains it and the said first thread returns to the waiting line, like in a FIFO list?
Thanks ;)
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
发布的代码存在许多问题:
将互斥锁添加到数据库结构中,在数据库设置时对其进行初始化,并将锁定/解锁调用隐藏在数据库访问函数中。确定最小的关键部分并对其进行保护。这为您提供了更好的并发性。
我不太明白问题的 FIFO 部分,但我希望上述几点能够引导您走向正确的方向。
There are number of issues with the code as posted:
adding the mutex to the db structure, initializing it at database setup time, and hiding lock/unlock calls inside db access functions. Identify the smallest critical section possible and protect that. This gives you better concurrency.
select/poll/epoll/kqueue
tricks.I didn't really understand the FIFO part of the question, but I hope the points above will guide you into right direction.