在Apache Beam Python中读取CSV的有效方法

发布于 2025-02-03 19:45:05 字数 1025 浏览 7 评论 0原文

在阅读了有关Stackoverflow的一些问题之后,我一直在使用以下代码来读取Beam上的CSV文件。

管道代码:

 with beam.Pipeline(options=pipeline_options) as p:

    parsed_csv = (p | 'Create from CSV' >> beam.Create([input_file]))
    flattened_file = (parsed_csv | 'Flatten the CSV' >> beam.FlatMap(get_csv_reader))

读取csv 的方法:get_csv_reader()

def get_csv_reader(readable_file):

    # Open a channel to read the file from GCS
    gcs_file = beam.io.filesystems.FileSystems.open(readable_file)

    # Read file as a CSV
    gcs_reader = csv.reader(io.TextIOWrapper(gcs_file))

    next(gcs_reader)

    return gcs_reader

我正在使用它而不是readfromText,因为当字段值中有newline字符时,它会失败。

问题: 现在,我的问题是,这种阅读CSV的方式是否有效?如果庞大的文件,它会失败吗?我之所以问,是因为我在方法中使用了 csv.reader 。我觉得这将文件加载到内存中,导致大量文件失败。如果我错了,请纠正我的理解。

此外,由于这是一个Ptransform,我的方法是否会被序列化以在不同的工作节点上运行?我对Beam将如何在幕后运行此代码感到困惑。

如果这不是有效的,请建议在Apache Beam上读取CSV的有效方法。

After reading some questions on StackOverflow, I have been using the below code to read CSV files on beam.

Pipeline code:

 with beam.Pipeline(options=pipeline_options) as p:

    parsed_csv = (p | 'Create from CSV' >> beam.Create([input_file]))
    flattened_file = (parsed_csv | 'Flatten the CSV' >> beam.FlatMap(get_csv_reader))

Method to read csv: get_csv_reader()

def get_csv_reader(readable_file):

    # Open a channel to read the file from GCS
    gcs_file = beam.io.filesystems.FileSystems.open(readable_file)

    # Read file as a CSV
    gcs_reader = csv.reader(io.TextIOWrapper(gcs_file))

    next(gcs_reader)

    return gcs_reader

I am using this as opposed to ReadFromText because it fails when there are newline characters in the field values.

Question:
Now, my question is if this way of reading CSV is efficient? Would it fail in case of huge files? I ask because I am using csv.reader in my method. I feel like this loads the file into memory causing a failure for huge files. Please correct my understanding if I am wrong.

Additionally, since this is a Ptransform will my method be serialized to run on different worker nodes? I am confused as to how beam would run this code behind the scenes.

If this is not the efficient please suggest the efficient way to read CSV on apache beam.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

写下不归期 2025-02-10 19:45:05

您可以定义生成器,以懒惰地逐行读取文件。

def read_csv_file(readable_file):
  with beam.io.filesystems.FileSystems.open(readable_file) as gcs_file:
    for row in csv.reader(gcs_file):
      yield row

一个类似的问题是如何在加载newlines时如何处理Newlines apache梁的CSV?

You can define a generator to lazily read the files row by row.

def read_csv_file(readable_file):
  with beam.io.filesystems.FileSystems.open(readable_file) as gcs_file:
    for row in csv.reader(gcs_file):
      yield row

A similar question is How to handle newlines when loading a CSV into Apache Beam?

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文