检索GRPC-GO中拦截器中的响应元数据

发布于 2025-01-21 07:26:10 字数 2632 浏览 2 评论 0原文

简而言之:
如何使用grpc.withStreamInterceptor检索响应的初始_metadata,而无需包装grpc.ClientStream

情况是我尝试使用拦截器来管理所有调用的请求和响应中的元数据。例如,附加密码/令牌以从响应中请求和检索令牌。

首先,我创建一个结构并定义了流互感面

type ClientInterceptor struct {
    Header         string
    Value          string
    ResponseHeader string
    ResponseValue  string
}
func (client *ClientInterceptor) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
    log.Println("Add header in interceptor")
    ctx = metadata.AppendToOutgoingContext(ctx, client.Header, client.Value)
    clientStream, err := streamer(ctx, desc, cc, method, opts...)
    if err != nil {
        return nil, err
    }
    log.Println("Access response header")
    responseMeta, err := s.Header() // Block at here, server not trigger any function
    if err != nil {
        return nil, err
    }
    if val, ok := responseMeta[client.ResponseHeader]; ok {
        client.ResponseValue = val[0]
        log.Println("Response header: ", val[0])
    } else {
        return nil, status.Errorf(codes.DataLoss, "Response without header founded")
    }
    return clientStream, nil
}

,在下面是我的使用方式。

interceptor := &ClientInterceptor{Header: "x-custom", Value: "hello header", ResponseHeader: "x-custom-echo"}
conn, err := grpc.Dial("127.0.0.1:60661", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStreamInterceptor(interceptor.streamInterceptor))

但是,该代码始终是explsentemeta,err:= s.header()的代码,并且服务器不显示任何请求。
如果我删除代码的一部分并使用struct包装clienttream,我可以从recvmsg()获得响应元数据,

type wrappedStream struct {
    grpc.ClientStream
    getHeader bool
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
    if !w.getHeader {
        responseMeta, err := w.Header()
        if err != nil {
            return nil
        }
        if val, ok := responseMeta["x-custom-echo"]; ok {
            log.Println("Response header: ", val[0])
        }
    }
    return w.ClientStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
    return w.ClientStream.SendMsg(m)
}

这意味着每个调用都会创建其自己的实例和靠自己存放令牌,这不是我想要的。
我知道我可以使用指针共享值,但似乎有点“脏”。
根据 grpc官方文档先发送没有身体(Protobuf)。
我如何强制将元数据发送到Interceptor中,因为当请求标头到达时,服务器将首先响应元数据(请求主体不必要)。

要重现这种情况,我将其写在

For short:
How can I retrieve response's initial_metadata with grpc.WithStreamInterceptor without wrapping grpc.ClientStream?

The situation is I try to use an interceptor to manage metadata in request and response from all calls. For example, append password/token to request and retrieve token from response.

First I create a struct and define a streamInterceptor

type ClientInterceptor struct {
    Header         string
    Value          string
    ResponseHeader string
    ResponseValue  string
}
func (client *ClientInterceptor) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
    log.Println("Add header in interceptor")
    ctx = metadata.AppendToOutgoingContext(ctx, client.Header, client.Value)
    clientStream, err := streamer(ctx, desc, cc, method, opts...)
    if err != nil {
        return nil, err
    }
    log.Println("Access response header")
    responseMeta, err := s.Header() // Block at here, server not trigger any function
    if err != nil {
        return nil, err
    }
    if val, ok := responseMeta[client.ResponseHeader]; ok {
        client.ResponseValue = val[0]
        log.Println("Response header: ", val[0])
    } else {
        return nil, status.Errorf(codes.DataLoss, "Response without header founded")
    }
    return clientStream, nil
}

and below is how I use it.

interceptor := &ClientInterceptor{Header: "x-custom", Value: "hello header", ResponseHeader: "x-custom-echo"}
conn, err := grpc.Dial("127.0.0.1:60661", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStreamInterceptor(interceptor.streamInterceptor))

However, the code is always block on the line responseMeta, err := s.Header(), and the server shows no request.
If I remove the part of the code and use a struct to wrap clientStream, I can get response metadata from RecvMsg()

type wrappedStream struct {
    grpc.ClientStream
    getHeader bool
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
    if !w.getHeader {
        responseMeta, err := w.Header()
        if err != nil {
            return nil
        }
        if val, ok := responseMeta["x-custom-echo"]; ok {
            log.Println("Response header: ", val[0])
        }
    }
    return w.ClientStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
    return w.ClientStream.SendMsg(m)
}

This means that every call will create it's own instance and store the token by their own, which is not what I want.
I know I can use a pointer to share the values, but it seems a bit "dirty".
According to the gRPC official document, initial_metadata can be send first without body(protobuf).
How can I forced metadata to be send in interceptor, since server will response metadata first when request header comes (request body is unnecessary).

To reproduce the situation, I write it in gist
if you need.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文