多线程UDP服务器与epoll?

发布于 2024-09-28 03:00:17 字数 3796 浏览 7 评论 0原文

我想在 C/Linux 中开发一个多线程 UDP 服务器。该服务在单个端口 x 上运行,因此只能将单个 UDP 套接字绑定到它。为了在高负载下工作,我有 n 个线程(静态定义),比如每个 CPU 1 个线程。可以使用 epoll_wait 将工作交付给线程,因此线程可以根据需要使用“EPOLLET |”唤醒。埃波罗尼射击'。我附上了一个代码示例:

static int epfd;
static sig_atomic_t sigint = 0;

...

/* Thread routine with epoll_wait */
static void *process_clients(void *pevents)
{
    int rc, i, sock, nfds;
    struct epoll_event ep, *events = (struct epoll_event *) pevents;

    while (!sigint) {
        nfds = epoll_wait(epfd, events, MAX_EVENT_NUM, 500);

        for (i = 0; i < nfds; ++i) {
           if (events[i].data.fd < 0)
                continue;

           sock = events[i].data.fd;

           if((events[i].events & EPOLLIN) == EPOLLIN) {
               printf("Event dispatch!\n");
               handle_request(sock); // do a recvfrom
           } else
               whine("Unknown poll event!\n");

           memset(&ep, 0, sizeof(ep));
           ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
           ep.data.fd = sock;

           rc = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ep);
           if(rc < 0)
               error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n");
       }
    }

    pthread_exit(NULL);
}

int main(int argc, char **argv)
{
    int rc, i, cpu, sock, opts;
    struct sockaddr_in sin;
    struct epoll_event ep, *events;
    char *local_addr = "192.168.1.108";
    void *status;
    pthread_t *threads = NULL;
    cpu_set_t cpuset;

    threads = xzmalloc(sizeof(*threads) * MAX_THRD_NUM);
    events = xzmalloc(sizeof(*events) * MAX_EVENT_NUM);

    sock = socket(PF_INET, SOCK_DGRAM, 0);
    if (sock < 0)
        error_and_die(EXIT_FAILURE, "Cannot create socket!\n");

    /* Non-blocking */
    opts = fcntl(sock, F_GETFL);
    if(opts < 0)
        error_and_die(EXIT_FAILURE, "Cannot fetch sock opts!\n");
    opts |= O_NONBLOCK;
    rc = fcntl(sock, F_SETFL, opts);
    if(rc < 0)
        error_and_die(EXIT_FAILURE, "Cannot set sock opts!\n");

    /* Initial epoll setup */
    epfd = epoll_create(MAX_EVENT_NUM);
    if(epfd < 0)
        error_and_die(EXIT_FAILURE, "Error fetching an epoll descriptor!\n");

    memset(&ep, 0, sizeof(ep));
    ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    ep.data.fd = sock;

    rc = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ep);
    if(rc < 0)
        error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n");

    /* Socket binding */
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = inet_addr(local_addr);
    sin.sin_port = htons(port_xy);

    rc = bind(sock, (struct sockaddr *) &sin, sizeof(sin));
    if (rc < 0)
        error_and_die(EXIT_FAILURE, "Problem binding to port! "
                      "Already in use?\n");

    register_signal(SIGINT, &signal_handler);

    /* Thread initialization */
    for (i = 0, cpu = 0; i < MAX_THRD_NUM; ++i) {
        rc = pthread_create(&threads[i], NULL, process_clients, events);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Cannot create pthread!\n");

        CPU_ZERO(&cpuset);
        CPU_SET(cpu, &cpuset);

        rc = pthread_setaffinity_np(threads[i], sizeof(cpuset), &cpuset);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Cannot create pthread!\n");

        cpu = (cpu + 1) % NR_CPUS_ON;
    }

    printf("up and running!\n");

    /* Thread joining */
    for (i = 0; i < MAX_THRD_NUM; ++i) {
        rc = pthread_join(threads[i], &status);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Error on thread exit!\n");
    }

    close(sock);
    xfree(threads);
    xfree(events);

    printf("shut down!\n");

    return 0;
}

这是使用 epoll 处理这种情况的正确方法吗?函数 _handle_request_ 是否应该尽快返回,因为此时套接字的事件队列被阻塞?!

感谢您的回复!

I'd like to develop a multithreaded UDP server in C/Linux. The service is running on a single port x, thus there's only the possibility to bind a single UDP socket to it. In order to work under high loads, I have n threads (statically defined), say 1 thread per CPU. Work could be delivered to the thread using epoll_wait, so threads get woken up on demand with 'EPOLLET | EPOLLONESHOT'. I've attached a code example:

static int epfd;
static sig_atomic_t sigint = 0;

...

/* Thread routine with epoll_wait */
static void *process_clients(void *pevents)
{
    int rc, i, sock, nfds;
    struct epoll_event ep, *events = (struct epoll_event *) pevents;

    while (!sigint) {
        nfds = epoll_wait(epfd, events, MAX_EVENT_NUM, 500);

        for (i = 0; i < nfds; ++i) {
           if (events[i].data.fd < 0)
                continue;

           sock = events[i].data.fd;

           if((events[i].events & EPOLLIN) == EPOLLIN) {
               printf("Event dispatch!\n");
               handle_request(sock); // do a recvfrom
           } else
               whine("Unknown poll event!\n");

           memset(&ep, 0, sizeof(ep));
           ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
           ep.data.fd = sock;

           rc = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ep);
           if(rc < 0)
               error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n");
       }
    }

    pthread_exit(NULL);
}

