创建和加载avro文件,创建文件,但很空

发布于 01-22 02:36 字数 671 浏览 5 评论 0原文

我正在读取CSV文件并将其加载到GCS存储桶中的AVRO文件中。 AVRO文件会创建,但没有数据。打印时有数据。我检查了缓冲区,但缓冲区中也没有数据。

我尝试了writer.close(),但是我遇到了这个错误 - “如果不结束上传,就无法齐平。使用colle(),” io.unsupportedoperation:如果不结束上传,就无法冲洗。而是使用Close()。”

'def load_avro_file(records):
schema_parsed = avro.schema.parse(json.dumps(schema))
client = storage.Client()
bucket = client.get_bucket(BUCKET)
blob = bucket.blob(DESTINATION_FILE)
with blob.open(mode='wb') as f:
    writer = DataFileWriter(f, DatumWriter(), schema_parsed)
    for record in records:
        record = dict((f, getattr(record, f)) for f in record._fields)
        print("In here",record)
        writer.append(record)

''

I am reading a CSV file and loading it into an Avro file in the GCS bucket. The Avro file gets created but there is no data. There is data when I print. I checked the buffer but there is no data in the buffer as well.

I tried writer.close() but I am getting this error - "Cannot flush without finalizing upload. Use close() instead, "io.UnsupportedOperation: Cannot flush without finalizing upload. Use close() instead."

'def load_avro_file(records):
schema_parsed = avro.schema.parse(json.dumps(schema))
client = storage.Client()
bucket = client.get_bucket(BUCKET)
blob = bucket.blob(DESTINATION_FILE)
with blob.open(mode='wb') as f:
    writer = DataFileWriter(f, DatumWriter(), schema_parsed)
    for record in records:
        record = dict((f, getattr(record, f)) for f in record._fields)
        print("In here",record)
        writer.append(record)

'

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

痴情2025-01-29 02:36:03

我面临类似的问题,但找不到任何答案。也许您已经解决了这个问题,但是让我在这里分享我的工作方式。

阅读 blob.open 方法,我发现此image> image_flush参数:

(可选)对于非文本模式写入,使flush()无需做任何事情而不是提出错误。远程服务不支持flush()没有关闭的情况,因此通常将其称为io.unsupporteDoperation。但是,这种行为与python中的一些消费者和文件包装器不兼容,例如zipfile.zipfile或io.textiowrapper。设置ignore_flush将导致flush()成功地做任何事情,以兼容这些上下文。实际刷新到远程服务器的正确方法是关闭(使用上下文管理器,例如在示例中,将自动发生这种情况)。

AVRO需要在二进制模式下打开文件,因此打开BLOB时,我们需要将此参数设置为true以避免错误。

另外,如果您不调用.close() avro方法,则文件无法正确生成,因此我们需要将io对象馈送给作者,而无需将其包裹在上下文管理器上它将由Avro本身处理。

最终解决方案看起来像这样:

import google.cloud.storage as gcs
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

gcs_client = gcs.Client()
bucket = gcs_client.bucket(bucketname)
blob = bucket.blob(filename)

writer = DataFileWriter(blob.open('wb', ignore_flush=True), DatumWriter(), schema_parsed)
for record in records:
    writer.append(record)
writer.close()

I was facing a similar problem but couldn't find any answer for this. Maybe you already solved this, but let me share here how I had this working.

Reading Google Cloud docs for blob.open method, I found this ignore_flush parameter:

(Optional) For non text-mode writes, makes flush() do nothing instead of raising an error. flush() without closing is not supported by the remote service and therefore calling it normally results in io.UnsupportedOperation. However, that behavior is incompatible with some consumers and wrappers of file objects in Python, such as zipfile.ZipFile or io.TextIOWrapper. Setting ignore_flush will cause flush() to successfully do nothing, for compatibility with those contexts. The correct way to actually flush data to the remote server is to close() (using a context manager, such as in the example, will cause this to happen automatically).

Avro needs to open the files on binary mode, so when opening the blob we need to set this parameter to True to avoid errors.

Also, if you don't call the .close() avro method the file won't be generated properly, so we need to feed our IO object to the writer without wrapping it on a context manager as it will be handled by avro itself.

The final solution looks like this:

import google.cloud.storage as gcs
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

gcs_client = gcs.Client()
bucket = gcs_client.bucket(bucketname)
blob = bucket.blob(filename)

writer = DataFileWriter(blob.open('wb', ignore_flush=True), DatumWriter(), schema_parsed)
for record in records:
    writer.append(record)
writer.close()
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文