关于 http 请求 grpc-gateway stream 流式响应时无法decode 返回值

发布于 2022-09-12 23:39:39 字数 3581 浏览 41 评论 0

http 请求 stream 响应时,response body 打印出来是正确的结果:

        {"result":{"code":1,"msg":"1111"}}
        {"result":{"code":2,"msg":"2222"}}
        {"result":{"code":3,"msg":"3333"}}
        {"result":{"code":4,"msg":"4444"}}
        {"result":{"code":5,"msg":"5555"}}
        {"result":{"code":6,"msg":"6666"}}

但是使用 runtime.JSONPb.Decode 时,也会得到五个结果,但每个 decode 出来是个 nil... :

=== RUN   TestHttpRespStream
    service_test.go:147: resp: <nil>
    service_test.go:147: resp: <nil>
    service_test.go:147: resp: <nil>
    service_test.go:147: resp: <nil>
    service_test.go:147: resp: <nil>
    service_test.go:147: resp: <nil>
    service_test.go:149: EOF
--- PASS: TestHttpRespStream (0.62s)

这是 proto 文件:

// ./pb/test.proto

syntax = "proto3";

package pb;

option go_package = "/pb;pb";

import "google/api/annotations.proto";

message Req {
    int32 id = 1;
    string name = 2;
}

message Resp {
    int32 code = 1;
    string msg = 2;
}

service TestService {
    rpc QueryStreamResp(Req) returns (stream Resp){
        option (google.api.http) = {
            post: "/query-stream-resp"
            body: "*"
        };
    };
    rpc QueryStreamReq(stream Req) returns (Resp){
        option (google.api.http) = {
            post: "/query-stream-req"
            body: "*"
        };
    };
    
    rpc Query(stream Req) returns (stream Resp){
        option (google.api.http) = {
            post: "/query"
            body: "*"
        };
    };
}

grpc 服务端:

func (ts *TestService) QueryStreamResp(req *pb.Req, stream pb.TestService_QueryStreamRespServer) error {
    log.Printf("QueryStreamResp|start...|req: %+v\n", req)
    result := []*pb.Resp{
        {Code: 1, Msg: "1111"},
        {Code: 2, Msg: "2222"},
        {Code: 3, Msg: "3333"},
        {Code: 4, Msg: "4444"},
        {Code: 5, Msg: "5555"},
        {Code: 6, Msg: "6666"},
    }

    // header := make(metadata.MD)
    // header.Append("content-type", "application/json")
    // stream.SendHeader(header)
    for i := range result {
        log.Printf("resp: %+v", result[i])
        
        if err := stream.Send(result[i]); err != nil {
            log.Fatal(err)
        }
        time.Sleep(100 * time.Millisecond)
    }
    log.Println("QueryStreamResp|stop...")

    return nil
}

单元测试:

func TestHttpRespStream(t *testing.T) {
    url := "http://127.0.0.1:8080/query-stream-resp"
    reqData := &pb.Req{Id: 1, Name: "111"}

    var buffer bytes.Buffer
    encoder := (&runtime.JSONPb{}).NewEncoder(&buffer)

    if err := encoder.Encode(reqData); err != nil {
        t.Fatal(err)
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, &buffer)
    if err != nil {
        t.Fatal(err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        t.Fatal(err)
    }
    defer resp.Body.Close()
    // body, err := ioutil.ReadAll(resp.Body)
    // if err != nil {
    //     t.Fatal(err)
    // }
    // t.Logf("body: %s", string(body))
    jsonb := new(runtime.JSONPb)
    dencoder := jsonb.NewDecoder(resp.Body)

    for {
        var result *pb.Resp
        err := dencoder.Decode(result)
        if err == nil {
            t.Logf("resp: %+v", result)
        } else {
            t.Logf("%+v", err)
            break
        }
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

嗫嚅 2022-09-19 23:39:39

已解决:
猜测原因是

  1. github.com/grpc-ecosystem/grpc-gateway 版本低(v1.14)最新应为:github.com/grpc-ecosystem/grpc-gateway/v2 v2.4.0
  2. 并且 grpc-gateway 返回值中多了一个字段:result,所以要在 proto 文件中新设置一个 message:

    message Resp {
     int32 code = 1;
     string msg = 2;
    }
    
    // 新增
    message HttpResp {
     Resp result = 1;
    }
  3. 在 decode 时:

    var result *pb.HttpResp // 注意 不要使用 *pb.Resp
    err := dencoder.Decode(&result) // &result  

单元测试:

package main

import (
    "bytes"
    "net/http"
    "test/pb"
    "testing"

    "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
    "google.golang.org/grpc"
)

func TestHttpRespStream(t *testing.T) {
    url := "http://127.0.0.1:8080/query-stream-resp"
    reqData := &pb.Req{Id: 1, Name: "111"}

    var buffer bytes.Buffer
    encoder := (&runtime.JSONPb{}).NewEncoder(&buffer)

    if err := encoder.Encode(reqData); err != nil {
        t.Fatal(err)
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, &buffer)
    if err != nil {
        t.Fatal(err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        t.Fatal(err)
    }
    defer resp.Body.Close()

    jsonb := new(runtime.JSONPb)
    dencoder := jsonb.NewDecoder(resp.Body)

    for {
        var result *pb.HttpResp // 注意 不要使用 *pb.Resp
        err := dencoder.Decode(&result) // &result  
        if err == nil {
            t.Logf("resp: %+v", result)
        } else {
            t.Logf("%+v", err)
            break
        }
    }
}
/* 输出:
=== RUN   TestHttpRespStream
    service_test.go:147: resp: result:{code:1 msg:"1111"}
    service_test.go:147: resp: result:{code:2 msg:"2222"}
    service_test.go:147: resp: result:{code:3 msg:"3333"}
    service_test.go:147: resp: result:{code:4 msg:"4444"}
    service_test.go:147: resp: result:{code:5 msg:"5555"}
    service_test.go:147: resp: result:{code:6 msg:"6666"}
    service_test.go:149: EOF
--- PASS: TestHttpRespStream (0.63s)
PASS
ok      test    1.089s
*/
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文