Writing examples to the same TFRecord file in parallel
I have a process that reads the URIs of CSV files located in cloud storage, serializes the data (one file is an "example" in TensorFlow speak), and writes them all to the same TFRecord file.
The process is very slow and I would like to parallelize the writing using Python multiprocessing. I've searched high and low and tried multiple implementations, to no avail. This question is very similar to mine, but it was never really answered.
This is the closest I've come (unfortunately, I can't really provide a reproducible example because of the read from cloud storage):
import pandas as pd
import multiprocessing
import tensorflow as tf

TFR_PATH = "./tfr.tfrecord"
BANDS = ["B2", "B3", "B4", "B5", "B6", "B7", "B8", "B8A", "B11", "B12"]

def write_tfrecord(tfr_path, df_list, bands):
    with tf.io.TFRecordWriter(tfr_path) as writer:
        for _, grp in df_list:
            band_data = {b: [] for b in bands}
            for i, row in grp.iterrows():
                try:
                    df = pd.read_csv(row['uri'])
                except FileNotFoundError:
                    continue
                df = prepare_df(df, bands)
                label = row['FS_crop'].encode()
                for b in bands:
                    band_data[b].append(list(df[b].astype('Int64')))
            # pad to same length and flatten
            mlen = max([len(j) for j in band_data[list(band_data.keys())[0]]])
            npx = len(band_data[list(band_data.keys())[0]])
            flat_band_data = {k: [] for k in band_data}
            for k, v in band_data.items():  # for each band
                for b in v:
                    flat_band_data[k].extend(b + [0] * int(mlen - len(b)))
            example_proto = serialize_example(npx, flat_band_data, label)
            writer.write(example_proto)

# List of grouped DF objects, may be 1000's long
gqdf = list(qdf.groupby("field_centroid_str"))
n = 100  # groups of files to write per process
processes = [multiprocessing.Process(target=write_tfrecord, args=(TFR_PATH, gqdf[i:i+n], BANDS))
             for i in range(0, len(gqdf), n)]
for p in processes:
    p.start()
for p in processes:
    p.join()
    p.close()
The processes will finish, but when I go to read a record like so:
raw_dataset = tf.data.TFRecordDataset(TFR_PATH)
for raw_record in raw_dataset.take(10):
    example = tf.train.Example()
    example.ParseFromString(raw_record.numpy())
    print(example)
I always end up with a corrupted-data error: DataLossError: corrupted record at 7462 [Op:IteratorGetNext]
Any ideas on the correct approach for doing something like this? I've tried using a Pool instead of Process, but the tf.io.TFRecordWriter can't be pickled, so that doesn't work.
Ran into a similar use case. The core issue is that the record writer isn't process-safe. There are two bottlenecks: serializing the data and writing to the output. My solution was to use multiprocessing (e.g. a pool) to serialize the data in parallel. Each worker communicates the serialized data to a single consumer process through a queue. The consumer simply pulls items off the queue and writes them sequentially. If writing then becomes the bottleneck, you can have multiple record writers write to different files.
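A minimal sketch of that layout, assuming the per-group work from the question is factored into a serialize_group helper (a hypothetical name standing in for the CSV reads and the serialize_example call); only the consumer process ever owns a TFRecordWriter:

import multiprocessing as mp

import tensorflow as tf


def serialize_group(group):
    # Hypothetical stand-in for the per-group logic in the question:
    # read the CSVs, pad/flatten the bands, and return the serialized
    # tf.train.Example bytes (what serialize_example produces).
    ...


def producer(groups, queue):
    # Serialize in a worker process and hand the bytes to the single writer.
    for group in groups:
        queue.put(serialize_group(group))


def consumer(queue, tfr_path):
    # The only process that owns a TFRecordWriter; it writes sequentially,
    # so records never interleave.
    with tf.io.TFRecordWriter(tfr_path) as writer:
        while True:
            item = queue.get()
            if item is None:  # sentinel: all producers are finished
                break
            writer.write(item)


if __name__ == "__main__":
    TFR_PATH = "./tfr.tfrecord"
    gqdf = list(qdf.groupby("field_centroid_str"))  # as in the question
    n = 100

    queue = mp.Queue(maxsize=1000)  # bound memory if producers outpace the writer
    writer_proc = mp.Process(target=consumer, args=(queue, TFR_PATH))
    writer_proc.start()

    producers = [
        mp.Process(target=producer, args=(gqdf[i:i + n], queue))
        for i in range(0, len(gqdf), n)
    ]
    for p in producers:
        p.start()
    for p in producers:
        p.join()

    queue.put(None)  # tell the consumer no more records are coming
    writer_proc.join()

If serialization is the dominant cost, a multiprocessing.Pool with imap_unordered feeding a single writer in the main process gives the same effect with less plumbing; and if you do end up sharding across several output files, tf.data.TFRecordDataset accepts a list of filenames when reading them back.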