用apache beam(python)解压缩文件,但是当使用WritEtotext时,将所有列作为行

发布于 2025-01-18 19:29:37 字数 766 浏览 1 评论 0原文

我对编程和 Apache Beam 非常陌生,我正在尝试读取 GCS 存储桶上的大量 zip 文件并将其解压缩并在 GCS 上再次保存为 csv。

with beam.Pipeline() as pipeline:
readable_files = (
  pipeline
  | beam.io.fileio.MatchFiles('path/file/patter*.zip')
  | beam.io.fileio.ReadMatches()
  | beam.FlatMap(unzip)
  | beam.combiners.ToList())
files_and_contents = (
  readable_files  
  | beam.io.WriteToText('new', file_name_suffix='.csv'))

我正在使用此功能解压缩文件,

def unzip(readable_file):
print(readable_file)
input_zip=zipfile.ZipFile(readable_file.open())
yield {name: input_zip.read(name) for name in input_zip.namelist()}

我仅使用两个文件对其进行了测试,并且所有行都写为列,这是一个示例。标题是一列,所有其他行都是列。

已保存 CSV 文件

I am very new to programming and Apache Beam, and I am trying to read plenty zip files on a a GCS bucket and unzip them and save again as csv on GCS.

with beam.Pipeline() as pipeline:
readable_files = (
  pipeline
  | beam.io.fileio.MatchFiles('path/file/patter*.zip')
  | beam.io.fileio.ReadMatches()
  | beam.FlatMap(unzip)
  | beam.combiners.ToList())
files_and_contents = (
  readable_files  
  | beam.io.WriteToText('new', file_name_suffix='.csv'))

An I am unzipping the files with this function

def unzip(readable_file):
print(readable_file)
input_zip=zipfile.ZipFile(readable_file.open())
yield {name: input_zip.read(name) for name in input_zip.namelist()}

I have tested it with two files only, and all lines were written as columns, here is an example. The header is a column, and all the other lines columns.

CSV file saved

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

聚集的泪 2025-01-25 19:29:37

内部beam.io.file
io.readMatches()尝试添加skip_header_lines = 1

inside beam.io.file
io.ReadMatches() try adding skip_header_lines=1

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文