当新消息到达时,如何使UDP套接字替换旧消息(尚未recv()'d)?

发布于 2024-09-14 01:36:38 字数 1048 浏览 8 评论 0原文

首先,用一些上下文来解释为什么我采用“UDP 采样”路线:
我想对未知时间段内快速生成的数据进行采样。我想要采样的数据位于另一台机器上,而不是消耗数据的机器上。我在两者之间有专用的以太网连接,因此带宽不是问题。我遇到的问题是消耗数据的机器比生成数据的机器慢得多。另一个限制是,虽然我没有获得所有样本(它们只是样本)也没关系,但我必须获得最后一个样本。

我的第一个解决方案是让数据生产者为每个生成的样本发送一个 UDP 数据报,并让数据消费者尝试获取它可以获取的样本,并在 UDP 套接字已满时让其他样本被套接字层丢弃。此解决方案的问题在于,当新的 UDP 数据报到达且套接字已满时,将丢弃数据报,而不是旧数据报。所以我不保证能得到最后一张!

我的问题是:有没有办法让 UDP 套接字在新数据报到达时替换旧数据报?

接收器目前是一台 Linux 机器,但将来可能会改用另一种类 UNIX 操作系统(Windows 可能是可能的,因为它实现了 BSD 套接字,但可能性较小)
理想的解决方案是使用广泛的机制(如setsockopt())来工作。

PS:我想到了其他解决方案,但它们更复杂(涉及对发送者的大量修改),因此我想首先对我所要求的可行性有一个明确的状态! :)

更新: - 我知道接收机器上的操作系统可以处理网络负载+发送者生成的流量的重组。只是它的默认行为是当套接字缓冲区已满时丢弃新的数据报。由于接收过程中的处理时间,我知道无论我做什么它都会变满(在套接字缓冲区上浪费一半内存不是一个选项:))。
- 我真的很想避免让辅助进程执行操作系统在数据包分发时可以执行的操作,并浪费资源只是在 SHM 中复制消息。
- 我在修改发件人时看到的问题是,我有权访问的代码只是一个 PleaseSendThisData() 函数,它不知道这可能是很长时间之前最后一次调用它,所以我不知道在那一端看到任何可行的技巧......但我愿意接受建议! :)

如果真的没有办法改变 BSD 套接字中的 UDP 接收行为,那么好吧......告诉我,我准备接受这个可怕的事实,并且当我去时将开始研究“辅助进程”解决方案回到正题:)

First, a little bit of context to explain why I am on the "UDP sampling" route:
I would like to sample data produced at a fast rate for an unknown period of time. The data I want to sample is on another machine than the one consuming the data. I have a dedicated Ethernet connection between the two so bandwidth is not an issue. The problem I have is that the machine consuming the data is much slower than the one producing it. An added constraint is that while it's ok that I don't get all the samples (they are just samples), it is mandatory that I get the last one.

My 1st solution was to make the data producer send a UDP datagram for each produced sample and let the data consumer try to get the samples it could and let the others be discarded by the socket layer when the UDP socket is full. The problem with this solution is that when new UDP datagrams arrive and the socket is full, it is the new datagrams that get discarded and not the old ones. Therefore I am not guarantueed to have the last one!

My question is: is there a way to make a UDP socket replace old datagrams when new arrive?

The receiver is currently a Linux machine, but that could change in favor of another unix-like OS in the future (windows may be possible as it implements BSD sockets, but less likely)
The ideal solution would use widespread mecanisms (like setsockopt()s) to work.

PS: I thought of other solutions but they are more complex (involve heavy modification of the sender), therefore I would like first to have a definite status on the feasability of what I ask! :)

Updates:
- I know that the OS on the receiving machine can handle the network load + reassembly of the traffic generated by the sender. It's just that its default behaviour is to discard new datagrams when the socket buffer is full. And because of the processing times in the receiving process, I know it will become full whatever I do (wasting half of the memory on a socket buffer is not an option :)).
- I really would like to avoid having an helper process doing what the OS could have done at packet-dispatching time and waste resource just copying messages in a SHM.
- The problem I see with modifying the sender is that the code which I have access to is just a PleaseSendThisData() function, it has no knowledge that it can be the last time it is called before a long time, so I don't see any doable tricks at that end... but I'm open to suggestions! :)

If there are really no way to change the UDP receiving behaviour in a BSD socket, then well... just tell me, I am prepared to accept this terrible truth and will start working on the "helper process" solution when I go back to it :)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

断舍离 2024-09-21 01:36:38

只需将套接字设置为非阻塞,然后循环 recv() 直到它返回 0 与 errno == EAGAIN。然后处理你得到的最后一包,冲洗并重复。

Just set the socket to non-blocking, and loop on recv() until it returns < 0 with errno == EAGAIN. Then process the last packet you got, rinse and repeat.

握住我的手 2024-09-21 01:36:38

我同意“咖啡馆”。
将套接字设置为非阻塞模式。

每当您在套接字上收到某些内容时 - 循环读取直到没有任何内容为止。然后处理最后读取的数据报。

