尝试从单独的线程多次调用 Write() 会导致崩溃 [gRPC] [C++]

发布于 2025-01-19 09:10:16 字数 1509 浏览 1 评论 0原文

我正在尝试编写一个异步流 gRPC 服务器(遵循 this 示例)在 C++ 中,其中对 Write 的多次调用在单独的线程上执行。不幸的是,这会在我的系统上导致 SIGSEGV。服务器能够在崩溃之前执行一次写入。下面的代码提供了我正在尝试执行的操作的简单示例。重载的调用运算符从单独的线程接收消息并执行 Write() 调用,将 MyMessage 写入流。

void MyServer::HandleRpcs() {
    new CallData(&m_service, m_queue.get());
    void* tag;
    bool ok;
    while (true) {
        GPR_ASSERT(m_queue->Next(&tag, &ok));
        GPR_ASSERT(ok);
        static_cast<CallData*>(tag)->Proceed();
    }
}

void MyServer::CallData::Proceed() {
    if (m_state == CREATE) {
        m_state = PROCESS;
        m_service->RequestRpc(&m_context, &m_request, &m_responder, m_queue, m_queue, this);
    }
    else if (m_state == PROCESS) {
        new CallData(m_service, m_queue);
        // Request the RPC here, which begins the message calls to the overloaded () operator
    }
    else {
        GPR_ASSERT(m_state == FINISH);
        delete this;
    }
}

void MyServer::CallData::operator()(Message message) {
    std::lock_guard<std::recursive_mutex> lock{m_serverMutex};
    MyStream stream;
    stream.set_message(message.payload);
    m_responder.Write(stream, this);
    PushTaskToQueue();
}

void MyServer::CallData::PushTaskToQueue() {
    // m_alarm is a member of CallData
    m_alarm.Set(m_queue, gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME), this);
}

I'm attempting to write an async streaming gRPC server (following this example) in C++ where multiple calls to Write are performed on a separate thread. Unfortunately, this causes a SIGSEGV on my system. The server is able to perform one write before it crashes. The below code provides a simple example of what I'm attempting to do. The overloaded call operator receives a message from a separate thread and executes the Write() call, writing MyMessage to the stream.

void MyServer::HandleRpcs() {
    new CallData(&m_service, m_queue.get());
    void* tag;
    bool ok;
    while (true) {
        GPR_ASSERT(m_queue->Next(&tag, &ok));
        GPR_ASSERT(ok);
        static_cast<CallData*>(tag)->Proceed();
    }
}

void MyServer::CallData::Proceed() {
    if (m_state == CREATE) {
        m_state = PROCESS;
        m_service->RequestRpc(&m_context, &m_request, &m_responder, m_queue, m_queue, this);
    }
    else if (m_state == PROCESS) {
        new CallData(m_service, m_queue);
        // Request the RPC here, which begins the message calls to the overloaded () operator
    }
    else {
        GPR_ASSERT(m_state == FINISH);
        delete this;
    }
}

void MyServer::CallData::operator()(Message message) {
    std::lock_guard<std::recursive_mutex> lock{m_serverMutex};
    MyStream stream;
    stream.set_message(message.payload);
    m_responder.Write(stream, this);
    PushTaskToQueue();
}

void MyServer::CallData::PushTaskToQueue() {
    // m_alarm is a member of CallData
    m_alarm.Set(m_queue, gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME), this);
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

梦里兽 2025-01-26 09:10:16

原来我对 gRPC 和完成队列有误解。我在完成队列返回标记之前调用 Write(),这导致了崩溃。为了解决这个问题,我在 MyServer 中创建了一个名为 m_tagstatic void* 成员变量,并将其传递到 Next函数的 tag 参数,如下所示:

GPR_ASSERT(m_queue->Next(&m_tag, &ok));

然后,我检查标签是否与处理程序的重载调用运算符中的 this 指针:

if (m_tag != this) return;

然后我看到我的消息流通过了。

Turns out I had a misunderstanding of gRPC and the completion queue. I was calling Write() before the completion queue returns the tag, which caused the crash. To resolve this, I created a static void* member variable in MyServer called m_tag and passed it into the Next function's tag parameter, like so:

GPR_ASSERT(m_queue->Next(&m_tag, &ok));

Then, I checked if the tag matches up with the handler's this pointer in the overloaded call operator:

if (m_tag != this) return;

And I then saw my message stream come through.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文