Boost asio UDP client not receiving data

Posted on 2025-01-09 18:42:03

I'm trying to send data from a server to a specific client over UDP, without the client first sending anything to the server (the client never sends the server any data at all). The problem is that the client waits in receive_from() and receives nothing. The server, for its part, appears to send the whole payload and then exits, but I don't know where the data ends up.
If I run the server with no client running, the sends also succeed, and I don't understand why: isn't send_to() supposed to block until the data is sent?

This is where the client stops (part of my code):

void UDPclient::run()
{
    std::string relative_path = "assets/";
    std::thread thread_context = std::thread([&] {_io_context.run(); }); //start the context.
    file_info fileInfo;

    boost::asio::io_context::work idleWork(_io_context);
    boost::system::error_code error;

    udp::socket socket(_io_context); //the file descriptor 
    
    WirehairCodec decoder;

    udp::endpoint sender;
    memset(&fileInfo, sizeof(fileInfo), 0); //reset the struct to 0 (good practice)

    std::size_t bytes_transferred = socket.receive_from(
        boost::asio::buffer(&fileInfo, sizeof(fileInfo)),
        sender);

    ...

    socket.close();
}

Server (part of my code):

int main()
{
    std::uint16_t port = 2000;
    file_info fileInfo;
    std::string filePath;
    boost::asio::io_context io_context;

    udp::socket socket(io_context, udp::endpoint(udp::v4(), port));
    udp::endpoint destination(boost::asio::ip::address::from_string("127.0.0.1"), port);
    
    boost::system::error_code ec;
    const WirehairResult initResult = wirehair_init(); //initialize wirehair

    if(initResult != Wirehair_Success)
    {
        std::cout << "failed to initialize wirehair: " << initResult << std::endl;
        return -1;
    }

    if (ec)
        std::cerr << ec.message() << std::endl;
    else
    { 
        std::cout << "Enter the specified file (Full path) to send: ";
        std::cin >> filePath; 

        while (!boost::filesystem::exists(filePath))
        {
            std::cerr << "file doesn't exist." << std::endl;
            std::cout << "Enter the specified file (Full path) to send: ";
            std::cin >> filePath;
        }

        read_fileToVector(filePath, vBuffer);
        file_info fileInfo;
        memset(&fileInfo, 0, sizeof(fileInfo)); // Always set the struct values to 0, good practice.

        //send file size, name
        fileInfo.size = vBuffer.size();
        strncpy(fileInfo.fileName, filePath.substr(filePath.find_last_of("/\\") + 1).c_str(), 
                  sizeof(fileInfo.fileName) - 1);
        std::cout << "name: " << fileInfo.fileName << std::endl;
        std::cout << "size: " << fileInfo.size << std::endl;

        socket.send_to(boost::asio::buffer(&fileInfo, sizeof(fileInfo)), destination);
        socket.wait(socket.wait_write);

        ...
    socket.close();
}

Comments (1)

一身骄傲 2025-01-16 18:42:03

There's not really enough code to tell, but there are a few smells that I can point out:

  • The memset call is wrong: the fill value and the size are swapped, so memset(&fileInfo, sizeof(fileInfo), 0) writes zero bytes. Since file_info clearly must be trivial and standard layout (for memset to be legal), why not aggregate-initialize it with an empty initializer instead, which has the intended effect (value-initializing every member)?

  • Your context is apparently a class member, but you are starting a thread for each receive? That seems odd. I'd expect a single thread with the lifetime of the io_context/work.

    If you really want to reuse/restart the same io_context, keep in mind the need to call .restart() on it in between (.reset() is the older, deprecated name).

  • You don't need to open/close a socket for each receive either.

  • You might want to bind your UDP socket to a specific port to receive on.
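
To make the last point concrete, here is a minimal sketch of a bound receiver on loopback. It deliberately uses plain POSIX sockets rather than Boost.Asio so that it has no dependencies, and it assumes a POSIX system with a working loopback interface. All helper names (check, loopback, open_bound_receiver, send_datagram, roundtrip, send_into_the_void) are made up for illustration. It shows two things: a datagram only arrives at a socket that is bound to the destination port, and a send on an unconnected UDP socket normally reports success even when nobody is listening.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

#include <cassert>
#include <cerrno>
#include <cstdint>
#include <string>
#include <system_error>

// Throw a std::system_error carrying errno if a socket call failed.
void check(bool ok, char const* what) {
    if (!ok) throw std::system_error(errno, std::generic_category(), what);
}

// IPv4 loopback address (127.0.0.1) for the given port.
sockaddr_in loopback(std::uint16_t port) {
    sockaddr_in addr{}; // value-initialized, so every field starts zeroed
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(port);
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    return addr;
}

// Open a UDP socket bound to 127.0.0.1 on an OS-chosen port.
int open_bound_receiver(std::uint16_t& port_out) {
    int fd = ::socket(AF_INET, SOCK_DGRAM, 0);
    check(fd >= 0, "socket");
    sockaddr_in addr = loopback(0); // port 0: let the OS pick a free port
    check(::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0, "bind");
    socklen_t len = sizeof(addr);
    check(::getsockname(fd, reinterpret_cast<sockaddr*>(&addr), &len) == 0, "getsockname");
    port_out = ntohs(addr.sin_port);
    assert(port_out != 0); // the OS must have assigned a real port
    return fd;
}

// Send one datagram from a fresh, unbound socket; returns sendto()'s result.
ssize_t send_datagram(std::uint16_t port, std::string const& msg) {
    int fd = ::socket(AF_INET, SOCK_DGRAM, 0);
    check(fd >= 0, "socket");
    sockaddr_in dest = loopback(port);
    ssize_t n = ::sendto(fd, msg.data(), msg.size(), 0,
                         reinterpret_cast<sockaddr const*>(&dest), sizeof(dest));
    ::close(fd);
    return n;
}

// Bound receiver plus sender on loopback: the datagram arrives.
std::string roundtrip(std::string const& msg) {
    std::uint16_t port = 0;
    int rx = open_bound_receiver(port);
    timeval tv{};
    tv.tv_sec = 2; // do not block forever if something goes wrong
    ::setsockopt(rx, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
    send_datagram(port, msg);
    char buf[1500];
    sockaddr_in from{};
    socklen_t from_len = sizeof(from);
    ssize_t n = ::recvfrom(rx, buf, sizeof(buf), 0,
                           reinterpret_cast<sockaddr*>(&from), &from_len);
    int const err = errno; // keep errno from recvfrom across close()
    ::close(rx);
    if (n < 0) throw std::system_error(err, std::generic_category(), "recvfrom");
    return std::string(buf, static_cast<std::size_t>(n));
}

// No receiver at all: the send still reports the full length.
ssize_t send_into_the_void(std::string const& msg) {
    std::uint16_t port = 0;
    int tmp = open_bound_receiver(port); // borrow a free port number...
    ::close(tmp);                        // ...then make sure nobody listens on it
    return send_datagram(port, msg);
}
```

Calling roundtrip("hello") should hand back the same string, while send_into_the_void("hello") should still return the message length even though the datagram is dropped. Compare this with the question's code, where the client socket is constructed without being opened or bound, and the server's destination endpoint uses the same port (2000) that the server socket itself is bound to.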

What I'd expect to see is something similar to:

struct UDPclient {
    UDPclient(asio::any_io_executor ex, uint16_t port = DEFAULT_PORT)
        : socket_(ex, udp::v4())
        , port_(port)
    {
        socket_.bind({{}, port_});
    }

    bool run()
    {
        file_info fi{}; // value-initializes all members

        udp::endpoint sender;
        if (std::size_t n =
                socket_.receive_from(asio::buffer(&fi, sizeof(fi)), sender);
            n > file_info::HEADERLEN && fi.magic == file_info::MAGIC) //
        {
            // don't assume name will be zero terminated
            std::string_view name(fi.name.data(),
                                  strnlen(fi.name.data(), fi.name.max_size()));

            std::cout << "Receiving " << fi.xfer_id << " length "
                      << fi.file_length << " name " << std::quoted(name)
                      << " from " << sender << std::endl;
            decoder_.reset(
                wirehair_decoder_create(nullptr, fi.file_length, PACKET_SIZE));

            if (!decoder_) {
                throw std::runtime_error("wirehair_decoder_create");
            }

            packet_info packet{};

            for (bool data_complete = false; !data_complete;) {
                if (auto len = socket_.receive_from(
                        asio::buffer(&packet, sizeof(packet)), sender);
                    len >= packet_info::HEADERLEN &&
                    packet.magic == packet_info::MAGIC &&
                    len == packet_info::HEADERLEN + packet.block_length) //
                {
                    if (fi.xfer_id != packet.xfer_id)
                        continue; // TODO concurrent receives

                    std::cout << "(Incoming " << packet.block_length << " for "
                              << fi.xfer_id << " from " << sender << ")"
                              << std::endl;
                    // Attempt decode
                    switch (wirehair_decode(decoder_.get(), packet.id,
                                            packet.block.data(),
                                            packet.block_length)) //
                    {
                        case Wirehair_NeedMore: continue; break;
                        case Wirehair_Success: data_complete = true; break;
                        default: throw std::runtime_error("wirehair_decode");
                    }
                    std::cout << "(data complete? " << std::boolalpha
                              << data_complete << ")" << std::endl;
                }
            }
            std::cout << "Receive completed for " << fi.xfer_id << std::endl;

            // try to be safe about interpreting the output name
            auto spec = fs::relative(
                weakly_canonical(
                    relative_path /
                    fs::path(name).lexically_normal().relative_path()),
                relative_path);

            if (spec.empty())
                throw std::runtime_error("invalid file specification " + spec.native());

            auto target = relative_path / spec;
            fs::create_directories(target.parent_path());

            std::cout << "Decoding to " << target << " for " << fi.xfer_id
                      << std::endl;
            std::vector<uint8_t> decoded(fi.file_length);

            // Recover original data on decoder side
            auto r = wirehair_recover(decoder_.get(), decoded.data(),
                    decoded.size());

            if (r != Wirehair_Success)
                throw std::runtime_error("wirehair_recover");

            std::ofstream(target, std::ios::binary)
                .write(reinterpret_cast<char const*>(decoded.data()),
                       decoded.size());
        }
        return true;
    }

  private:
    udp::socket socket_;
    uint16_t    port_;
    CodecPtr    decoder_{};
    fs::path    relative_path = "assets/";
};
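
The file-name handling near the end of run() is a best-effort guard against hostile names. A stricter, purely lexical variant might look like the sketch below. safe_target is a hypothetical helper, not part of the demo; it assumes POSIX-style paths and simply refuses any name that still contains a ".." component after normalization, instead of trying to resolve it against the file system.

```cpp
#include <cassert>
#include <filesystem>
#include <optional>
#include <string_view>

namespace fs = std::filesystem;

// Map an untrusted file name onto a path below `base`, or return nullopt
// if the name is empty, names a directory, or tries to climb out of `base`.
std::optional<fs::path> safe_target(fs::path const& base, std::string_view name) {
    assert(!base.empty()); // the caller must supply a real base directory

    // Collapse "a/../b" and "./x", then drop any root ("/etc/x" -> "etc/x").
    fs::path rel = fs::path(name).lexically_normal().relative_path();

    // Nothing left, only ".", or a trailing slash: there is no file to write.
    if (rel.empty() || rel == "." || !rel.has_filename())
        return std::nullopt;

    // After normalization, ".." can only remain at the front; reject it.
    for (auto const& part : rel)
        if (part == "..")
            return std::nullopt;

    return base / rel;
}
```

With this sketch, safe_target("assets", "/etc/passwd") yields assets/etc/passwd, whereas safe_target("assets", "../../etc/passwd") yields an empty optional. Because the check is lexical only, it does not follow symlinks; whether that is acceptable depends on who controls the target directory.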

For the UDPclient above I made up a protocol consisting of two types of messages, each starting with a magic header, defined as:

namespace /*protocol*/ {
#pragma pack(push, 1)
    struct file_info {
        boost::endian::big_uint32_t magic, file_length;
        boost::uuids::uuid             xfer_id;
        std::array<char, PATH_MAX + 1> name;

        enum : unsigned {
            MAGIC     = 0xDEFACED,
            HEADERLEN = sizeof(magic) + sizeof(file_length) + sizeof(xfer_id),
        };
    };
    static_assert(std::is_trivial_v<file_info>);
    static_assert(sizeof(file_info) + 8 <= 0xFFFF); // must fit udp

    struct packet_info {
        boost::endian::big_uint32_t      magic, block_length, id;
        boost::uuids::uuid               xfer_id;
        std::array<uint8_t, PACKET_SIZE> block;
        enum : unsigned {
            MAGIC     = static_cast<unsigned>(~0xDEFACED),
            HEADERLEN = sizeof(magic) + sizeof(block_length) + sizeof(id) + sizeof(xfer_id)
        };
    };
    static_assert(std::is_trivial_v<packet_info>);
    static_assert(sizeof(packet_info) + 8 <= 0xFFFF); // must fit udp
#pragma pack(pop)

    // Rule Of Zero, please:
    struct WHFree {
        void operator()(WirehairCodec c) const { wirehair_free(c); }
    };

    using CodecPtr = std::unique_ptr<WirehairCodec_t, WHFree>;
} // namespace
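
The three checks that run() applies to each datagram (minimum length, magic value, declared length matching the received size) can be tried out in isolation. The sketch below uses a simplified, hypothetical two-field header (magic and block_length, both big-endian) and hand-rolled byte packing instead of Boost.Endian, so it is not the demo's actual wire format; the names kMagic, kHeaderLen, make_datagram and parse_datagram are assumptions for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

using Bytes = std::vector<std::uint8_t>;

constexpr std::uint32_t kMagic     = 0x0DEFACED; // same value as file_info::MAGIC
constexpr std::size_t   kHeaderLen = 8;          // magic + block_length

// Append a 32-bit value in big-endian (network) byte order.
void put_be32(Bytes& out, std::uint32_t v) {
    for (int shift = 24; shift >= 0; shift -= 8)
        out.push_back(static_cast<std::uint8_t>(v >> shift));
}

// Read a big-endian 32-bit value from four bytes.
std::uint32_t get_be32(std::uint8_t const* p) {
    return (std::uint32_t{p[0]} << 24) | (std::uint32_t{p[1]} << 16) |
           (std::uint32_t{p[2]} << 8) | std::uint32_t{p[3]};
}

// Header followed by only the bytes actually used, as the sender does.
Bytes make_datagram(Bytes const& block) {
    assert(block.size() <= UINT32_MAX); // length must fit the header field
    Bytes out;
    put_be32(out, kMagic);
    put_be32(out, static_cast<std::uint32_t>(block.size()));
    out.insert(out.end(), block.begin(), block.end());
    return out;
}

// Return the payload only if all three checks pass.
std::optional<Bytes> parse_datagram(Bytes const& dgram) {
    if (dgram.size() < kHeaderLen)
        return std::nullopt; // too short to even hold a header
    if (get_be32(dgram.data()) != kMagic)
        return std::nullopt; // not one of our messages
    std::uint32_t const block_length = get_be32(dgram.data() + 4);
    if (dgram.size() != kHeaderLen + block_length)
        return std::nullopt; // truncated or padded datagram
    return Bytes(dgram.begin() + static_cast<std::ptrdiff_t>(kHeaderLen),
                 dgram.end());
}
```

A datagram built by make_datagram parses back to the original block, while a truncated copy or one with a corrupted first byte is rejected. Validating before touching any field matters with UDP, since any host can send arbitrary bytes to a bound port.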

Note also the exception-safe handle type CodecPtr that avoids any risk of leaking these resources.
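
To see why such a handle type cannot leak, here is a small stand-alone sketch of the same pattern. The fake_codec_create/fake_codec_free functions and the FakeCodec types are hypothetical stand-ins for a C library with an opaque handle; they are not Wirehair's API. A counter tracks how many handles are alive.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>

// Hypothetical C-style API with an opaque handle (stand-in, not Wirehair).
struct FakeCodec_t { int unused; };
using FakeCodec = FakeCodec_t*;

static int g_live_codecs = 0; // number of handles created and not yet freed

FakeCodec fake_codec_create() {
    ++g_live_codecs;
    return new FakeCodec_t{0};
}

void fake_codec_free(FakeCodec c) {
    if (c) {
        assert(g_live_codecs > 0);
        --g_live_codecs;
        delete c;
    }
}

// Same shape as WHFree/CodecPtr: a stateless deleter plus a unique_ptr alias.
struct FakeFree {
    void operator()(FakeCodec c) const { fake_codec_free(c); }
};
using FakeCodecPtr = std::unique_ptr<FakeCodec_t, FakeFree>;

// An exception unwinds past the owner; returns the live count afterwards.
int live_after_throw() {
    try {
        FakeCodecPtr codec(fake_codec_create());
        throw std::runtime_error("decode failed");
    } catch (std::runtime_error const&) {
        // the handle was already released during stack unwinding
    }
    return g_live_codecs;
}

// reset() with a new handle frees the old one; returns the live count
// while the owner is still in scope.
int live_after_reset() {
    FakeCodecPtr codec(fake_codec_create());
    codec.reset(fake_codec_create());
    return g_live_codecs;
}
```

live_after_throw() reports zero live handles, and live_after_reset() reports exactly one while the owner is still in scope, dropping back to zero once it returns. This is the same mechanism the demo relies on when it calls decoder_.reset(...) for each new transfer.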

Full Demo Using Wirehair

Out of interest, I looked at the Wirehair codec and implemented a simple synchronous file transfer using it.

  • One limitation is that zero-length files cannot be transferred (wirehair_encoder_create fails in that case). You'd have to special-case such files.
  • Also, there is no graceful shutdown (that's because it's not possible to cancel blocking socket operations).
  • Making the receiving end async would immediately unlock the possibility of receiving several transfers simultaneously. I already put an xfer_id field into the protocol messages as a correlation id.

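#include <array>
#include <cassert>
#include <chrono>
#include <climits> // PATH_MAX (POSIX)
#include <cstring> // strncpy, strnlen
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <memory>
#include <string_view>
#include <thread>
#include <vector>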
using namespace std::chrono_literals;
using std::this_thread::sleep_for;

#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>

#include <wirehair/wirehair.h>

namespace asio = boost::asio;
namespace fs   = std::filesystem;
using asio::ip::udp;

static constexpr int      PACKET_SIZE  = 1400;
static constexpr uint16_t DEFAULT_PORT = 9797;

namespace /*protocol*/ {
#pragma pack(push, 1)
    struct file_info {
        boost::endian::big_uint32_t magic, file_length;
        boost::uuids::uuid             xfer_id;
        std::array<char, PATH_MAX + 1> name;

        enum : unsigned {
            MAGIC     = 0xDEFACED,
            HEADERLEN = sizeof(magic) + sizeof(file_length) + sizeof(xfer_id),
        };
    };
    static_assert(std::is_trivial_v<file_info>);
    static_assert(sizeof(file_info) + 8 <= 0xFFFF); // must fit udp

    struct packet_info {
        boost::endian::big_uint32_t      magic, block_length, id;
        boost::uuids::uuid               xfer_id;
        std::array<uint8_t, PACKET_SIZE> block;
        enum : unsigned {
            MAGIC     = static_cast<unsigned>(~0xDEFACED),
            HEADERLEN = sizeof(magic) + sizeof(block_length) + sizeof(id) + sizeof(xfer_id)
        };
    };
    static_assert(std::is_trivial_v<packet_info>);
    static_assert(sizeof(packet_info) + 8 <= 0xFFFF); // must fit udp
#pragma pack(pop)

    // Rule Of Zero, please:
    struct WHFree {
        void operator()(WirehairCodec c) const { wirehair_free(c); }
    };

    using CodecPtr = std::unique_ptr<WirehairCodec_t, WHFree>;
} // namespace

struct UDPclient {
    UDPclient(asio::any_io_executor ex, uint16_t port = DEFAULT_PORT)
        : socket_(ex, udp::v4())
        , port_(port)
    {
        socket_.bind({{}, port_});
    }

    bool run()
    {
        file_info fi{}; // value-initializes all members

        udp::endpoint sender;
        if (std::size_t n =
                socket_.receive_from(asio::buffer(&fi, sizeof(fi)), sender);
            n > file_info::HEADERLEN && fi.magic == file_info::MAGIC) //
        {
            // don't assume name will be zero terminated
            std::string_view name(fi.name.data(),
                                  strnlen(fi.name.data(), fi.name.max_size()));

            std::cout << "Receiving " << fi.xfer_id << " length "
                      << fi.file_length << " name " << std::quoted(name)
                      << " from " << sender << std::endl;
            decoder_.reset(
                wirehair_decoder_create(nullptr, fi.file_length, PACKET_SIZE));

            if (!decoder_) {
                throw std::runtime_error("wirehair_decoder_create");
            }

            packet_info packet{};

            for (bool data_complete = false; !data_complete;) {
                if (auto len = socket_.receive_from(
                        asio::buffer(&packet, sizeof(packet)), sender);
                    len >= packet_info::HEADERLEN &&
                    packet.magic == packet_info::MAGIC &&
                    len == packet_info::HEADERLEN + packet.block_length) //
                {
                    if (fi.xfer_id != packet.xfer_id)
                        continue; // TODO concurrent receives

                    std::cout << "(Incoming " << packet.block_length << " for "
                              << fi.xfer_id << " from " << sender << ")"
                              << std::endl;
                    // Attempt decode
                    switch (wirehair_decode(decoder_.get(), packet.id,
                                            packet.block.data(),
                                            packet.block_length)) //
                    {
                        case Wirehair_NeedMore: continue; break;
                        case Wirehair_Success: data_complete = true; break;
                        default: throw std::runtime_error("wirehair_decode");
                    }
                    std::cout << "(data complete? " << std::boolalpha
                              << data_complete << ")" << std::endl;
                }
            }
            std::cout << "Receive completed for " << fi.xfer_id << std::endl;

            // try to be safe about interpreting the output name
            auto spec = fs::relative(
                weakly_canonical(
                    relative_path /
                    fs::path(name).lexically_normal().relative_path()),
                relative_path);

            if (spec.empty())
                throw std::runtime_error("invalid file specification " + spec.native());

            auto target = relative_path / spec;
            fs::create_directories(target.parent_path());

            std::cout << "Decoding to " << target << " for " << fi.xfer_id
                      << std::endl;
            std::vector<uint8_t> decoded(fi.file_length);

            // Recover original data on decoder side
            auto r = wirehair_recover(decoder_.get(), decoded.data(),
                    decoded.size());

            if (r != Wirehair_Success)
                throw std::runtime_error("wirehair_recover");

            std::ofstream(target, std::ios::binary)
                .write(reinterpret_cast<char const*>(decoded.data()),
                       decoded.size());
        }
        return true;
    }

  private:
    udp::socket socket_;
    uint16_t    port_;
    CodecPtr    decoder_{};
    fs::path    relative_path = "assets/";
};

struct Sender {
    Sender(asio::any_io_executor ex, uint16_t port = DEFAULT_PORT)
        : socket_(ex, udp::v4())
        , port_(port)
    {
    }

    bool send(fs::path filespec)
    {
        std::ifstream ifs(filespec, std::ios::binary);
        std::vector<uint8_t> const contents(std::istreambuf_iterator<char>(ifs),
                                            {});
        ifs.close();
        assert(contents.size() == fs::file_size(filespec));

        file_info fi{}; // value-initializes all members
        fi.magic       = file_info::MAGIC;
        fi.xfer_id     = boost::uuids::random_generator{}();
        fi.file_length = contents.size();
        strncpy(fi.name.data(), filespec.string().c_str(), fi.name.size() - 1);

        socket_.send_to(asio::buffer(&fi, sizeof(fi)), {{}, port_});

        // Create encoder
        encoder_.reset(wirehair_encoder_create(nullptr, contents.data(),
                                               contents.size(), PACKET_SIZE));
        if (!encoder_) {
            throw std::runtime_error("wirehair_encoder_create");
        }

        auto N = contents.size() / PACKET_SIZE + 1;
        N      = (N * 10) / 9; // ~10% redundancy

        std::cout << "Sending " << filespec << " of " << contents.size()
                  << " bytes in " << N << " packets of " << PACKET_SIZE
                  << std::endl;

        for (unsigned block_id = 1; block_id <= N; ++block_id) {
            sleep_for(500ms);
            packet_info packet{};
            packet.magic   = packet_info::MAGIC;
            packet.xfer_id = fi.xfer_id;

            // Encode a packet
            uint32_t writeLen = 0;
            if (auto r = wirehair_encode(encoder_.get(), block_id,
                                         packet.block.data(),
                                         packet.block.size(), &writeLen);
                r == Wirehair_Success) //
            {
                packet.id           = block_id;
                packet.block_length = writeLen;
                socket_.send_to(
                    asio::buffer(&packet, packet_info::HEADERLEN + writeLen),
                    {{}, port_});
                std::cout << "(Packet " << packet.block_length << " bytes)"
                          << std::endl;
            } else {
                throw std::runtime_error("wirehair_encode");
            }
        }

        std::cout << "Send " << filespec << " complete (xfer_id:" << fi.xfer_id
                  << ")" << std::endl;
        return true;
    }

  private:
    udp::socket socket_;
    uint16_t    port_;
    CodecPtr    encoder_{};
    fs::path    relative_path = "assets/";
};

int main(int argc, char** argv) {
    if (auto r = wirehair_init(); r != Wirehair_Success) {
        std::cout << "Wirehair initialization failed: " << r << std::endl;
        return 1;
    }

    asio::thread_pool io(1); 
    auto ex = io.get_executor();

    post(io, [ex] {
        UDPclient client{ex};
        while (true) {
            try {
                client.run();
            } catch (std::exception const& e) {
                std::cout << "Exception: " << e.what() << "\n";
            }
        }
    });

    Sender sender{ex};
    for (auto spec : std::vector(argv + 1, argv + argc)) {
        try {
            sender.send(spec);
        } catch (std::exception const& e) {
            std::cout << "Exception: " << e.what() << "\n";
        }
    }

    io.join();
}
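
The receiver tries to be careful about the file name it gets off the wire before writing under `assets/`. As far as I can tell, a name with enough leading `..` components can still produce a non-empty `spec`, so a stricter check may be worth having. The sketch below is not the demo's code: it is a purely lexical variant, assuming `std::filesystem` (the demo's `fs` alias may point elsewhere), and `sanitize_name` is a hypothetical helper name.

```cpp
#include <cassert>
#include <filesystem>
#include <optional>
#include <string>

namespace fs = std::filesystem; // assumption: the demo's fs alias may differ

// Hypothetical helper (not part of the demo): returns a relative path that can
// be appended to an output directory, or std::nullopt if the name is unusable.
std::optional<fs::path> sanitize_name(std::string const& name)
{
    // Collapse "." and ".." lexically, then drop any root name/directory.
    fs::path p = fs::path(name).lexically_normal().relative_path();

    if (p.empty() || !p.has_filename() || p.filename() == ".")
        return std::nullopt; // nothing left, or the name denotes a directory

    if (*p.begin() == "..")
        return std::nullopt; // would climb out of the output directory

    return p;
}
```

With this, `sanitize_name("/etc/passwd")` yields `etc/passwd`, while `sanitize_name("../../etc/passwd")` is rejected. Because the check never touches the file system, it does not account for symlinks inside the output directory.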

I cannot put the demo live online, but you can build it yourself from the repository at https://github.com/sehe/wirehair-demo.
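
A note on the packet budget in `Sender::send`: it is plain integer arithmetic, so the redundancy is not always what the comment suggests. A minimal sketch of the same formula follows, with `packets_to_send` as a hypothetical helper name and 1400 as an assumed packet size (the real `PACKET_SIZE` is defined earlier in the listing).

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper mirroring the arithmetic in Sender::send.
constexpr std::size_t packets_to_send(std::size_t file_size,
                                      std::size_t packet_size)
{
    std::size_t n = file_size / packet_size + 1; // blocks needed to cover the file
    return (n * 10) / 9;                         // scale by 10/9, rounding down
}

// 100000 / 1400 = 71, +1 = 72, 72 * 10 / 9 = 80 packets (8 spare)
static_assert(packets_to_send(100'000, 1400) == 80);

// 10000 / 1400 = 7, +1 = 8, 8 * 10 / 9 = 8 packets (rounding removes the spare)
static_assert(packets_to_send(10'000, 1400) == 8);
```

Since the division rounds down, transfers of fewer than 9 blocks get no extra packets from the 10/9 factor, so on a lossy link a small file may need a minimum number of spare packets instead.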
