多线程环境中recv()不会被信号中断

发布于 2024-09-16 02:05:12 字数 3736 浏览 6 评论 0 原文

我有一个线程位于阻塞 recv() 循环中,我想终止(假设这不能更改为 select() 或任何其他异步方法) 。

我还有一个捕获 SIGINT 的信号处理程序,理论上它应该使 recv() 返回错误,并将 errno 设置为 EINTR< /代码>。

但事实并非如此,我认为这与应用程序是多线程的事实有关。还有另一个线程,同时正在等待 pthread_join() 调用。

这里发生了什么事?

编辑:

好的,现在我通过主线程的pthread_kill()将信号显式传递给所有阻塞的recv()线程(这会导致安装了相同的全局 SIGINT 信号处理程序,尽管多次调用是良性的)。但是 recv() 调用仍然没有解除阻塞。

编辑:

我编写了一个重现该问题的代码示例。

  1. 主线程将套接字连接到行为不当的远程主机,该主机不会释放连接。
  2. 所有信号都被封锁。
  3. 读取线程线程已启动。
  4. Main 解锁并安装 SIGINT 的处理程序。
  5. 读取线程解除阻塞并安装 SIGUSR1 的处理程序。
  6. 主线程的信号处理程序向读取线程发送 SIGUSR1

有趣的是,如果我用 sleep() 替换 recv() ,它就会被中断。

PS

或者,您可以只打开 UDP 套接字,而不使用服务器。

客户端

#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>

static void
err(const char *msg)
{
    perror(msg);
    abort();
}

static void
blockall()
{
    sigset_t ss;
    sigfillset(&ss);
    if (pthread_sigmask(SIG_BLOCK, &ss, NULL))
        err("pthread_sigmask");
}

static void
unblock(int signum)
{
    sigset_t ss;
    sigemptyset(&ss);
    sigaddset(&ss, signum);
    if (pthread_sigmask(SIG_UNBLOCK, &ss, NULL))
        err("pthread_sigmask");
}

void
sigusr1(int signum)
{
    (void)signum;
    printf("%lu: SIGUSR1\n", pthread_self());
}

void*
read_thread(void *arg)
{
    int sock, r;
    char buf[100];

    unblock(SIGUSR1);
    signal(SIGUSR1, &sigusr1);
    sock = *(int*)arg;
    printf("Thread (self=%lu, sock=%d)\n", pthread_self(), sock);
    r = 1;
    while (r > 0)
    {
        r = recv(sock, buf, sizeof buf, 0);
        printf("recv=%d\n", r);
    }
    if (r < 0)
        perror("recv");
    return NULL;
}

int sock;
pthread_t t;

void
sigint(int signum)
{
    int r;
    (void)signum;
    printf("%lu: SIGINT\n", pthread_self());
    printf("Killing %lu\n", t);
    r = pthread_kill(t, SIGUSR1);
    if (r)
    {
        printf("%s\n", strerror(r));
        abort();
    }
}

int
main()
{
    pthread_attr_t attr;
    struct sockaddr_in addr;

    printf("main thread: %lu\n", pthread_self());
    memset(&addr, 0, sizeof addr);
    sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (socket < 0)
        err("socket");
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8888);
    if (inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr) <= 0)
        err("inet_pton");
    if (connect(sock, (struct sockaddr *)&addr, sizeof addr))
        err("connect");

    blockall();
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    if (pthread_create(&t, &attr, &read_thread, &sock))
        err("pthread_create");
    pthread_attr_destroy(&attr);
    unblock(SIGINT);
    signal(SIGINT, &sigint);

    if (sleep(1000))
        perror("sleep");
    if (pthread_join(t, NULL))
        err("pthread_join");
    if (close(sock))
        err("close");

    return 0;
}

服务器

import socket
import time

s = socket.socket(socket.AF_INET)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('127.0.0.1',8888))
s.listen(1)
c = []
while True:
    (conn, addr) =  s.accept()
    c.append(conn)

