python grpc._channel._Rendezvous 异常,状态为 UNKNOWN

发布于 2025-01-10 20:02:26 字数 6894 浏览 0 评论 0原文

我建立了 grpc 连接,并尝试在通道中发出请求,但是当我从客户端调用服务时,出现以下异常。有人知道吗?线程的使用是其原因吗?我不明白它出了什么问题。

这是带有服务的 protobuf 架构:

// P4Runtime gRPC service as declared below: unary Write, server-streaming
// Read, unary set/get of the forwarding-pipeline config, the bidirectional
// StreamChannel, and unary Capabilities.
service P4Runtime {
  // Update one or more P4 entities on the target.
  rpc Write(WriteRequest) returns (WriteResponse) {
  }
  // Read one or more P4 entities from the target.
  rpc Read(ReadRequest) returns (stream ReadResponse) {
  }

  // Sets the P4 forwarding-pipeline config.
  rpc SetForwardingPipelineConfig(SetForwardingPipelineConfigRequest)
      returns (SetForwardingPipelineConfigResponse) {
  }
  // Gets the current P4 forwarding-pipeline config.
  rpc GetForwardingPipelineConfig(GetForwardingPipelineConfigRequest)
      returns (GetForwardingPipelineConfigResponse) {
  }

  // Represents the bidirectional stream between the controller and the
  // switch (initiated by the controller), and is managed for the following
  // purposes:
  // - connection initiation through client arbitration
  // - indicating switch session liveness: the session is live when switch
  //   sends a positive client arbitration update to the controller, and is
  //   considered dead when either the stream breaks or the switch sends a
  //   negative update for client arbitration
  // - the controller sending/receiving packets to/from the switch
  // - streaming of notifications from the switch
  rpc StreamChannel(stream StreamMessageRequest)
      returns (stream StreamMessageResponse) {
  }

  // NOTE(review): undocumented in this excerpt; presumably reports what the
  // target's P4Runtime implementation supports -- confirm against the spec.
  rpc Capabilities(CapabilitiesRequest) returns (CapabilitiesResponse) {
  }
}

下面是我调用的函数,异常就发生在这里:

def write_IPv4_Rules(p4info_helper, ingress_sw, ipv4_dst, lpm, dst_mac, out_port):
    """Build a ``MyIngress.ipv4_lpm`` table entry and install it on a switch.

    Args:
        p4info_helper: object providing ``buildTableEntry``.
        ingress_sw: switch handle providing ``WriteTableEntry`` and ``name``.
        ipv4_dst: destination address matched against ``hdr.ipv4.dstAddr``.
        lpm: prefix length paired with ``ipv4_dst`` in the match field.
        dst_mac: value of the action's ``dstAddr`` parameter.
        out_port: value of the action's ``port`` parameter.
    """
    lpm_match = {"hdr.ipv4.dstAddr": (ipv4_dst, lpm)}
    forward_args = {"dstAddr": dst_mac, "port": out_port}
    entry = p4info_helper.buildTableEntry(
        table_name="MyIngress.ipv4_lpm",
        match_fields=lpm_match,
        action_name="MyIngress.ipv4_forward",
        action_params=forward_args,
    )
    ingress_sw.WriteTableEntry(entry)
    # The name is read only after the write succeeded, as in the original.
    print("Installed ipv4 rule on %s" % ingress_sw.name)

这是在线程(thread)内对上述函数的调用:

# Positional arguments are (p4info_helper, ingress_sw, ipv4_dst, lpm, dst_mac,
# out_port): here lpm=32 and out_port=2.
write_IPv4_Rules(p4info_helper,ingress_sw,ip_dest,32,dst_mac,2)

下面是使用 gRPC 服务的控制器代码:

