python grpc._channel._Rendezvous 异常,状态为 UNKNOWN
我建立了 grpc 连接,并尝试在通道中发出请求,但是当我从客户端调用服务时,出现以下异常。有人知道吗?线程的使用是其原因吗?我不明白它出了什么问题。
这是带有服务的 protobuf 架构:
// gRPC service exposing the P4Runtime RPCs listed below.
service P4Runtime {
  // Update one or more P4 entities on the target.
  rpc Write(WriteRequest) returns (WriteResponse) {}

  // Read one or more P4 entities from the target.
  rpc Read(ReadRequest) returns (stream ReadResponse) {}

  // Sets the P4 forwarding-pipeline config.
  rpc SetForwardingPipelineConfig(SetForwardingPipelineConfigRequest)
      returns (SetForwardingPipelineConfigResponse) {}

  // Gets the current P4 forwarding-pipeline config.
  rpc GetForwardingPipelineConfig(GetForwardingPipelineConfigRequest)
      returns (GetForwardingPipelineConfigResponse) {}

  // Bidirectional stream between the controller and the switch, initiated
  // by the controller. It is used for:
  // - connection initiation through client arbitration
  // - indicating switch session liveness: the session is live when the
  //   switch sends a positive client arbitration update to the controller,
  //   and is considered dead when either the stream breaks or the switch
  //   sends a negative update for client arbitration
  // - the controller sending/receiving packets to/from the switch
  // - streaming of notifications from the switch
  rpc StreamChannel(stream StreamMessageRequest)
      returns (stream StreamMessageResponse) {}

  // Unary RPC: takes a CapabilitiesRequest, returns a CapabilitiesResponse.
  rpc Capabilities(CapabilitiesRequest) returns (CapabilitiesResponse) {}
}
下面是我调用的函数,并且发生了异常:
def write_IPv4_Rules(p4info_helper, ingress_sw, ipv4_dst, lpm, dst_mac, out_port):
    """Build one ``MyIngress.ipv4_lpm`` entry and write it to ``ingress_sw``.

    The entry matches ``hdr.ipv4.dstAddr`` on the pair ``(ipv4_dst, lpm)`` and
    invokes ``MyIngress.ipv4_forward`` with ``dstAddr=dst_mac`` and
    ``port=out_port``.

    Args:
        p4info_helper: object providing ``buildTableEntry(...)``.
        ingress_sw: object providing ``WriteTableEntry(entry)`` and ``name``.
        ipv4_dst: destination address used in the match field.
        lpm: second element of the match tuple (presumably the prefix
            length -- confirm against ``buildTableEntry``).
        dst_mac: value passed as the ``dstAddr`` action parameter.
        out_port: value passed as the ``port`` action parameter.

    Nothing is caught here: if ``WriteTableEntry`` raises, the exception
    propagates and the confirmation line is not printed.
    """
    table_entry = p4info_helper.buildTableEntry(
        table_name="MyIngress.ipv4_lpm",
        match_fields={"hdr.ipv4.dstAddr": (ipv4_dst, lpm)},
        action_name="MyIngress.ipv4_forward",
        action_params={"dstAddr": dst_mac, "port": out_port},
    )
    ingress_sw.WriteTableEntry(table_entry)
    print("Installed ipv4 rule on %s" % ingress_sw.name)