只有一点需要注意:您应该为套接字设置一个大的系统接收缓冲区

int nRcvBufSize = 5*1024*1024; // or whatever you think is ok
setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*) &nRcvBufSize, sizeof(nRcvBufSize));

I agree with "caf".
Set the socket to a non-blocking mode.

Whenever you receive something on the socket - read in a loop until nothing more is left. Then handle the last read datagram.

Only one note: you should set a large system receive buffer for the socket

int nRcvBufSize = 5*1024*1024; // or whatever you think is ok
setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*) &nRcvBufSize, sizeof(nRcvBufSize));
等待圉鍢 2024-09-21 01:36:38

仅在侦听器端很难完全正确,因为它实际上可能会错过网络接口芯片中的最后一个数据包,这将使您的程序没有机会看到它。

操作系统的 UDP 代码将是尝试处理此问题的最佳位置,因为即使它决定丢弃新数据包,它也会收到新数据包,因为它已经有太多排队的数据包。然后它可以决定丢弃旧的或丢弃新的,但我不知道如何告诉它这就是你想要它做的事情。

您可以尝试在接收器上处理此问题,方法是让一个程序或线程始终尝试读取最新的数据包,而另一个程序或线程始终尝试获取最新的数据包。根据您是作为两个单独的程序还是作为两个线程执行此操作,如何执行此操作会有所不同。

作为线程,您需要一个互斥锁(信号量或类似的东西)来保护指向用于保存 1 UDP 有效负载的结构的指针(或引用)以及您想要的任何其他内容(大小、发送者 IP、发送者端口、时间戳等) )。

实际从套接字读取数据包的线程会将数据包的数据存储在结构中,获取保护该指针的互斥体,将当前指针替换为指向刚刚创建的结构体的指针,释放互斥体,向处理器线程发出信号有事情要做,然后清除它刚刚获得指针的结构,并使用它来保存进入的下一个数据包。

实际处理数据包有效负载的线程应该等待来自其他线程的信号和/或定期等待(500 毫秒左右可能是一个很好的起点,但由您决定)并获取互斥锁,将其指向 UDP 负载结构的指针与现有的结构交换,释放互斥锁,然后如果该结构有任何数据包它应该处理它的数据,然后等待下一个信号。如果它没有任何数据,它应该继续等待下一个信号。

处理器线程可能应该以低于 UDP 侦听器的优先级运行,以便侦听器不太可能错过数据包。当处理最后一个数据包(您真正关心的数据包)时,处理器不会被中断,因为没有新的数据包可供侦听器听到。

您可以通过使用队列而不仅仅是单个指针作为两个线程的交换位置来扩展此功能。单个指针只是一个长度为1的队列,非常容易处理。

您还可以通过尝试让侦听器线程检测是否有多个数据包在等待来扩展此功能,并且仅将最后一个数据包实际放入处理器线程的队列中。执行此操作的方式因平台而异,但如果您使用 *nix,则对于没有任何等待的套接字,这应该返回 0:

while (keep_doing_this()) {
    ssize_t len = read(udp_socket_fd, my_udp_packet->buf, my_udp_packet->buf_len); 
    // this could have been recv or recvfrom
    if (len < 0) {
        error();
    }
    int sz;
    int rc = ioctl(udp_socket_fd, FIONREAD, &sz);
    if (rc < 0) {
        error();
    }
    if (!sz) {
        // There aren't any more packets ready, so queue up the one we got
        my_udp_packet->current_len = len;

        my_udp_packet = swap_udp_packet(my_ucp_packet);
        /* swap_udp_packet is code you would have to write to implement what I talked
           about above. */

        tgkill(this_group, procesor_thread_tid, SIGUSR1);
    } else if (sz > my_udp_packet->buf_len) {
        /* You could resize the buffer for the packet payload here if it is too small.*/
    }
}

必须为每个线程分配 udp_packet,并为交换指针分配 1。如果您使用队列进行交换,那么您必须为队列中的每个位置提供足够的 udp_packets ——因为指针只是一个长度为 1 的队列,因此它只需要 1。

如果您使用的是 POSIX 系统,则考虑不使用实时数据包信号用于信令,因为它们排队。使用常规信号将允许您将多次收到信号视为与仅收到一次信号相同,直到信号被处理,而实时信号会排队。定期醒来检查队列还可以让您处理在检查是否有任何新数据包之后、调用 pause 等待信号之前最后一个信号到达的可能性。

This will be difficult to get completely right just on the listener side since it could actually miss the last packet in the Network Interface Chip, which will keep your program from ever having had a chance at seeing it.

The operating system's UDP code would be the best place to try to deal with this since it will get new packets even if it decides to discard them because it already has too many queued up. Then it could make the decision of dropping an old one or dropping a new one, but I don't know how to go about telling it that this is what you would want it to do.

You can try to deal with this on the receiver by having one program or thread that always tries to read in the newest packet and another that always tries to get that newest packet. How to do this would differ based on if you did it as two separate programs or as two threads.

As threads you would need a mutex (semaphore or something like it) to protect a pointer (or reference) to a structure used to hold 1 UDP payload and whatever else you wanted in there (size, sender IP, sender port, timestamp, etc).

