Unpickling mid-stream (python)
I am writing scripts to process (very large) files by repeatedly unpickling objects until EOF. I would like to partition the file and have separate processes (in the cloud) unpickle and process separate parts.
However, my partitioner is not intelligent; it does not know the boundaries between pickled objects in the file (since those boundaries depend on the object types being pickled, etc.).

Is there a way to scan a file for a "start pickled object" sentinel? The naive way would be to attempt unpickling at successive byte offsets until an object is successfully unpickled, but that yields unexpected errors. It seems that for certain combinations of input, the unpickler falls out of sync and returns nothing for the rest of the file (see the code below).
import cPickle
import os

def stream_unpickle(file_obj):
    while True:
        start_pos = file_obj.tell()
        try:
            yield cPickle.load(file_obj)
        except (EOFError, KeyboardInterrupt):
            break
        except (cPickle.UnpicklingError, ValueError, KeyError, TypeError, ImportError):
            # resume one byte further along and try again
            file_obj.seek(start_pos + 1, os.SEEK_SET)

if __name__ == '__main__':
    import random
    from StringIO import StringIO

    # create some data
    sio = StringIO()
    [cPickle.dump(random.random(), sio, cPickle.HIGHEST_PROTOCOL) for _ in xrange(1000)]
    sio.flush()

    # read from subsequent offsets and find discontinuous jumps in object count
    size = sio.tell()
    last_count = None
    for step in xrange(size):
        sio.seek(step, os.SEEK_SET)
        count = sum(1 for _ in stream_unpickle(sio))
        if last_count is None or count == last_count - 1:
            last_count = count
        elif count != last_count:
            # if successful, these should never print (but they do...)
            print '%d elements read from byte %d' % (count, step)
            print '(%d elements read from byte %d)' % (last_count, step - 1)
            last_count = count
3 Answers
The pickletools module has a dis function that shows the opcodes. It shows that there is a STOP opcode that you could scan for:
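The dis output is not reproduced on this page; here is a minimal illustration of inspecting the opcodes (the tuple being pickled is just an arbitrary example):

import cPickle
import pickletools

data = cPickle.dumps(('abc', 1.5), cPickle.HIGHEST_PROTOCOL)
pickletools.dis(data)
# the last opcode printed is STOP (the '.' byte, 0x2e), which terminates
# every pickle -- that is the marker one could try to scan for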
Note, using the STOP opcode is a bit tricky because the codes are of variable length, but it may serve as a useful hint about where the cutoffs are.
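For instance, one could treat each '.' byte as a candidate cut point and only accept it if the slice up to it actually unpickles. This is a heuristic sketch of that idea, not code from the answer, and a spurious '.' inside an opcode argument can still fool it:

import cPickle

def candidate_boundaries(blob):
    # scan for the STOP byte ('.') and treat each occurrence as a *candidate*
    # end-of-pickle; verify by attempting to load the preceding slice, since
    # '.' can also appear inside opcode arguments
    start = 0
    for i, byte in enumerate(blob):
        if byte == '.':
            try:
                cPickle.loads(blob[start:i + 1])
                yield start, i + 1
                start = i + 1
            except Exception:
                pass   # not a real pickle boundary; keep scanning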
If you control the pickling step on the other end, then you can improve the situation by adding your own unambiguous alternative separator:
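The answer's own code is not shown on this page, so this is only a sketch of the writing side; the separator bytes and the helper name dump_records are arbitrary choices, and the scheme works only if the separator cannot occur inside the pickled payloads:

import cPickle

SEP = '\xde\xad\xbe\xef\x00\x00'   # arbitrary separator, assumed not to occur in the data

def dump_records(objs, file_obj):
    # write each pickle followed by the separator, so a dumb partitioner
    # can split the file without understanding the pickle format
    for obj in objs:
        cPickle.dump(obj, file_obj, cPickle.HIGHEST_PROTOCOL)
        file_obj.write(SEP)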
Before unpickling, split into separate pickles using the known separator:
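A sketch of the reading side, continuing the example above (SEP and dump_records as defined there):

def load_records(blob):
    # split the raw bytes on SEP and unpickle each non-empty chunk
    # (the trailing element of the split is empty because every record ends in SEP)
    for chunk in blob.split(SEP):
        if chunk:
            yield cPickle.loads(chunk)

# usage
from StringIO import StringIO
buf = StringIO()
dump_records([1.5, 'two', {'three': 3}], buf)
print list(load_records(buf.getvalue()))    # [1.5, 'two', {'three': 3}]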
In the pickled file, some opcodes have an argument -- a data value that follows the opcode. The data values vary in length and can contain bytes identical to opcodes. Therefore, if you start reading the file from an arbitrary position, you have no way of knowing whether you are looking at an opcode or are in the middle of an argument. You must read the file from the beginning and parse the opcodes.

I cooked up this function that skips one pickle from a file, i.e. reads it and parses the opcodes, but does not construct the objects. It seems slightly faster than cPickle.loads on some files I have. You could rewrite it in C for more speed (after testing it properly). You can then make one pass over the whole file to get the seek position of each pickle.
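The skipping function itself is not reproduced on this page. As a stand-in, here is a sketch of the one-pass indexing idea using the standard pickletools.genops, which also walks the opcodes without building objects (though it will be slower than a hand-rolled or C version):

import cPickle
import pickletools
from StringIO import StringIO

def pickle_index(file_obj):
    # one pass over a file of concatenated pickles, recording the byte
    # offset at which each pickle starts; genops stops after the STOP
    # opcode, leaving the file positioned at the start of the next pickle
    offsets = []
    while True:
        start = file_obj.tell()
        if not file_obj.read(1):          # EOF?
            return offsets
        file_obj.seek(start)
        offsets.append(start)
        for _opcode, _arg, _pos in pickletools.genops(file_obj):
            pass

if __name__ == '__main__':
    import random
    sio = StringIO()
    for _ in xrange(5):
        cPickle.dump(random.random(), sio, cPickle.HIGHEST_PROTOCOL)
    sio.seek(0)
    offsets = pickle_index(sio)
    # a worker can now seek to any recorded offset and call cPickle.load once
    sio.seek(offsets[2])
    print cPickle.load(sio)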
Sorry to answer my own question, and thanks to @RaymondHettinger for the idea of adding sentinels.

Here's what worked for me. I created readers and writers that use a sentinel, '#S', followed by a data block length at the beginning of each 'record'. The writer has to take care to find any occurrences of '#' in the data being written and double them (into '##'). The reader then uses a look-behind regex to find sentinels, distinct from any matching values that might be in the original stream, and also verifies the number of bytes between each sentinel and the subsequent one.

RecordWriter is a context manager (so multiple calls to write() can be encapsulated into a single record if needed). RecordReader is a generator.
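The original RecordWriter/RecordReader code is not reproduced on this page, so the following is only a rough reconstruction of the scheme described above; the exact record layout ('#S' + decimal length + newline + escaped payload), the length-based resynchronization check, and the reader's name are assumptions rather than the author's code:

import cPickle
import re
from StringIO import StringIO

SENTINEL = '#S'
_HEADER = re.compile(r'#S(\d+)\n')   # assumed header layout: sentinel, length, newline

class RecordWriter(object):
    # context manager: everything written inside the 'with' block becomes one
    # record, with '#' escaped as '##' so the payload cannot contain a bare sentinel
    def __init__(self, file_obj):
        self.file_obj = file_obj

    def __enter__(self):
        self._buf = StringIO()
        return self._buf

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            payload = self._buf.getvalue().replace('#', '##')
            self.file_obj.write('%s%d\n%s' % (SENTINEL, len(payload), payload))
        return False

def record_reader(file_obj):
    # generator yielding the unescaped payload of each record; a candidate
    # sentinel is accepted only if its declared length lands exactly on the
    # next sentinel (or EOF), which lets a reader resynchronize mid-stream
    blob = file_obj.read()
    pos = 0
    while True:
        pos = blob.find(SENTINEL, pos)
        if pos == -1:
            return
        m = _HEADER.match(blob, pos)
        if m:
            end = m.end() + int(m.group(1))
            if end == len(blob) or blob.startswith(SENTINEL, end):
                yield blob[m.end():end].replace('##', '#')
                pos = end
                continue
        pos += 1   # false positive inside escaped data; keep scanning

if __name__ == '__main__':
    out = StringIO()
    for value in ('spam', '#S tricky #', 12345):
        with RecordWriter(out) as rec:
            cPickle.dump(value, rec, cPickle.HIGHEST_PROTOCOL)
    out.seek(0)
    print [cPickle.loads(payload) for payload in record_reader(out)]
    # -> ['spam', '#S tricky #', 12345]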
Not sure how this is on performance. Any faster/elegant-er solutions are welcome.