这是在线程(thread)内对上述函数的调用:
# Install a rule for ip_dest: second match element 32, forward to dst_mac via port 2.
write_IPv4_Rules(
    p4info_helper,
    ingress_sw,
    ipv4_dst=ip_dest,
    lpm=32,
    dst_mac=dst_mac,
    out_port=2,
)
下面我有使用 grpc 服务的控制器的代码:
class SwitchConnection(object):
    """Client-side handle for one switch reached over an insecure gRPC channel.

    Creating an instance opens the channel, builds a ``P4RuntimeStub`` on it,
    starts the ``StreamChannel`` RPC fed from an ``IterableQueue``, and appends
    the instance to ``connections`` (a name defined outside this snippet).
    """

    def __init__(self, name=None, address='127.0.0.1:50051', device_id=0,
                 proto_dump_file=None):
        """Open the channel and start the bidirectional stream.

        Args:
            name: label stored on the instance.
            address: ``host:port`` target passed to ``grpc.insecure_channel``.
            device_id: value copied into every ``WriteRequest.device_id``.
            proto_dump_file: when not ``None``, requests are logged to this
                file through a ``GrpcRequestLogger`` interceptor.
        """
        self.name = name
        self.address = address
        self.device_id = device_id
        self.p4info = None
        self.proto_dump_file = proto_dump_file

        channel = grpc.insecure_channel(self.address)
        if proto_dump_file is not None:
            # Wrap the channel so the interceptor sees each unary request.
            channel = grpc.intercept_channel(
                channel, GrpcRequestLogger(proto_dump_file))
        self.channel = channel

        self.client_stub = p4runtime_pb2_grpc.P4RuntimeStub(self.channel)
        # Messages put on this queue become the request side of StreamChannel.
        self.requests_stream = IterableQueue()
        self.stream_msg_resp = self.client_stub.StreamChannel(
            iter(self.requests_stream))
        connections.append(self)

    def WriteTableEntry(self, table_entry, dry_run=False):
        """Send ``table_entry`` in a single-update ``WriteRequest``.

        The update type is ``MODIFY`` when ``table_entry.is_default_action``
        is true and ``INSERT`` otherwise. ``election_id.low`` is fixed at 1.

        Args:
            table_entry: message copied into ``update.entity.table_entry``.
            dry_run: when true, print the request instead of sending it.

        Errors raised by ``client_stub.Write`` are not caught here and
        propagate to the caller.
        """
        request = p4runtime_pb2.WriteRequest()
        request.device_id = self.device_id
        request.election_id.low = 1

        update = request.updates.add()
        update.type = (p4runtime_pb2.Update.MODIFY
                       if table_entry.is_default_action
                       else p4runtime_pb2.Update.INSERT)
        update.entity.table_entry.CopyFrom(table_entry)

        if dry_run:
            print("P4Runtime Write:", request)
            return
        self.client_stub.Write(request)
class GrpcRequestLogger(grpc.UnaryUnaryClientInterceptor,
                        grpc.UnaryStreamClientInterceptor):
    """gRPC client interceptor that logs each intercepted request to a file.

    The log file is truncated when the logger is created; every intercepted
    unary-unary or unary-stream call then appends one entry before the call
    is passed on unchanged.
    """

    def __init__(self, log_file):
        """Remember ``log_file`` and empty it."""
        self.log_file = log_file
        with open(self.log_file, 'w') as f:
            # Clear content if it exists.
            f.write("")

    def log_message(self, method_name, body):
        """Append one timestamped entry for ``body`` under ``method_name``.

        Bodies whose text form is ``MSG_LOG_MAX_LEN`` characters or longer are
        replaced by a "too long" notice. ``MSG_LOG_MAX_LEN`` is defined
        outside this snippet.
        """
        # UTC timestamp, trimmed from microseconds to milliseconds.
        ts = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        # Render the body once; the same text serves the length check and
        # the write, instead of calling str(body) a second time.
        msg = str(body)
        with open(self.log_file, 'a') as f:
            f.write("\n[%s] %s\n---\n" % (ts, method_name))
            if len(msg) < MSG_LOG_MAX_LEN:
                f.write(msg)
            else:
                # len(msg) counts characters of the text rendering.
                f.write("Message too long (%d bytes)! Skipping log...\n" % len(msg))
            f.write('---\n')

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """Log the request, then continue the unary-unary call unchanged."""
        self.log_message(client_call_details.method, request)
        return continuation(client_call_details, request)

    def intercept_unary_stream(self, continuation, client_call_details, request):
        """Log the request, then continue the unary-stream call unchanged."""
        self.log_message(client_call_details.method, request)
        return continuation(client_call_details, request)
class IterableQueue(Queue):
    """Queue that can be iterated until ``close()`` is called.

    Iteration blocks on ``get()`` and yields items in queue order; it ends
    when the private sentinel placed by ``close()`` is dequeued.
    """

    # Unique marker object; never equal-by-identity to any user item.
    _sentinel = object()

    def __iter__(self):
        """Yield queued items until the sentinel is dequeued.

        The sentinel is recognised by identity (``is``), so an item whose
        ``__eq__`` happens to return True for any operand cannot end the
        stream early. The returned generator is meant to be consumed by a
        single thread.
        """
        while True:
            item = self.get()
            if item is self._sentinel:
                return
            yield item

    def close(self):
        """Enqueue the sentinel so that an active iteration terminates."""
        self.put(self._sentinel)
