Writing examples to the same TFRecord file in parallel


I have a process that reads URIs of CSV files located in cloud storage, serializes the data (one file is an "example" in tensorflow speak), and writes them to the same TFRecord file.

The process is very slow and I would like to parallelize the writing using Python multiprocessing. I've searched high and low and tried multiple implementations, to no avail. This question is very similar to mine, but it was never really answered.

This is the closest I've come (unfortunately, I can't really provide a replicable example due to the read from cloud storage):

import pandas as pd
import multiprocessing
import tensorflow as tf


TFR_PATH = "./tfr.tfrecord"
BANDS = ["B2", "B3","B4","B5","B6","B7","B8","B8A","B11","B12"]

def write_tfrecord(tfr_path, df_list, bands):
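    # Serialize each group of CSV files into one tf.train.Example and append it
    # to the TFRecord file at tfr_path.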
    with tf.io.TFRecordWriter(tfr_path) as writer: 
        for _, grp in df_list:
            band_data = {b: [] for b in bands}
            for i, row in grp.iterrows():
                try:
                    df = pd.read_csv(row['uri'])
                except FileNotFoundError:
                    continue
                df = prepare_df(df, bands)
                label = row['FS_crop'].encode()
                for b in bands:
                    band_data[b].append(list(df[b].astype('Int64')))

            # pad to same length and flatten
            mlen = max([len(j) for j in band_data[list(band_data.keys())[0]]])
            npx = len(band_data[list(band_data.keys())[0]])
            flat_band_data = {k: [] for k in band_data}
            for k,v in band_data.items(): # for each band
                for b in v:
                    flat_band_data[k].extend(b + [0] * int(mlen - len(b)))

            example_proto = serialize_example(npx, flat_band_data, label)
            writer.write(example_proto)

# List of grouped DataFrame objects; may be thousands of entries long
gqdf = list(qdf.groupby("field_centroid_str"))

n = 100  # number of groups handled by each worker process
processes = [multiprocessing.Process(target=write_tfrecord, args=(TFR_PATH, gqdf[i:i+n], BANDS)) for i in range(0, len(gqdf), n)]

for p in processes:
    p.start()

for p in processes:
    p.join()

p.close()

These processes will finish, but when I go to read a record like so:

raw_dataset = tf.data.TFRecordDataset(TFR_PATH)
for raw_record in raw_dataset.take(10):
    example = tf.train.Example()
    example.ParseFromString(raw_record.numpy())
    print(example)

I always end up with a corrupted data error, DataLossError: corrupted record at 7462 [Op:IteratorGetNext]

Any ideas on the correct approach for doing something like this? I've tried using Pool instead of Process, but the tf.io.TFRecordWriter can't be pickled, so it doesn't work.


Comments (1)

友谊不毕业 2025-02-20 09:19:50


Ran into a similar use case. The core issue is that the record writer isn't process-safe. There are two bottlenecks - serializing the data and writing the output. My solution here was to use multiprocessing (e.g. a pool) to serialize the data in parallel. Each worker communicates the serialized data to a single consumer process using a queue. The consumer simply pulls items off the queue and writes them sequentially. If the single writer then becomes the bottleneck, you could have multiple record writers write to different files.
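Below is a minimal sketch of that single-writer pattern, assuming the per-group logic from write_tfrecord() above has been factored into a serialize_group() helper (a hypothetical name) that returns the serialized bytes for one group; TFR_PATH, BANDS, gqdf and n are taken from the question:

import multiprocessing as mp

import tensorflow as tf

SENTINEL = None  # marker each producer puts on the queue when it is finished


def produce(groups, bands, queue):
    # Producer: serialize each group in a worker process and push the raw bytes
    # onto the queue. serialize_group() is assumed to wrap the per-group body of
    # write_tfrecord() above (read the CSVs, pad, call serialize_example()).
    for _, grp in groups:
        queue.put(serialize_group(grp, bands))
    queue.put(SENTINEL)


def consume(tfr_path, queue, n_producers):
    # Consumer: the only process that ever touches the TFRecordWriter, so the
    # output file is written sequentially and stays readable.
    finished = 0
    with tf.io.TFRecordWriter(tfr_path) as writer:
        while finished < n_producers:
            item = queue.get()
            if item is SENTINEL:
                finished += 1
            else:
                writer.write(item)


if __name__ == "__main__":
    queue = mp.Queue(maxsize=256)  # bound how many pending examples sit in memory
    chunks = [gqdf[i:i + n] for i in range(0, len(gqdf), n)]

    consumer = mp.Process(target=consume, args=(TFR_PATH, queue, len(chunks)))
    consumer.start()

    producers = [mp.Process(target=produce, args=(c, BANDS, queue)) for c in chunks]
    for p in producers:
        p.start()
    for p in producers:
        p.join()
    consumer.join()

If the single writer does become the limiting factor, the same layout extends to several consumers, each writing its own shard file; tf.data.TFRecordDataset accepts a list of filenames, so the shards can be read back together.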
