How to read Protobuf files with Dask?
Has anyone tried reading Protobuf files over Dask? Each Protobuf file I have contains multiple records, and each record is prefixed with its length (4 bytes), as shown in the snippet below.
This is what the current code to read/parse these files looks like:
import mystream.MyPacket_pb2 as pb
import struct

def read_uint32(data):  # little endian
    return struct.unpack('<L', data)[0]

def parse(filepath):
    packets = []
    prefix = 4
    pos, data = 0, open(filepath, 'rb').read()
    count = 0
    while pos < len(data):
        next_pos = read_uint32(data[pos:pos+4])
        packet = pb.MyPacket()
        packet.ParseFromString(data[prefix+pos:prefix+pos+next_pos])
        pos += prefix + next_pos
        packets.append(packet)
    return packets
# Some more processing to follow after
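For what it's worth, the length framing itself does not need protobuf at all; here is a refactoring sketch of mine (not part of the production code) that only walks the 4-byte prefixes and yields raw payload bytes, which matters below because plain bytes pickle without any trouble:

def iter_record_bytes(filepath):
    # Walk the 4-byte little-endian length prefix of each record and yield the
    # raw payload; no protobuf objects are created at this stage.
    prefix = 4
    data = open(filepath, 'rb').read()
    pos = 0
    while pos < len(data):
        length = read_uint32(data[pos:pos + prefix])  # read_uint32 as defined above
        yield data[pos + prefix:pos + prefix + length]
        pos += prefix + length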
The parse() snippet above runs fine standalone, but fails over Dask when submitted with a simple distributed.Client.submit()/map(), with the error: Exception: cannot pickle 'google.protobuf.pyext._message.MessageDescriptor' object.
I also left a comment on the only GitHub discussion I could find on Protobuf & Dask here. It's a pretty interesting discussion, but it's already six years old.
I'd love to hear if anyone has ideas on how to get something like this to work, if it's possible at all.
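One direction I have been sketching (untested, so take it with a grain of salt): keep every protobuf object inside the task and only move plain Python types across the wire, i.e. a file path in and JSON strings out. This assumes the generated *_pb2 modules are installed on every worker and that the task function, together with the parser it calls, lives in an importable module rather than in the notebook, so Dask can pickle it by reference instead of by value. The name trajectories_as_json below is my own placeholder.

def trajectories_as_json(path):
    # Hypothetical task function: all protobuf handling stays inside the task.
    import mapstream.MapstreamPacket_pb2 as pb
    from google.protobuf.json_format import MessageToJson

    out = []
    for packet in parse_mapstream(path):  # the parser from the full example below
        if packet.type == pb.PacketType.TRAJECTORY:
            inner = pb.TrajectoryPacket()
            inner.ParseFromString(packet.message)
            out.append(MessageToJson(inner))  # plain str, pickles fine
    return out

# Hypothetical usage; the paths are placeholders:
# futures = client.map(trajectories_as_json, ["a.protobuf", "b.protobuf"])
# print(client.gather(futures))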
EDIT: Posting the entire example here.
import json
import struct
from google.protobuf.json_format import MessageToJson
import mapstream.MapstreamPacket_pb2 as pb

def read_uint32(data):  # little endian
    return struct.unpack('<L', data)[0]

def parse_mapstream(path):
    prefix = 4
    pos, data = 0, open(path, 'rb').read()
    count = 0
    # Just parse the first 3 packets for testing
    while pos < len(data) and count < 3:
        next_pos = read_uint32(data[pos:pos+4])
        packet = pb.MapstreamPacket()
        packet.ParseFromString(data[prefix+pos:prefix+pos+next_pos])
        pos += prefix + next_pos
        count += 1
        yield packet

# the packet message is one of these types
factories = {
    pb.PacketType.TRAJECTORY: pb.TrajectoryPacket,
}

# given a packet wrapper, decode the wrapped packet type
def dispatch(packet):
    factory = factories[packet.type]()
    factory.ParseFromString(packet.message)
    return factory

def extract_packets():
    path = "uncompressed-ms.protobuf"
    packets1 = parse_mapstream(path)
    wanted = set([pb.PacketType.TRAJECTORY])
    packets2 = (dispatch(packet) for packet in packets1 if packet.type in wanted)
    packets3 = (MessageToJson(packet) for packet in packets2)
    list_packets3 = list(packets3)
    # Return only the first packet as JSON
    return json.dumps(json.loads(list_packets3[0]), indent=2)

# Without Dask
print(extract_packets())

# With Dask
import distributed

client = distributed.Client("dask-scheduler-ip")
client.get_versions(check=True)

res = client.submit(extract_packets)
print(client.gather(res))
Output w/o Dask:
{
  "geoPose": {
    "longitude": 11.675636545410766,
    "latitude": 48.036162288714884,
    "altitude": 567.4531918477081,
    "rigToENU": {
      "x": -0.009063592,
      "y": -0.00394439,
      "z": -0.9173354,
      "w": 0.39799264
    },
    "locationCovariance": [
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0
    ],
    "rotationCovariance": [
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0
    ]
  }
}
Error w/ Dask:
distributed.protocol.pickle - INFO - Failed to serialize <function extract_packets at 0x7f10c436ee50>. Exception: cannot pickle 'google.protobuf.pyext._message.MessageDescriptor' object
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/distributed/worker.py:4480, in dumps_function(func)
4479 with _cache_lock:
-> 4480 result = cache_dumps[func]
4481 except KeyError:
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/distributed/utils.py:1358, in LRU.__getitem__(self, key)
1357 def __getitem__(self, key):
-> 1358 value = super().__getitem__(key)
1359 self.data.move_to_end(key)
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/collections/__init__.py:1010, in UserDict.__getitem__(self, key)
1009 return self.__class__.__missing__(self, key)
-> 1010 raise KeyError(key)
KeyError: <function extract_packets at 0x7f10c436ee50>
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/distributed/protocol/pickle.py:44, in dumps(x, buffer_callback, protocol)
43 buffers.clear()
---> 44 result = cloudpickle.dumps(x, **dump_kwargs)
45 elif not _always_use_pickle_for(x) and b"__main__" in result:
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py:73, in dumps(obj, protocol, buffer_callback)
70 cp = CloudPickler(
71 file, protocol=protocol, buffer_callback=buffer_callback
72 )
---> 73 cp.dump(obj)
74 return file.getvalue()
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py:602, in CloudPickler.dump(self, obj)
601 try:
--> 602 return Pickler.dump(self, obj)
603 except RuntimeError as e:
TypeError: cannot pickle 'google.protobuf.pyext._message.MessageDescriptor' object
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
Input In [8], in <cell line: 8>()
5 client.get_versions(check=True)
6 client
----> 8 res = client.submit(extract_packets)
9 print(client.gather(res))
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/distributed/client.py:1758, in Client.submit(self, func, key, workers, resources, retries, priority, fifo_timeout, allow_other_workers, actor, actors, pure, *args, **kwargs)
1755 else:
1756 dsk = {skey: (func,) + tuple(args)}
-> 1758 futures = self._graph_to_futures(
1759 dsk,
1760 [skey],
1761 workers=workers,
1762 allow_other_workers=allow_other_workers,
1763 priority={skey: 0},
1764 user_priority=priority,
1765 resources=resources,
1766 retries=retries,
1767 fifo_timeout=fifo_timeout,
1768 actors=actor,
1769 )
1771 logger.debug("Submit %s(...), %s", funcname(func), key)
1773 return futures[skey]
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/distributed/client.py:2898, in Client._graph_to_futures(self, dsk, keys, workers, allow_other_workers, priority, user_priority, resources, retries, fifo_timeout, actors)
2896 # Pack the high level graph before sending it to the scheduler
2897 keyset = set(keys)
-> 2898 dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
2900 # Create futures before sending graph (helps avoid contention)
2901 futures = {key: Future(key, self, inform=False) for key in keyset}
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/dask/highlevelgraph.py:1055, in HighLevelGraph.__dask_distributed_pack__(self, client, client_keys, annotations)
1049 layers = []
1050 for layer in (self.layers[name] for name in self._toposort_layers()):
1051 layers.append(
1052 {
1053 "__module__": layer.__module__,
1054 "__name__": type(layer).__name__,
-> 1055 "state": layer.__dask_distributed_pack__(
1056 self.get_all_external_keys(),
1057 self.key_dependencies,
1058 client,
1059 client_keys,
1060 ),
1061 "annotations": layer.__dask_distributed_annotations_pack__(
1062 annotations
1063 ),
1064 }
1065 )
1066 return {"layers": layers}
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/dask/highlevelgraph.py:425, in Layer.__dask_distributed_pack__(self, all_hlg_keys, known_key_dependencies, client, client_keys)
420 merged_hlg_keys = all_hlg_keys | dsk.keys()
421 dsk = {
422 stringify(k): stringify(v, exclusive=merged_hlg_keys)
423 for k, v in dsk.items()
424 }
--> 425 dsk = toolz.valmap(dumps_task, dsk)
426 return {"dsk": dsk, "dependencies": dependencies}
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/toolz/dicttoolz.py:83, in valmap(func, d, factory)
72 """ Apply function to values of dictionary
73
74 >>> bills = {"Alice": [20, 15, 30], "Bob": [10, 35]}
(...)
80 itemmap
81 """
82 rv = factory()
---> 83 rv.update(zip(d.keys(), map(func, d.values())))
84 return rv
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/distributed/worker.py:4518, in dumps_task(task)
4516 return d
4517 elif not any(map(_maybe_complex, task[1:])):
-> 4518 return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
4519 return to_serialize(task)
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/distributed/worker.py:4482, in dumps_function(func)
4480 result = cache_dumps[func]
4481 except KeyError:
-> 4482 result = pickle.dumps(func, protocol=4)
4483 if len(result) < 100000:
4484 with _cache_lock:
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/distributed/protocol/pickle.py:51, in dumps(x, buffer_callback, protocol)
49 try:
50 buffers.clear()
---> 51 result = cloudpickle.dumps(x, **dump_kwargs)
52 except Exception as e:
53 logger.info("Failed to serialize %s. Exception: %s", x, e)
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py:73, in dumps(obj, protocol, buffer_callback)
69 with io.BytesIO() as file:
70 cp = CloudPickler(
71 file, protocol=protocol, buffer_callback=buffer_callback
72 )
---> 73 cp.dump(obj)
74 return file.getvalue()
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py:602, in CloudPickler.dump(self, obj)
600 def dump(self, obj):
601 try:
--> 602 return Pickler.dump(self, obj)
603 except RuntimeError as e:
604 if "recursion" in e.args[0]:
TypeError: cannot pickle 'google.protobuf.pyext._message.MessageDescriptor' object
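Since the traceback bottoms out in cloudpickle.dumps() on the submitted function (see the INFO line at the top), I assume the serialization failure can be reproduced without any cluster by pickling the callable directly in the same notebook session:

import cloudpickle

try:
    cloudpickle.dumps(extract_packets)
except TypeError as exc:
    # expected: cannot pickle 'google.protobuf.pyext._message.MessageDescriptor' object
    print("cloudpickle failed:", exc)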