The thread that actually read packets from the socket would store the packet's data in a struct, acquire the mutex protecting that pointer, swap out the current pointer for a pointer to the struct it just made, release the mutex, signal the processor thread that it has something to do, and then clear out the structure that it just got a pointer to and use it to hold the next packet that comes in.

The thread that actually processed packet payloads should wait on the signal from the other thread and/or periodically (500 ms or so is probably a good starting point for this, but you decide) and aquire the mutex, swap its pointer to a UDP payload structure with the one that is there, release the mutex, and then if the structure has any packet data it should process it and then wait on the next signal. If it did not have any data it should just go ahead and wait on the next signal.

The processor thread should probably run at a lower priority than the UDP listener so that the listener is less likely to ever miss a packet. When processing the last packet (the one you really care about) the processor will not be interrupted because there are no new packets for the listener to hear.

You could extend this by using a queue rather than just a single pointer as the swapping place for the two threads. The single pointer is just a queue of length 1 and is very easy to process.

You could also extend this by attempting to have the listener thread detect if there are multiple packets waiting and only actually putting the last of those into the queue for the processor thread. How you do this will differ by platform, but if you are using a *nix then this should return 0 for sockets with nothing waiting:

while (keep_doing_this()) {
    ssize_t len = read(udp_socket_fd, my_udp_packet->buf, my_udp_packet->buf_len); 
    // this could have been recv or recvfrom
    if (len < 0) {
        error();
    }
    int sz;
    int rc = ioctl(udp_socket_fd, FIONREAD, &sz);
    if (rc < 0) {
        error();
    }
    if (!sz) {
        // There aren't any more packets ready, so queue up the one we got
        my_udp_packet->current_len = len;

        my_udp_packet = swap_udp_packet(my_ucp_packet);
        /* swap_udp_packet is code you would have to write to implement what I talked
           about above. */

        tgkill(this_group, procesor_thread_tid, SIGUSR1);
    } else if (sz > my_udp_packet->buf_len) {
        /* You could resize the buffer for the packet payload here if it is too small.*/
    }
}

A udp_packet would have to be allocated for each thread as well as 1 for the swapping pointer. If you use a queue for swapping then you must have enough udp_packets for each position in the queue -- since the pointer is just a queue of length 1 it only needs 1.

If you are using a POSIX system then consider not using a real time signal for the signaling because they queue up. Using a regular signal will allow you to treat being signaled many times the same as being signaled just once until the signal is handled, while real time signals queue up. Waking up periodically to check the queue also allows you to handle the possibility of the last signal arriving just after you have checked to see if you had any new packets but before you call pause to wait on a signal.

空名 2024-09-21 01:36:38

另一个想法是有一个专用的读取器进程,它除了在套接字上循环并将传入数据包读取到共享内存中的循环缓冲区之外什么都不做(您必须担心正确的写入顺序)。类似 kfifo 的东西。非阻塞在这里也很好。新数据覆盖旧数据。然后其他进程将始终可以访问队列头部的最新块以及所有尚未覆盖的先前块。

对于一个简单的单向读者来说可能太复杂了,只是一种选择。

Another idea is to have a dedicated reader process that does nothing but loops on the socket and reads incoming packets into circular buffer in shared memory (you'll have to worry about proper write ordering). Something like kfifo. Non-blocking is fine here too. New data overrides old data. Then other process(es) would always have access to latest block at the head of the queue and all the previous chunks not yet overwritten.

Might be too complicated for a simple one-way reader, just an option.

流云如水 2024-09-21 01:36:38

我很确定这是一个无法解决的问题,与两军问题密切相关

我可以想到一个肮脏的解决方案:建立一个 TCP“控制”边带连接,该连接携带最后一个数据包,这也是“结束传输”指示。否则,您需要使用工程方法中提到的更通用的实用方法之一。

I'm pretty sure that this is a provably insoluble problem closely related to the Two Army Problem.

I can think of a dirty solution: establish a TCP "control" sideband connection which carries the last packet which is also a "end transmission" indication. Otherwise you need to use one of the more general pragmatic means noted in Engineering Approaches.

悍妇囚夫 2024-09-21 01:36:38

这是一个老问题,但您基本上想将套接字队列(先进先出)转换为堆栈(后进先出)。这是不可能的,除非你想摆弄内核。

您需要将数据报从内核空间移动到用户空间,然后进行处理。最简单的方法是像这样的循环...

  1. 阻塞,直到套接字上有数据(参见 select、poll、epoll)
  2. 排空套接字,根据您自己的选择策略存储数据报
  3. 处理存储的数据报
  4. 重复

This is an old question, but you are basically wanting to turn the socket queue (FIFO) into a stack (LIFO). It's not possible, unless you want to fiddle with the kernel.

You'll need to move the datagrams from kernel space to user space and then process. Easiest approach would be a loop like this...

  1. Block until there is data on the socket (see select, poll, epoll)
  2. Drain the socket, storing datagrams per your own selection policy
  3. Process the stored datagrams
  4. Repeat
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文