class SwitchConnection(object):
    """Client-side handle for one switch reached over gRPC.

    Creating an instance opens an insecure channel to ``address``, builds a
    ``P4RuntimeStub`` on it, starts the ``StreamChannel`` RPC fed from an
    ``IterableQueue``, and appends the instance to the module-level
    ``connections`` list.
    """

    def __init__(self, name=None, address='127.0.0.1:50051', device_id=0,
                 proto_dump_file=None):
        """Open the channel, stub and request stream for this switch.

        Args:
            name: label for the switch (callers print it in status messages).
            address: ``host:port`` passed to ``grpc.insecure_channel``.
            device_id: copied into ``device_id`` of every write request.
            proto_dump_file: optional path; when not ``None`` the channel is
                wrapped with a ``GrpcRequestLogger`` writing to that file.
        """
        self.name = name
        self.address = address
        self.device_id = device_id
        self.p4info = None
        self.channel = grpc.insecure_channel(self.address)
        if proto_dump_file is not None:
            # Route every call made on this channel through the request logger.
            interceptor = GrpcRequestLogger(proto_dump_file)
            self.channel = grpc.intercept_channel(self.channel, interceptor)
        self.client_stub = p4runtime_pb2_grpc.P4RuntimeStub(self.channel)
        # Messages put on this queue become the outgoing StreamChannel requests.
        self.requests_stream = IterableQueue()
        self.stream_msg_resp = self.client_stub.StreamChannel(iter(self.requests_stream))
        self.proto_dump_file = proto_dump_file
        connections.append(self)

    # This must be a method of the class: callers invoke it as
    # ``switch.WriteTableEntry(entry)`` and it reads ``self.device_id`` and
    # ``self.client_stub``.
    def WriteTableEntry(self, table_entry, dry_run=False):
        """Send one table entry to the switch in a ``WriteRequest``.

        Args:
            table_entry: entry to install; it is copied into the request. Its
                ``is_default_action`` flag selects MODIFY instead of INSERT.
            dry_run: when true, print the request instead of sending it.

        Raises:
            grpc.RpcError: propagated from ``client_stub.Write`` when the RPC
                terminates with a non-OK status.
        """
        request = p4runtime_pb2.WriteRequest()
        request.device_id = self.device_id
        # The election id is fixed here (low word = 1).
        request.election_id.low = 1
        update = request.updates.add()
        if table_entry.is_default_action:
            update.type = p4runtime_pb2.Update.MODIFY
        else:
            update.type = p4runtime_pb2.Update.INSERT
        update.entity.table_entry.CopyFrom(table_entry)
        if dry_run:
            print("P4Runtime Write:", request)
        else:
            self.client_stub.Write(request)


class GrpcRequestLogger(grpc.UnaryUnaryClientInterceptor,
                        grpc.UnaryStreamClientInterceptor):
    """gRPC client interceptor that logs each request to a file.

    Implements the unary-unary and unary-stream client interceptor
    interfaces; both hooks log the request and then continue the call
    unchanged.
    """

    def __init__(self, log_file):
        """Remember the log path and start from an empty log file.

        Args:
            log_file: path of the file that requests are appended to.
        """
        self.log_file = log_file
        with open(self.log_file, 'w') as f:
            # Clear content if it exists.
            f.write("")

    def log_message(self, method_name, body):
        """Append one timestamped record for a request to the log file.

        The record is a ``[timestamp] method`` header, a ``---`` line, the
        text form of ``body`` (or a "too long" notice when that text is
        ``MSG_LOG_MAX_LEN`` characters or more), and a closing ``---`` line.

        Args:
            method_name: RPC method name written in the record header.
            body: request object; logged via ``str(body)``.
        """
        # Local import so this block does not rely on the file's import list;
        # an aware UTC "now" replaces the deprecated datetime.utcnow() and
        # formats to the same string.
        from datetime import timezone

        msg = str(body)
        # Timestamp in UTC, truncated from microseconds to milliseconds.
        ts = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        with open(self.log_file, 'a') as f:
            f.write("\n[%s] %s\n---\n" % (ts, method_name))
            if len(msg) < MSG_LOG_MAX_LEN:
                # Reuse the text computed above instead of converting twice.
                f.write(msg)
            else:
                # The count is len() of the text form (characters), although
                # the message wording says "bytes".
                f.write("Message too long (%d bytes)! Skipping log...\n" % len(msg))
            f.write('---\n')

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """Log a unary-unary request, then continue the call unchanged."""
        self.log_message(client_call_details.method, request)
        return continuation(client_call_details, request)

    def intercept_unary_stream(self, continuation, client_call_details, request):
        """Log a unary-stream request, then continue the call unchanged."""
        self.log_message(client_call_details.method, request)
        return continuation(client_call_details, request)