I have a thread that sits in a blocking recv() loop and I want to terminate (assume this can't be changed to select() or any other asynchronous approach).

I also have a signal handler that catches SIGINT and theoretically it should make recv() return with error and errno set to EINTR.

But it doesn't, which I assume has something to do with the fact that the application is multi-threaded. There is also another thread, which is meanwhile waiting on a pthread_join() call.

What's happening here?

EDIT:

OK, now I explicitly deliver the signal to all blocking recv() threads via pthread_kill() from the main thread (which results in the same global SIGINT signal handler installed, though multiple invocations are benign). But recv() call is still not unblocked.

EDIT:

I've written a code sample that reproduces the problem.

  1. Main thread connects a socket to a misbehaving remote host that won't let the connection go.
  2. All signals blocked.
  3. Read thread thread is started.
  4. Main unblocks and installs handler for SIGINT.
  5. Read thread unblocks and installs handler for SIGUSR1.
  6. Main thread's signal handler sends a SIGUSR1 to the read thread.

Interestingly, if I replace recv() with sleep() it is interrupted just fine.

PS

Alternatively you can just open a UDP socket instead of using a server.

client

#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>

static void
err(const char *msg)
{
    perror(msg);
    abort();
}

static void
blockall()
{
    sigset_t ss;
    sigfillset(&ss);
    if (pthread_sigmask(SIG_BLOCK, &ss, NULL))
        err("pthread_sigmask");
}

static void
unblock(int signum)
{
    sigset_t ss;
    sigemptyset(&ss);
    sigaddset(&ss, signum);
    if (pthread_sigmask(SIG_UNBLOCK, &ss, NULL))
        err("pthread_sigmask");
}

void
sigusr1(int signum)
{
    (void)signum;
    printf("%lu: SIGUSR1\n", pthread_self());
}

void*
read_thread(void *arg)
{
    int sock, r;
    char buf[100];

    unblock(SIGUSR1);
    signal(SIGUSR1, &sigusr1);
    sock = *(int*)arg;
    printf("Thread (self=%lu, sock=%d)\n", pthread_self(), sock);
    r = 1;
    while (r > 0)
    {
        r = recv(sock, buf, sizeof buf, 0);
        printf("recv=%d\n", r);
    }
    if (r < 0)
        perror("recv");
    return NULL;
}

int sock;
pthread_t t;

void
sigint(int signum)
{
    int r;
    (void)signum;
    printf("%lu: SIGINT\n", pthread_self());
    printf("Killing %lu\n", t);
    r = pthread_kill(t, SIGUSR1);
    if (r)
    {
        printf("%s\n", strerror(r));
        abort();
    }
}

int
main()
{
    pthread_attr_t attr;
    struct sockaddr_in addr;

    printf("main thread: %lu\n", pthread_self());
    memset(&addr, 0, sizeof addr);
    sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (socket < 0)
        err("socket");
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8888);
    if (inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr) <= 0)
        err("inet_pton");
    if (connect(sock, (struct sockaddr *)&addr, sizeof addr))
        err("connect");

    blockall();
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    if (pthread_create(&t, &attr, &read_thread, &sock))
        err("pthread_create");
    pthread_attr_destroy(&attr);
    unblock(SIGINT);
    signal(SIGINT, &sigint);

    if (sleep(1000))
        perror("sleep");
    if (pthread_join(t, NULL))
        err("pthread_join");
    if (close(sock))
        err("close");

    return 0;
}

server

import socket
import time

s = socket.socket(socket.AF_INET)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('127.0.0.1',8888))
s.listen(1)
c = []
while True:
    (conn, addr) =  s.accept()
    c.append(conn)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

生生漫 2024-09-23 02:05:12

通常信号不会用 EINTR 中断系统调用。历史上有两种可能的信号传递行为:BSD 行为(系统调用在被信号中断时自动重新启动)和 Unix System V 行为(系统调用返回 -1,errno 设置为 EINTR< /code> 当被信号中断时)。 Linux(内核)采用了后者,但 GNU C 库开发人员(正确地)认为 BSD 行为更加理智,现代 Linux 系统也是如此,调用 signal (这是一个库函数) )导致 BSD 行为。

POSIX 允许这两种行为,因此建议始终使用 sigaction,您可以根据需要的行为选择设置 SA_RESTART 标志或忽略它。请参阅此处的 sigaction 文档:

http://www .opengroup.org/onlinepubs/9699919799/functions/sigaction.html

Normally signals do not interrupt system calls with EINTR. Historically there were two possible signal delivery behaviors: the BSD behavior (syscalls are automatically restarted when interrupted by a signal) and the Unix System V behavior (syscalls return -1 with errno set to EINTR when interrupted by a signal). Linux (the kernel) adopted the latter, but the GNU C library developers (correctly) deemed the BSD behavior to be much more sane, and so on modern Linux systems, calling signal (which is a library function) results in the BSD behavior.

POSIX allows either behavior, so it's advisable to always use sigaction where you can choose to set the SA_RESTART flag or omit it depending on the behavior you want. See the documentation for sigaction here:

http://www.opengroup.org/onlinepubs/9699919799/functions/sigaction.html

甜柠檬 2024-09-23 02:05:12

在多线程应用程序中,普通信号可以任意传递到任何线程。使用 pthread_kill 将信号发送到感兴趣的特定线程。

In a multi-threaded application, normal signals can be delivered to any thread arbitrarily. Use pthread_kill to send the signal to the specific thread of interest.

朕就是辣么酷 2024-09-23 02:05:12

信号处理程序是否在等待 recv() 的同一线程中调用?
您可能需要通过 pthread_sigmask() 显式屏蔽所有其他线程中的 SIGINT

