I'd like to understand if there's a mechanism to control batch sizes being sent from server to client.
I've implemented the Python server from the GitHub repo and a basic F# client.
As a test, I've added a flight containing 1 million rows, which I'd like to send back to the client.
At first, the client fails with the following gRPC exception:
One or more errors occurred. (Status(StatusCode="ResourceExhausted", Detail="Received message exceeds the maximum configured message size."))
As the error suggests, the maximum message size has been exceeded. As a fix, I can set the maximum allowed gRPC message size to unlimited, i.e.
// A null MaxReceiveMessageSize removes the default 4 MB receive limit.
let ops = new GrpcChannelOptions()
ops.MaxReceiveMessageSize <- Nullable()
let downloadChannel = GrpcChannel.ForAddress(uri, ops)
let downloadClient = new FlightClient(downloadChannel)
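(For comparison, the same ceiling can be lifted from a Python Flight client by passing the raw gRPC channel argument through generic_options; a sketch, with an illustrative address:)

import pyarrow.flight as flight

# -1 removes the gRPC receive-size limit; the address is illustrative.
client = flight.FlightClient(
    "grpc://localhost:8815",
    generic_options=[("grpc.max_receive_message_length", -1)],
)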
However, I'd like to understand if there's a way to set the size of the batches being sent to the client from the server, i.e. in the server's do_get method:
def do_get(self, context, ticket):
    key = ast.literal_eval(ticket.ticket.decode())
    if key not in self.flights:
        return None
    return pyarrow.flight.RecordBatchStream(self.flights[key])
I'd like to set the batch size when creating the pyarrow.flight.RecordBatchStream, but looking at the documentation, the options specified via pyarrow.ipc.IpcWriteOptions don't appear to allow a batch size to be set.
Thanks in advance for any help :)
UPDATE - see the accepted answer below, which led me down the right path. I've updated my code as follows to fix the issue:
def do_get(self, context, ticket):
    key = ast.literal_eval(ticket.ticket.decode())
    if key not in self.flights:
        return None
    # Re-chunk the table into smaller batches (max_chunksize is in rows;
    # tune the value to your data), then wrap them in a reader.
    table = self.flights[key]
    batches = table.to_batches(max_chunksize=65536)
    reader = pyarrow.ipc.RecordBatchReader.from_batches(table.schema, batches)
    return pyarrow.flight.RecordBatchStream(reader)
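The same re-chunking can also be expressed with pyarrow.flight.GeneratorStream, which serves any iterable of record batches for a given schema (a sketch; the 65536-row chunk size is illustrative):

def do_get(self, context, ticket):
    key = ast.literal_eval(ticket.ticket.decode())
    if key not in self.flights:
        return None
    table = self.flights[key]
    # GeneratorStream consumes the batches lazily, roughly one
    # FlightData message per batch.
    return pyarrow.flight.GeneratorStream(
        table.schema, table.to_batches(max_chunksize=65536))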
Accepted answer:
Assuming self.flights[key] is a pyarrow.Table, you can re-chunk it ahead of time with Table.to_batches. (This won't copy data, it'll just re-slice the underlying arrays.) Note the size is in rows, which depending on the data type may not correspond well to bytes; this is an unfortunate mismatch. You can use get_total_buffer_size to (cheaply) estimate the byte size and split batches further as needed (though if you have something like a single 4 MB string, you're out of luck).
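A minimal sketch of that byte-based splitting (the helper name, 65536-row pre-chunk, and 1 MiB target are all illustrative, assuming a pyarrow.Table):

import pyarrow

TARGET_BYTES = 1 << 20  # illustrative target of ~1 MiB per batch

def batches_capped_by_bytes(table: pyarrow.Table):
    # Pre-chunk by rows, then halve any batch whose buffers still look too big.
    for batch in table.to_batches(max_chunksize=65536):
        stack = [batch]
        while stack:
            b = stack.pop()
            # get_total_buffer_size is a cheap estimate; slices share their
            # parent's buffers, so it can overcount. The num_rows guard
            # guarantees termination: a single oversized value (e.g. one
            # 4 MB string) is yielded as-is.
            if b.num_rows <= 1 or b.get_total_buffer_size() <= TARGET_BYTES:
                yield b
            else:
                half = b.num_rows // 2
                stack.append(b.slice(0, half))
                stack.append(b.slice(half))

The generator can then be served with pyarrow.flight.GeneratorStream(table.schema, batches_capped_by_bytes(table)).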