运行程序时我收到的异常如下:
Exception in thread Thread-11:
Traceback (most recent call last):
File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/usr/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "./mycontroller.py", line 348, in packet_router_processing
ipv4_forwarding(p4info_helper,extract_header,ingress_sw,packetIn.packet.metadata)
File "./mycontroller.py", line 256, in ipv4_forwarding
write_IPv4_Rules(p4info_helper,ingress_sw,ip_dest,24,dst_mac,1)
File "./mycontroller.py", line 38, in write_IPv4_Rules
ingress_sw.WriteTableEntry(table_entry)
File "/home/p4/tutorials/exercises/test/../../utils/p4runtime_lib/switch.py", line 102, in WriteTableEntry
self.client_stub.Write(request)
File "/usr/local/lib/python3.8/dist-packages/grpc/_interceptor.py", line 207, in __call__
response, ignored_call = self._with_call(
File "/usr/local/lib/python3.8/dist-packages/grpc/_interceptor.py", line 240, in _with_call
call = self._interceptor.intercept_unary_unary(
File "/home/p4/tutorials/exercises/test/../../utils/p4runtime_lib/switch.py", line 220, in intercept_unary_unary
return continuation(client_call_details, request)
File "/usr/local/lib/python3.8/dist-packages/grpc/_interceptor.py", line 228, in continuation
response, call = self._thunk(new_method).with_call(
File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 557, in with_call
return _end_unary_response_blocking(state, call, True, None)
File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 466, in _end_unary_response_blocking
raise _Rendezvous(state, None, None, deadline)
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
status = StatusCode.UNKNOWN
details = ""
debug_error_string = "{"created":"@1646087190.862135612","description":"Error received from peer","file":"src/core/lib/surface/call.cc","file_line":1036,"grpc_message":"","grpc_status":2}"
I have established a gRPC connection and try to make a request on the channel, but when I call the service from the client I get the following exception. Does anyone know what causes it? Is the use of threads the reason? I can't figure out what is wrong.
This is the protobuf schema with the service:
// gRPC service exposing the P4Runtime RPCs listed below.
service P4Runtime {
  // Update one or more P4 entities on the target.
  rpc Write(WriteRequest) returns (WriteResponse) {}

  // Read one or more P4 entities from the target.
  rpc Read(ReadRequest) returns (stream ReadResponse) {}

  // Sets the P4 forwarding-pipeline config.
  rpc SetForwardingPipelineConfig(SetForwardingPipelineConfigRequest)
      returns (SetForwardingPipelineConfigResponse) {}

  // Gets the current P4 forwarding-pipeline config.
  rpc GetForwardingPipelineConfig(GetForwardingPipelineConfigRequest)
      returns (GetForwardingPipelineConfigResponse) {}

  // Bidirectional stream between the controller and the switch, initiated
  // by the controller. It is used for:
  // - connection initiation through client arbitration
  // - indicating switch session liveness: the session is live when the
  //   switch sends a positive client arbitration update to the controller,
  //   and is considered dead when either the stream breaks or the switch
  //   sends a negative update for client arbitration
  // - the controller sending/receiving packets to/from the switch
  // - streaming of notifications from the switch
  rpc StreamChannel(stream StreamMessageRequest)
      returns (stream StreamMessageResponse) {}

  // Unary RPC: takes a CapabilitiesRequest, returns a CapabilitiesResponse.
  rpc Capabilities(CapabilitiesRequest) returns (CapabilitiesResponse) {}
}
Below is the function that I call and in which the exception happens:
def write_IPv4_Rules(p4info_helper, ingress_sw, ipv4_dst, lpm, dst_mac, out_port):
    """Build one ``MyIngress.ipv4_lpm`` entry and write it to ``ingress_sw``.

    The entry matches ``hdr.ipv4.dstAddr`` on the pair ``(ipv4_dst, lpm)`` and
    invokes ``MyIngress.ipv4_forward`` with ``dstAddr=dst_mac`` and
    ``port=out_port``.

    Args:
        p4info_helper: object providing ``buildTableEntry(...)``.
        ingress_sw: object providing ``WriteTableEntry(entry)`` and ``name``.
        ipv4_dst: destination address used in the match field.
        lpm: second element of the match tuple (presumably the prefix
            length -- confirm against ``buildTableEntry``).
        dst_mac: value passed as the ``dstAddr`` action parameter.
        out_port: value passed as the ``port`` action parameter.

    Nothing is caught here: if ``WriteTableEntry`` raises, the exception
    propagates and the confirmation line is not printed.
    """
    table_entry = p4info_helper.buildTableEntry(
        table_name="MyIngress.ipv4_lpm",
        match_fields={"hdr.ipv4.dstAddr": (ipv4_dst, lpm)},
        action_name="MyIngress.ipv4_forward",
        action_params={"dstAddr": dst_mac, "port": out_port},
    )
    ingress_sw.WriteTableEntry(table_entry)
    print("Installed ipv4 rule on %s" % ingress_sw.name)
This is the invocation of the above function, which happens inside a thread:
# Install a rule for ip_dest: second match element 32, forward to dst_mac via port 2.
write_IPv4_Rules(
    p4info_helper,
    ingress_sw,
    ipv4_dst=ip_dest,
    lpm=32,
    dst_mac=dst_mac,
    out_port=2,
)
Below is the code of the controller that uses the gRPC service:
class SwitchConnection(object):
    """Client-side handle for one switch reached over an insecure gRPC channel.

    Creating an instance opens the channel, builds a ``P4RuntimeStub`` on it,
    starts the ``StreamChannel`` RPC fed from an ``IterableQueue``, and appends
    the instance to ``connections`` (a name defined outside this snippet).
    """

    def __init__(self, name=None, address='127.0.0.1:50051', device_id=0,
                 proto_dump_file=None):
        """Open the channel and start the bidirectional stream.

        Args:
            name: label stored on the instance.
            address: ``host:port`` target passed to ``grpc.insecure_channel``.
            device_id: value copied into every ``WriteRequest.device_id``.
            proto_dump_file: when not ``None``, requests are logged to this
                file through a ``GrpcRequestLogger`` interceptor.
        """
        self.name = name
        self.address = address
        self.device_id = device_id
        self.p4info = None
        self.proto_dump_file = proto_dump_file

        channel = grpc.insecure_channel(self.address)
        if proto_dump_file is not None:
            # Wrap the channel so the interceptor sees each unary request.
            channel = grpc.intercept_channel(
                channel, GrpcRequestLogger(proto_dump_file))
        self.channel = channel

        self.client_stub = p4runtime_pb2_grpc.P4RuntimeStub(self.channel)
        # Messages put on this queue become the request side of StreamChannel.
        self.requests_stream = IterableQueue()
        self.stream_msg_resp = self.client_stub.StreamChannel(
            iter(self.requests_stream))
        connections.append(self)

    def WriteTableEntry(self, table_entry, dry_run=False):
        """Send ``table_entry`` in a single-update ``WriteRequest``.

        The update type is ``MODIFY`` when ``table_entry.is_default_action``
        is true and ``INSERT`` otherwise. ``election_id.low`` is fixed at 1.

        Args:
            table_entry: message copied into ``update.entity.table_entry``.
            dry_run: when true, print the request instead of sending it.

        Errors raised by ``client_stub.Write`` are not caught here and
        propagate to the caller.
        """
        request = p4runtime_pb2.WriteRequest()
        request.device_id = self.device_id
        request.election_id.low = 1

        update = request.updates.add()
        update.type = (p4runtime_pb2.Update.MODIFY
                       if table_entry.is_default_action
                       else p4runtime_pb2.Update.INSERT)
        update.entity.table_entry.CopyFrom(table_entry)

        if dry_run:
            print("P4Runtime Write:", request)
            return
        self.client_stub.Write(request)
class GrpcRequestLogger(grpc.UnaryUnaryClientInterceptor,
                        grpc.UnaryStreamClientInterceptor):
    """gRPC client interceptor that logs each intercepted request to a file.

    The log file is truncated when the logger is created; every intercepted
    unary-unary or unary-stream call then appends one entry before the call
    is passed on unchanged.
    """

    def __init__(self, log_file):
        """Remember ``log_file`` and empty it."""
        self.log_file = log_file
        with open(self.log_file, 'w') as f:
            # Clear content if it exists.
            f.write("")

    def log_message(self, method_name, body):
        """Append one timestamped entry for ``body`` under ``method_name``.

        Bodies whose text form is ``MSG_LOG_MAX_LEN`` characters or longer are
        replaced by a "too long" notice. ``MSG_LOG_MAX_LEN`` is defined
        outside this snippet.
        """
        # UTC timestamp, trimmed from microseconds to milliseconds.
        ts = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        # Render the body once; the same text serves the length check and
        # the write, instead of calling str(body) a second time.
        msg = str(body)
        with open(self.log_file, 'a') as f:
            f.write("\n[%s] %s\n---\n" % (ts, method_name))
            if len(msg) < MSG_LOG_MAX_LEN:
                f.write(msg)
            else:
                # len(msg) counts characters of the text rendering.
                f.write("Message too long (%d bytes)! Skipping log...\n" % len(msg))
            f.write('---\n')

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """Log the request, then continue the unary-unary call unchanged."""
        self.log_message(client_call_details.method, request)
        return continuation(client_call_details, request)

    def intercept_unary_stream(self, continuation, client_call_details, request):
        """Log the request, then continue the unary-stream call unchanged."""
        self.log_message(client_call_details.method, request)
        return continuation(client_call_details, request)
class IterableQueue(Queue):
    """Queue that can be iterated until ``close()`` is called.

    Iteration blocks on ``get()`` and yields items in queue order; it ends
    when the private sentinel placed by ``close()`` is dequeued.
    """

    # Unique marker object; never equal-by-identity to any user item.
    _sentinel = object()

    def __iter__(self):
        """Yield queued items until the sentinel is dequeued.

        The sentinel is recognised by identity (``is``), so an item whose
        ``__eq__`` happens to return True for any operand cannot end the
        stream early. The returned generator is meant to be consumed by a
        single thread.
        """
        while True:
            item = self.get()
            if item is self._sentinel:
                return
            yield item

    def close(self):
        """Enqueue the sentinel so that an active iteration terminates."""
        self.put(self._sentinel)
The exception that I receive when I run the program:
Exception in thread Thread-11:
Traceback (most recent call last):
File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/usr/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "./mycontroller.py", line 348, in packet_router_processing
ipv4_forwarding(p4info_helper,extract_header,ingress_sw,packetIn.packet.metadata)
File "./mycontroller.py", line 256, in ipv4_forwarding
write_IPv4_Rules(p4info_helper,ingress_sw,ip_dest,24,dst_mac,1)
File "./mycontroller.py", line 38, in write_IPv4_Rules
ingress_sw.WriteTableEntry(table_entry)
File "/home/p4/tutorials/exercises/test/../../utils/p4runtime_lib/switch.py", line 102, in WriteTableEntry
self.client_stub.Write(request)
File "/usr/local/lib/python3.8/dist-packages/grpc/_interceptor.py", line 207, in __call__
response, ignored_call = self._with_call(
File "/usr/local/lib/python3.8/dist-packages/grpc/_interceptor.py", line 240, in _with_call
call = self._interceptor.intercept_unary_unary(
File "/home/p4/tutorials/exercises/test/../../utils/p4runtime_lib/switch.py", line 220, in intercept_unary_unary
return continuation(client_call_details, request)
File "/usr/local/lib/python3.8/dist-packages/grpc/_interceptor.py", line 228, in continuation
response, call = self._thunk(new_method).with_call(
File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 557, in with_call
return _end_unary_response_blocking(state, call, True, None)
File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 466, in _end_unary_response_blocking
raise _Rendezvous(state, None, None, deadline)
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
status = StatusCode.UNKNOWN
details = ""
debug_error_string = "{"created":"@1646087190.862135612","description":"Error received from peer","file":"src/core/lib/surface/call.cc","file_line":1036,"grpc_message":"","grpc_status":2}"
如果你对这篇内容有疑问,欢迎到本站社区发帖提问、参与讨论,获取更多帮助,或者扫描二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定您的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论