Does signal handler invoked in same thread which waits in recv()?
You may need to explicitly mask SIGINT in all other threads via pthread_sigmask()

半步萧音过轻尘 2024-09-23 02:05:12

正如 > 的帖子中提到的,确实可以更改信号活动。
我经常创建自己的使用 sigaction 的“信号”函数。这是我使用的

typedef void Sigfunc(int);

static Sigfunc* 
_signal(int signum, Sigfunc* func)
{
    struct sigaction act, oact;

    act.sa_handler = func;
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;

    if (signum != SIGALRM)
        act.sa_flags |= SA_NODEFER; //SA_RESTART;

    if (sigaction(signum, &act, &oact) < 0)
        return (SIG_ERR);
    return oact.sa_handler;
}

上面提到的属性是 sa_flags 字段的“或”运算。这是来自“sigaction”的手册页:SA_RESTART 提供了类似 BSD 的行为,允许系统调用跨信号重新启动。 SA_NODEFER 表示允许从其自己的信号处理程序内接收信号。

当信号调用被替换为“_signal”时,线程被中断。输出打印出“中断的系统调用”,并且当发送 SIGUSR1 时,recv 返回 -1。当发送 SIGINT 时,程序完全停止并具有相同的输出,但最后调用了中止。

我没有编写代码的服务器部分,我只是将套接字类型更改为“DGRAM,UDP”以允许客户端启动。

As alluded to in the post by <R..>, it is indeed possible to change the signal activities.
I often create my own "signal" function that makes use of sigaction. Here's what I use

typedef void Sigfunc(int);

static Sigfunc* 
_signal(int signum, Sigfunc* func)
{
    struct sigaction act, oact;

    act.sa_handler = func;
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;

    if (signum != SIGALRM)
        act.sa_flags |= SA_NODEFER; //SA_RESTART;

    if (sigaction(signum, &act, &oact) < 0)
        return (SIG_ERR);
    return oact.sa_handler;
}

The attribute in question above is the 'or'ing of the sa_flags field. This is from the man page for 'sigaction': SA_RESTART provides the BSD-like behavior of allowing system calls to be restartable across signals. SA_NODEFER means allow the signal to be received from within its own signal handler.

When the signal calls are replaced with "_signal", the thread is interrupted. The output prints out "interrupted system call" and recv returned a -1 when SIGUSR1 was sent. The program stopped altogether with the same output when SIGINT was sent, but the abort was called at the end.

I did not write the server portion of the code, I just changed the socket type to "DGRAM, UDP" to allow the client to start.

嘿看小鸭子会跑 2024-09-23 02:05:12

您可以在 Linux 接收上设置超时: Linux :是否有来自套接字的读取或接收超时?

当您收到信号时,在执行接收的类上调用done。

void* signalThread( void* ptr )
{
    CapturePkts* cap=(CapturePkts*)ptr;
    sigset_t sigSet=cap->getSigSet();
    int sig=-1;
    sigwait(&sigSet,&sig); //signalThread: signal capture thread enabled;
    cout << "signal=" << sig << " caught,ending process" << endl;
    cap->setDone();
    return 0;
}

class CapturePkts
{
     CapturePkts() : _done(false) {}

     sigset_t getSigSet() { return _sigSet; }

     void setDone() {_done=true;}

     bool receive( uint8_t *buffer, int32_t bufSz, int32_t &nbytes)
     {
         bool ret=true;
         while( ! _done ) {
         nbytes = ::recv( _sockid, buffer, bufSz, 0 );
         if(nbytes < 1 ) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
               nbytes=0; //wait for next read event
            else
               ret=false;
         }
         return ret;
     }

     private:
     sigset_t _sigSet;
     bool _done;
};

You can set a timeout on Linux recv: Linux: is there a read or recv from socket with timeout?

When you get a signal, call done on the class doing the receive.

void* signalThread( void* ptr )
{
    CapturePkts* cap=(CapturePkts*)ptr;
    sigset_t sigSet=cap->getSigSet();
    int sig=-1;
    sigwait(&sigSet,&sig); //signalThread: signal capture thread enabled;
    cout << "signal=" << sig << " caught,ending process" << endl;
    cap->setDone();
    return 0;
}

class CapturePkts
{
     CapturePkts() : _done(false) {}

     sigset_t getSigSet() { return _sigSet; }

     void setDone() {_done=true;}

     bool receive( uint8_t *buffer, int32_t bufSz, int32_t &nbytes)
     {
         bool ret=true;
         while( ! _done ) {
         nbytes = ::recv( _sockid, buffer, bufSz, 0 );
         if(nbytes < 1 ) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
               nbytes=0; //wait for next read event
            else
               ret=false;
         }
         return ret;
     }

     private:
     sigset_t _sigSet;
     bool _done;
};
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文