int main(int argc, char **argv)
{
    int rc, i, cpu, sock, opts;
    struct sockaddr_in sin;
    struct epoll_event ep, *events;
    char *local_addr = "192.168.1.108";
    void *status;
    pthread_t *threads = NULL;
    cpu_set_t cpuset;

    threads = xzmalloc(sizeof(*threads) * MAX_THRD_NUM);
    events = xzmalloc(sizeof(*events) * MAX_EVENT_NUM);

    sock = socket(PF_INET, SOCK_DGRAM, 0);
    if (sock < 0)
        error_and_die(EXIT_FAILURE, "Cannot create socket!\n");

    /* Non-blocking */
    opts = fcntl(sock, F_GETFL);
    if(opts < 0)
        error_and_die(EXIT_FAILURE, "Cannot fetch sock opts!\n");
    opts |= O_NONBLOCK;
    rc = fcntl(sock, F_SETFL, opts);
    if(rc < 0)
        error_and_die(EXIT_FAILURE, "Cannot set sock opts!\n");

    /* Initial epoll setup */
    epfd = epoll_create(MAX_EVENT_NUM);
    if(epfd < 0)
        error_and_die(EXIT_FAILURE, "Error fetching an epoll descriptor!\n");

    memset(&ep, 0, sizeof(ep));
    ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    ep.data.fd = sock;

    rc = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ep);
    if(rc < 0)
        error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n");

    /* Socket binding */
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = inet_addr(local_addr);
    sin.sin_port = htons(port_xy);

    rc = bind(sock, (struct sockaddr *) &sin, sizeof(sin));
    if (rc < 0)
        error_and_die(EXIT_FAILURE, "Problem binding to port! "
                      "Already in use?\n");

    register_signal(SIGINT, &signal_handler);

    /* Thread initialization */
    for (i = 0, cpu = 0; i < MAX_THRD_NUM; ++i) {
        rc = pthread_create(&threads[i], NULL, process_clients, events);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Cannot create pthread!\n");

        CPU_ZERO(&cpuset);
        CPU_SET(cpu, &cpuset);

        rc = pthread_setaffinity_np(threads[i], sizeof(cpuset), &cpuset);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Cannot create pthread!\n");

        cpu = (cpu + 1) % NR_CPUS_ON;
    }

    printf("up and running!\n");

    /* Thread joining */
    for (i = 0; i < MAX_THRD_NUM; ++i) {
        rc = pthread_join(threads[i], &status);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Error on thread exit!\n");
    }

    close(sock);
    xfree(threads);
    xfree(events);

    printf("shut down!\n");

    return 0;
}

Is this the proper way of handling this scenario with epoll? Should the function _handle_request_ return as fast as possible, because for this time the eventqueue for the socket is blocked?!

Thanks for replies!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

衣神在巴黎 2024-10-05 03:00:17

由于您只使用单个 UDP 套接字,因此使用 epoll 是没有意义的 - 只需使用阻塞式 recvfrom 即可。

现在,根据您需要处理的协议 - 如果您可以单独处理每个 UDP 数据包 - 您实际上可以从多个线程(在线程池中)同时调用 recvfrom。操作系统将确保只有一个线程会接收 UDP 数据包。然后该线程可以在handle_request 中执行它需要执行的任何操作。

但是,如果您需要按特定顺序处理 UDP 数据包,您可能没有那么多机会并行化您的程序...

As you are only using a single UDP socket, there is no point using epoll - just use a blocking recvfrom instead.

Now, depending on the protocol you need to handle - if you can process each UDP packet individually - you can actually call recvfrom concurrently from multiple threads (in a thread pool). The OS will take care that exactly one thread will receive the UDP packet. This thread can then do whatever it needs to do in handle_request.

However, if you need to process the UDP packets in a particular order, you'll probably not have that many opportunities to parallalise your program...

夜无邪 2024-10-05 03:00:17

不,这不会按您想要的方式工作。要让工作线程处理通过 epoll 接口到达的事件,您需要不同的体系结构。

示例设计(有多种方法可以做到这一点)
使用:SysV/POSIX 信号量。

  • 让主线程生成 n 个子线程和一个信号量,然后阻止 epolling 套接字(或其他)。

  • 让每个子线程阻止关闭信号量。

  • 当主线程解除阻塞时,它将事件存储在某个全局结构中,并为每个事件增加一次信号量。

  • 子线程解除阻塞,处理事件,当信号量返回到 0 时再次阻塞。

您可以使用在所有线程之间共享的管道实现与信号量非常相似的功能。这将使您阻止 select() 而不是信号量,您可以使用信号量在其他事件(超时、其他管道等)上唤醒线程。

您也可以反转此控制,并在工作线程需要任务时唤醒主线程。不过,我认为上述方法更适合您的情况。

No, this will not work the way you want to. To have worker threads process events arriving through an epoll interface, you need a different architecture.

Example design (there are several ways to do this)
Uses: SysV/POSIX semaphores.

  • Have the master thread spawn n subthreads and a semaphore, then block epolling your sockets (or whatever).

  • Have each subthread block on down-ing the semaphore.

  • When the master thread unblocks, it stores the events in some global structure and ups the semaphore once per event.

  • The subthreads unblock, process the events, block again when the semaphore returns to 0.

You can use a pipe shared among all threads to achieve very similar functionality to that of the semaphore. This would let you block on select() instead of the semaphore, which you can use to wake the threads up on some other event (timeouts, other pipes, etc.)

You can also reverse this control, and have the master thread wake up when its workers demand tasks. I think the above approach is better for your case, though.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文