class IterableQueue(Queue):
    """A ``Queue`` that can be iterated until it is closed.

    Iteration repeatedly calls ``get`` and stops when the private sentinel
    object is retrieved; ``close`` enqueues that sentinel.
    """

    # Unique marker object; retrieving it ends an iteration.
    _sentinel = object()

    def __iter__(self):
        """Return an iterator over queued items that ends at the sentinel."""
        fetch_next = self.get
        stop_marker = self._sentinel
        return iter(fetch_next, stop_marker)

    def close(self):
        """Enqueue the sentinel so that an iteration reaching it stops."""
        stop_marker = self._sentinel
        self.put(stop_marker)

运行程序时我收到的异常如下:

Exception in thread Thread-11:
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "./mycontroller.py", line 348, in packet_router_processing
    ipv4_forwarding(p4info_helper,extract_header,ingress_sw,packetIn.packet.metadata)
  File "./mycontroller.py", line 256, in ipv4_forwarding
    write_IPv4_Rules(p4info_helper,ingress_sw,ip_dest,24,dst_mac,1)
  File "./mycontroller.py", line 38, in write_IPv4_Rules
    ingress_sw.WriteTableEntry(table_entry)
  File "/home/p4/tutorials/exercises/test/../../utils/p4runtime_lib/switch.py", line 102, in WriteTableEntry
    self.client_stub.Write(request)
  File "/usr/local/lib/python3.8/dist-packages/grpc/_interceptor.py", line 207, in __call__
    response, ignored_call = self._with_call(
  File "/usr/local/lib/python3.8/dist-packages/grpc/_interceptor.py", line 240, in _with_call
    call = self._interceptor.intercept_unary_unary(
  File "/home/p4/tutorials/exercises/test/../../utils/p4runtime_lib/switch.py", line 220, in intercept_unary_unary
    return continuation(client_call_details, request)
  File "/usr/local/lib/python3.8/dist-packages/grpc/_interceptor.py", line 228, in continuation
    response, call = self._thunk(new_method).with_call(
  File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 557, in with_call
    return _end_unary_response_blocking(state, call, True, None)
  File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 466, in _end_unary_response_blocking
    raise _Rendezvous(state, None, None, deadline)
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = ""
        debug_error_string = "{"created":"@1646087190.862135612","description":"Error received from peer","file":"src/core/lib/surface/call.cc","file_line":1036,"grpc_message":"","grpc_status":2}"

I have established a gRPC connection and I try to make a request on the channel, but when I call the service from the client I get the following exception. Does anyone know anything about it? Is the use of threads the reason for it? I can't figure out what is wrong.

This is the protobuf schema with the service:

// P4Runtime gRPC service as declared below: unary Write, server-streaming
// Read, unary set/get of the forwarding-pipeline config, the bidirectional
// StreamChannel, and unary Capabilities.
service P4Runtime {
  // Update one or more P4 entities on the target.
  rpc Write(WriteRequest) returns (WriteResponse) {
  }
  // Read one or more P4 entities from the target.
  rpc Read(ReadRequest) returns (stream ReadResponse) {
  }

  // Sets the P4 forwarding-pipeline config.
  rpc SetForwardingPipelineConfig(SetForwardingPipelineConfigRequest)
      returns (SetForwardingPipelineConfigResponse) {
  }
  // Gets the current P4 forwarding-pipeline config.
  rpc GetForwardingPipelineConfig(GetForwardingPipelineConfigRequest)
      returns (GetForwardingPipelineConfigResponse) {
  }

  // Represents the bidirectional stream between the controller and the
  // switch (initiated by the controller), and is managed for the following
  // purposes:
  // - connection initiation through client arbitration
  // - indicating switch session liveness: the session is live when switch
  //   sends a positive client arbitration update to the controller, and is
  //   considered dead when either the stream breaks or the switch sends a
  //   negative update for client arbitration
  // - the controller sending/receiving packets to/from the switch
  // - streaming of notifications from the switch
  rpc StreamChannel(stream StreamMessageRequest)
      returns (stream StreamMessageResponse) {
  }

  // NOTE(review): undocumented in this excerpt; presumably reports what the
  // target's P4Runtime implementation supports -- confirm against the spec.
  rpc Capabilities(CapabilitiesRequest) returns (CapabilitiesResponse) {
  }
}

Below is the function that I call and in which the exception occurs:

def write_IPv4_Rules(p4info_helper, ingress_sw, ipv4_dst, lpm, dst_mac, out_port):
    """Build a ``MyIngress.ipv4_lpm`` table entry and install it on a switch.

    Args:
        p4info_helper: object providing ``buildTableEntry``.
        ingress_sw: switch handle providing ``WriteTableEntry`` and ``name``.
        ipv4_dst: destination address matched against ``hdr.ipv4.dstAddr``.
        lpm: prefix length paired with ``ipv4_dst`` in the match field.
        dst_mac: value of the action's ``dstAddr`` parameter.
        out_port: value of the action's ``port`` parameter.
    """
    lpm_match = {"hdr.ipv4.dstAddr": (ipv4_dst, lpm)}
    forward_args = {"dstAddr": dst_mac, "port": out_port}
    entry = p4info_helper.buildTableEntry(
        table_name="MyIngress.ipv4_lpm",
        match_fields=lpm_match,
        action_name="MyIngress.ipv4_forward",
        action_params=forward_args,
    )
    ingress_sw.WriteTableEntry(entry)
    # The name is read only after the write succeeded, as in the original.
    print("Installed ipv4 rule on %s" % ingress_sw.name)

This is the invocation of the above function, which happens inside a thread:

# Positional arguments are (p4info_helper, ingress_sw, ipv4_dst, lpm, dst_mac,
# out_port): here lpm=32 and out_port=2.
write_IPv4_Rules(p4info_helper,ingress_sw,ip_dest,32,dst_mac,2)

Below is the code of the controller that uses the gRPC service:

class SwitchConnection(object):
    """Client-side handle for one switch reached over gRPC.

    Creating an instance opens an insecure channel to ``address``, builds a
    ``P4RuntimeStub`` on it, starts the ``StreamChannel`` RPC fed from an
    ``IterableQueue``, and appends the instance to the module-level
    ``connections`` list.
    """

    def __init__(self, name=None, address='127.0.0.1:50051', device_id=0,
                 proto_dump_file=None):
        """Open the channel, stub and request stream for this switch.

        Args:
            name: label for the switch (callers print it in status messages).
            address: ``host:port`` passed to ``grpc.insecure_channel``.
            device_id: copied into ``device_id`` of every write request.
            proto_dump_file: optional path; when not ``None`` the channel is
                wrapped with a ``GrpcRequestLogger`` writing to that file.
        """
        self.name = name
        self.address = address
        self.device_id = device_id
        self.p4info = None
        self.channel = grpc.insecure_channel(self.address)
        if proto_dump_file is not None:
            # Route every call made on this channel through the request logger.
            interceptor = GrpcRequestLogger(proto_dump_file)
            self.channel = grpc.intercept_channel(self.channel, interceptor)
        self.client_stub = p4runtime_pb2_grpc.P4RuntimeStub(self.channel)
        # Messages put on this queue become the outgoing StreamChannel requests.
        self.requests_stream = IterableQueue()
        self.stream_msg_resp = self.client_stub.StreamChannel(iter(self.requests_stream))
        self.proto_dump_file = proto_dump_file
        connections.append(self)

    # This must be a method of the class: callers invoke it as
    # ``switch.WriteTableEntry(entry)`` and it reads ``self.device_id`` and
    # ``self.client_stub``.
    def WriteTableEntry(self, table_entry, dry_run=False):
        """Send one table entry to the switch in a ``WriteRequest``.

        Args:
            table_entry: entry to install; it is copied into the request. Its
                ``is_default_action`` flag selects MODIFY instead of INSERT.
            dry_run: when true, print the request instead of sending it.

        Raises:
            grpc.RpcError: propagated from ``client_stub.Write`` when the RPC
                terminates with a non-OK status.
        """
        request = p4runtime_pb2.WriteRequest()
        request.device_id = self.device_id
        # The election id is fixed here (low word = 1).
        request.election_id.low = 1
        update = request.updates.add()
        if table_entry.is_default_action:
            update.type = p4runtime_pb2.Update.MODIFY
        else:
            update.type = p4runtime_pb2.Update.INSERT
        update.entity.table_entry.CopyFrom(table_entry)
        if dry_run:
            print("P4Runtime Write:", request)
        else:
            self.client_stub.Write(request)


class GrpcRequestLogger(grpc.UnaryUnaryClientInterceptor,
                        grpc.UnaryStreamClientInterceptor):
    """gRPC client interceptor that logs each request to a file.

    Implements the unary-unary and unary-stream client interceptor
    interfaces; both hooks log the request and then continue the call
    unchanged.
    """

    def __init__(self, log_file):
        """Remember the log path and start from an empty log file.

        Args:
            log_file: path of the file that requests are appended to.
        """
        self.log_file = log_file
        with open(self.log_file, 'w') as f:
            # Clear content if it exists.
            f.write("")

    def log_message(self, method_name, body):
        """Append one timestamped record for a request to the log file.

        The record is a ``[timestamp] method`` header, a ``---`` line, the
        text form of ``body`` (or a "too long" notice when that text is
        ``MSG_LOG_MAX_LEN`` characters or more), and a closing ``---`` line.

        Args:
            method_name: RPC method name written in the record header.
            body: request object; logged via ``str(body)``.
        """
        # Local import so this block does not rely on the file's import list;
        # an aware UTC "now" replaces the deprecated datetime.utcnow() and
        # formats to the same string.
        from datetime import timezone

        msg = str(body)
        # Timestamp in UTC, truncated from microseconds to milliseconds.
        ts = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        with open(self.log_file, 'a') as f:
            f.write("\n[%s] %s\n---\n" % (ts, method_name))
            if len(msg) < MSG_LOG_MAX_LEN:
                # Reuse the text computed above instead of converting twice.
                f.write(msg)
            else:
                # The count is len() of the text form (characters), although
                # the message wording says "bytes".
                f.write("Message too long (%d bytes)! Skipping log...\n" % len(msg))
            f.write('---\n')

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """Log a unary-unary request, then continue the call unchanged."""
        self.log_message(client_call_details.method, request)
        return continuation(client_call_details, request)

    def intercept_unary_stream(self, continuation, client_call_details, request):
        """Log a unary-stream request, then continue the call unchanged."""
        self.log_message(client_call_details.method, request)
        return continuation(client_call_details, request)

class IterableQueue(Queue):
    """A ``Queue`` that can be iterated until it is closed.

    Iteration repeatedly calls ``get`` and stops when the private sentinel
    object is retrieved; ``close`` enqueues that sentinel.
    """

    # Unique marker object; retrieving it ends an iteration.
    _sentinel = object()

    def __iter__(self):
        """Return an iterator over queued items that ends at the sentinel."""
        fetch_next = self.get
        stop_marker = self._sentinel
        return iter(fetch_next, stop_marker)

    def close(self):
        """Enqueue the sentinel so that an iteration reaching it stops."""
        stop_marker = self._sentinel
        self.put(stop_marker)

The exception that I receive when I run the program:

Exception in thread Thread-11:
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "./mycontroller.py", line 348, in packet_router_processing
    ipv4_forwarding(p4info_helper,extract_header,ingress_sw,packetIn.packet.metadata)
  File "./mycontroller.py", line 256, in ipv4_forwarding
    write_IPv4_Rules(p4info_helper,ingress_sw,ip_dest,24,dst_mac,1)
  File "./mycontroller.py", line 38, in write_IPv4_Rules
    ingress_sw.WriteTableEntry(table_entry)
  File "/home/p4/tutorials/exercises/test/../../utils/p4runtime_lib/switch.py", line 102, in WriteTableEntry
    self.client_stub.Write(request)
  File "/usr/local/lib/python3.8/dist-packages/grpc/_interceptor.py", line 207, in __call__
    response, ignored_call = self._with_call(
  File "/usr/local/lib/python3.8/dist-packages/grpc/_interceptor.py", line 240, in _with_call
    call = self._interceptor.intercept_unary_unary(
  File "/home/p4/tutorials/exercises/test/../../utils/p4runtime_lib/switch.py", line 220, in intercept_unary_unary
    return continuation(client_call_details, request)
  File "/usr/local/lib/python3.8/dist-packages/grpc/_interceptor.py", line 228, in continuation
    response, call = self._thunk(new_method).with_call(
  File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 557, in with_call
    return _end_unary_response_blocking(state, call, True, None)
  File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 466, in _end_unary_response_blocking
    raise _Rendezvous(state, None, None, deadline)
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = ""
        debug_error_string = "{"created":"@1646087190.862135612","description":"Error received from peer","file":"src/core/lib/surface/call.cc","file_line":1036,"grpc_message":"","grpc_status":2}"

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文