How to read Protobuf files with Dask?

Has anyone tried reading Protobuf files over Dask? Each Protobuf file I have contains multiple records, and each record is prefixed with its length (4 bytes), as shown in the snippet below.
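
For reference, the on-disk framing is just a 4-byte little-endian length followed by the serialized message, repeated for each record. A minimal, purely illustrative writer for that layout (assuming the same MyPacket type used below) might look like this:

import struct
import mystream.MyPacket_pb2 as pb

def write_packets(path, packets):
    # each record: 4-byte little-endian length prefix, then the serialized message
    with open(path, 'wb') as f:
        for packet in packets:
            body = packet.SerializeToString()
            f.write(struct.pack('<L', len(body)))
            f.write(body)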

This is what the current code to read/parse these files looks like:

import mystream.MyPacket_pb2 as pb
import struct

def read_uint32(data): # little endian
    return struct.unpack('<L', data)[0]

def parse(filepath):
    packets = []
    prefix = 4  # each record starts with a 4-byte length prefix
    pos, data = 0, open(filepath, 'rb').read()
    while pos < len(data):
        next_pos = read_uint32(data[pos:pos+4])  # length of the next record
        packet = pb.MyPacket()
        packet.ParseFromString(data[prefix+pos:prefix+pos+next_pos])
        pos += prefix + next_pos
        packets.append(packet)
    return packets

# Some more processing to follow after

The snippet runs fine on its own, but fails over Dask when submitted with a simple distributed.Client.submit()/map(), with the error: Exception: cannot pickle 'google.protobuf.pyext._message.MessageDescriptor' object.

I also left a comment on the only GitHub discussion I could find on Protobuf & Dask here. This is a pretty interesting discussion, but it's been 6 years already.

I'd love to hear if anyone has ideas on how we can get something like this to work, if at all possible.
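
One idea I have not tried yet (just a sketch, with a hypothetical module name): if parse() and the follow-up processing lived in a module that is importable on the client and on every worker, the submitted function should be pickled by reference rather than by value, so the protobuf classes would never need to be serialized and only plain return values (e.g. a JSON string) would cross the wire:

# mystream_tasks.py -- hypothetical module installed on the client and every worker;
# it would contain parse() from above plus the follow-up processing, and return
# only plain Python types (lists, dicts, strings), never protobuf Message objects.

# client side
import distributed
from mystream_tasks import parse_and_process  # hypothetical wrapper around parse()

client = distributed.Client("dask-scheduler-ip")
fut = client.submit(parse_and_process, "path/to/one-file.protobuf")
print(client.gather(fut))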

EDIT: Posting the entire example here.

import json
import struct
from google.protobuf.json_format import MessageToJson
import mapstream.MapstreamPacket_pb2 as pb

def read_uint32(data): # little endian
    return struct.unpack('<L', data)[0]

def parse_mapstream(path):
    prefix = 4
    pos, data = 0, open(path, 'rb').read()
    count = 0
    # Just parse the first 3 packets for testing
    while pos < len(data) and count < 3:
        next_pos = read_uint32(data[pos:pos+4])
        packet = pb.MapstreamPacket()
        packet.ParseFromString(data[prefix+pos:prefix+pos+next_pos])
        pos += prefix + next_pos
        count += 1
        yield packet

# the packet message is one of these types
factories = {
    pb.PacketType.TRAJECTORY: pb.TrajectoryPacket,
}

# given a packet wrapper, decode the wrapped packet type
def dispatch(packet):
    factory = factories[packet.type]()
    factory.ParseFromString(packet.message)
    return factory

def extract_packets():
    path = "uncompressed-ms.protobuf"
    packets1 = parse_mapstream(path)
    wanted = set([pb.PacketType.TRAJECTORY])
    packets2 = (dispatch(packet) for packet in packets1 if packet.type in wanted)
    packets3 = (MessageToJson(packet) for packet in packets2)
    list_packets3 = list(packets3)
    # Return only the first packet as JSON
    return json.dumps(json.loads(list_packets3[0]), indent=2)


# Without Dask
print(extract_packets())

# With Dask
import distributed
client = distributed.Client("dask-scheduler-ip")
client.get_versions(check=True)
res = client.submit(extract_packets)
print(client.gather(res))

Output w/o Dask:

{
  "geoPose": {
    "longitude": 11.675636545410766,
    "latitude": 48.036162288714884,
    "altitude": 567.4531918477081,
    "rigToENU": {
      "x": -0.009063592,
      "y": -0.00394439,
      "z": -0.9173354,
      "w": 0.39799264
    },
    "locationCovariance": [
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0
    ],
    "rotationCovariance": [
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0
    ]
  }
}

Error w/ Dask:

distributed.protocol.pickle - INFO - Failed to serialize <function extract_packets at 0x7f10c436ee50>. Exception: cannot pickle 'google.protobuf.pyext._message.MessageDescriptor' object
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/distributed/worker.py:4480, in dumps_function(func)
   4479     with _cache_lock:
-> 4480         result = cache_dumps[func]
   4481 except KeyError:

File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/distributed/utils.py:1358, in LRU.__getitem__(self, key)
   1357 def __getitem__(self, key):
-> 1358     value = super().__getitem__(key)
   1359     self.data.move_to_end(key)

File ~/anaconda3/envs/rapids-22.04/lib/python3.8/collections/__init__.py:1010, in UserDict.__getitem__(self, key)
   1009     return self.__class__.__missing__(self, key)
-> 1010 raise KeyError(key)

KeyError: <function extract_packets at 0x7f10c436ee50>

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/distributed/protocol/pickle.py:44, in dumps(x, buffer_callback, protocol)
     43         buffers.clear()
---> 44         result = cloudpickle.dumps(x, **dump_kwargs)
     45 elif not _always_use_pickle_for(x) and b"__main__" in result:

File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py:73, in dumps(obj, protocol, buffer_callback)
     70 cp = CloudPickler(
     71     file, protocol=protocol, buffer_callback=buffer_callback
     72 )
---> 73 cp.dump(obj)
     74 return file.getvalue()

File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py:602, in CloudPickler.dump(self, obj)
    601 try:
--> 602     return Pickler.dump(self, obj)
    603 except RuntimeError as e:

TypeError: cannot pickle 'google.protobuf.pyext._message.MessageDescriptor' object

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
Input In [8], in <cell line: 8>()
      5 client.get_versions(check=True)
      6 client
----> 8 res = client.submit(extract_packets)
      9 print(client.gather(res))

File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/distributed/client.py:1758, in Client.submit(self, func, key, workers, resources, retries, priority, fifo_timeout, allow_other_workers, actor, actors, pure, *args, **kwargs)
   1755 else:
   1756     dsk = {skey: (func,) + tuple(args)}
-> 1758 futures = self._graph_to_futures(
   1759     dsk,
   1760     [skey],
   1761     workers=workers,
   1762     allow_other_workers=allow_other_workers,
   1763     priority={skey: 0},
   1764     user_priority=priority,
   1765     resources=resources,
   1766     retries=retries,
   1767     fifo_timeout=fifo_timeout,
   1768     actors=actor,
   1769 )
   1771 logger.debug("Submit %s(...), %s", funcname(func), key)
   1773 return futures[skey]

File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/distributed/client.py:2898, in Client._graph_to_futures(self, dsk, keys, workers, allow_other_workers, priority, user_priority, resources, retries, fifo_timeout, actors)
   2896 # Pack the high level graph before sending it to the scheduler
   2897 keyset = set(keys)
-> 2898 dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
   2900 # Create futures before sending graph (helps avoid contention)
   2901 futures = {key: Future(key, self, inform=False) for key in keyset}

File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/dask/highlevelgraph.py:1055, in HighLevelGraph.__dask_distributed_pack__(self, client, client_keys, annotations)
   1049 layers = []
   1050 for layer in (self.layers[name] for name in self._toposort_layers()):
   1051     layers.append(
   1052         {
   1053             "__module__": layer.__module__,
   1054             "__name__": type(layer).__name__,
-> 1055             "state": layer.__dask_distributed_pack__(
   1056                 self.get_all_external_keys(),
   1057                 self.key_dependencies,
   1058                 client,
   1059                 client_keys,
   1060             ),
   1061             "annotations": layer.__dask_distributed_annotations_pack__(
   1062                 annotations
   1063             ),
   1064         }
   1065     )
   1066 return {"layers": layers}

File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/dask/highlevelgraph.py:425, in Layer.__dask_distributed_pack__(self, all_hlg_keys, known_key_dependencies, client, client_keys)
    420 merged_hlg_keys = all_hlg_keys | dsk.keys()
    421 dsk = {
    422     stringify(k): stringify(v, exclusive=merged_hlg_keys)
    423     for k, v in dsk.items()
    424 }
--> 425 dsk = toolz.valmap(dumps_task, dsk)
    426 return {"dsk": dsk, "dependencies": dependencies}

File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/toolz/dicttoolz.py:83, in valmap(func, d, factory)
     72 """ Apply function to values of dictionary
     73 
     74 >>> bills = {"Alice": [20, 15, 30], "Bob": [10, 35]}
   (...)
     80     itemmap
     81 """
     82 rv = factory()
---> 83 rv.update(zip(d.keys(), map(func, d.values())))
     84 return rv

File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/distributed/worker.py:4518, in dumps_task(task)
   4516         return d
   4517     elif not any(map(_maybe_complex, task[1:])):
-> 4518         return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
   4519 return to_serialize(task)

File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/distributed/worker.py:4482, in dumps_function(func)
   4480         result = cache_dumps[func]
   4481 except KeyError:
-> 4482     result = pickle.dumps(func, protocol=4)
   4483     if len(result) < 100000:
   4484         with _cache_lock:

File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/distributed/protocol/pickle.py:51, in dumps(x, buffer_callback, protocol)
     49 try:
     50     buffers.clear()
---> 51     result = cloudpickle.dumps(x, **dump_kwargs)
     52 except Exception as e:
     53     logger.info("Failed to serialize %s. Exception: %s", x, e)

File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py:73, in dumps(obj, protocol, buffer_callback)
     69 with io.BytesIO() as file:
     70     cp = CloudPickler(
     71         file, protocol=protocol, buffer_callback=buffer_callback
     72     )
---> 73     cp.dump(obj)
     74     return file.getvalue()

File ~/anaconda3/envs/rapids-22.04/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py:602, in CloudPickler.dump(self, obj)
    600 def dump(self, obj):
    601     try:
--> 602         return Pickler.dump(self, obj)
    603     except RuntimeError as e:
    604         if "recursion" in e.args[0]:

TypeError: cannot pickle 'google.protobuf.pyext._message.MessageDescriptor' object
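
For what it's worth, the failure reproduces without a cluster at all: submit() hands the function to cloudpickle (see the dumps_function / cloudpickle.dumps frames above), so the same error can be triggered directly in the notebook session where extract_packets is defined:

import cloudpickle

try:
    cloudpickle.dumps(extract_packets)  # effectively what distributed's dumps_function() does
except TypeError as exc:
    print(exc)  # cannot pickle 'google.protobuf.pyext._message.MessageDescriptor' object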
