如何使用 boto 将文件从 Amazon S3 流式传输到 Rackspace Cloudfiles?

发布于 2024-12-07 18:32:23 字数 252 浏览 2 评论 0原文

我正在将文件从 S3 复制到 Cloudfiles,并且我想避免将该文件写入磁盘。 Python-Cloudfiles 库有一个 object.stream() 调用,看起来正是我所需要的,但我在 boto 中找不到等效的调用。我希望我能够做这样的事情:

shutil.copyfileobj(s3Object.stream(),rsObject.stream())

这可以用 boto (或者我想任何其他 s3 库)吗?

I'm copying a file from S3 to Cloudfiles, and I would like to avoid writing the file to disk. The Python-Cloudfiles library has an object.stream() call that looks to be what I need, but I can't find an equivalent call in boto. I'm hoping that I would be able to do something like:

shutil.copyfileobj(s3Object.stream(),rsObject.stream())

Is this possible with boto (or I suppose any other s3 library)?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

何以畏孤独 2024-12-14 18:32:23

该线程中的其他答案与 boto 相关,但 S3.Object 在 boto3 中不再可迭代。因此,以下代码不起作用,它会产生 TypeError: 's3.Object' object is not iterable 错误消息:

s3 = boto3.session.Session(profile_name=my_profile).resource('s3')
s3_obj = s3.Object(bucket_name=my_bucket, key=my_key)

with io.FileIO('sample.txt', 'w') as file:
    for i in s3_obj:
        file.write(i)

在 boto3 中,对象的内容可在 S3.Object 中获得。 get()['Body'] 这是一个可迭代的版本 1.9.68 但以前不是。因此,以下内容适用于最新版本的 boto3,但不适用于早期版本:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    for i in body:
        file.write(i)

因此,较旧的 bo​​to3 版本的替代方法是使用 read 方法,但这会将整个 S3 对象加载到内存中,在处理大文件时,这并不总是可能性:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    for i in body.read():
        file.write(i)

但是 read 方法允许传入 amt 参数,指定我们想要从底层流读取的字节数。可以重复调用此方法,直到读取整个流:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    while file.write(body.read(amt=512)):
        pass

深入研究 botocore.response.StreamingBody 代码,我们意识到底层流也是可用的,因此我们可以进行如下迭代:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    for b in body._raw_stream:
        file.write(b)

在谷歌搜索时,我'我还看到了一些可以使用的链接,但我还没有尝试过:

Other answers in this thread are related to boto, but S3.Object is not iterable anymore in boto3. So, the following DOES NOT WORK, it produces an TypeError: 's3.Object' object is not iterable error message:

s3 = boto3.session.Session(profile_name=my_profile).resource('s3')
s3_obj = s3.Object(bucket_name=my_bucket, key=my_key)

with io.FileIO('sample.txt', 'w') as file:
    for i in s3_obj:
        file.write(i)

In boto3, the contents of the object is available at S3.Object.get()['Body'] which is an iterable since version 1.9.68 but previously wasn't. Thus the following will work for the latest versions of boto3 but not earlier ones:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    for i in body:
        file.write(i)

So, an alternative for older boto3 versions is to use the read method, but this loads the WHOLE S3 object in memory which when dealing with large files is not always a possibility:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    for i in body.read():
        file.write(i)

But the read method allows to pass in the amt parameter specifying the number of bytes we want to read from the underlying stream. This method can be repeatedly called until the whole stream has been read:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    while file.write(body.read(amt=512)):
        pass

Digging into botocore.response.StreamingBody code one realizes that the underlying stream is also available, so we could iterate as follows:

body = s3_obj.get()['Body']
with io.FileIO('sample.txt', 'w') as file:
    for b in body._raw_stream:
        file.write(b)

While googling I've also seen some links that could be use, but I haven't tried:

¢好甜 2024-12-14 18:32:23

boto 中的 Key 对象代表 S3 中的对象,可以像迭代器一样使用,因此您应该能够执行如下操作:

>>> import boto
>>> c = boto.connect_s3()
>>> bucket = c.lookup('garnaat_pub')
>>> key = bucket.lookup('Scan1.jpg')
>>> for bytes in key:
...   write bytes to output stream

或者,如您的示例所示,您可以执行以下操作:

>>> shutil.copyfileobj(key, rsObject.stream())

The Key object in boto, which represents on object in S3, can be used like an iterator so you should be able to do something like this:

>>> import boto
>>> c = boto.connect_s3()
>>> bucket = c.lookup('garnaat_pub')
>>> key = bucket.lookup('Scan1.jpg')
>>> for bytes in key:
...   write bytes to output stream

Or, as in the case of your example, you could do:

>>> shutil.copyfileobj(key, rsObject.stream())
凝望流年 2024-12-14 18:32:23

我想至少有一些看到这个问题的人会像我一样,并且想要一种从 boto 逐行(或逐个逗号,或任何其他分隔符)流式传输文件的方法。这是一个简单的方法:

def getS3ResultsAsIterator(self, aws_access_info, key, prefix):        
    s3_conn = S3Connection(**aws_access)
    bucket_obj = s3_conn.get_bucket(key)
    # go through the list of files in the key
    for f in bucket_obj.list(prefix=prefix):
        unfinished_line = ''
        for byte in f:
            byte = unfinished_line + byte
            #split on whatever, or use a regex with re.split()
            lines = byte.split('\n')
            unfinished_line = lines.pop()
            for line in lines:
                yield line

上面@garnaat的答案仍然很棒并且100%真实。希望我的仍然可以帮助别人。

I figure at least some of the people seeing this question will be like me, and will want a way to stream a file from boto line by line (or comma by comma, or any other delimiter). Here's a simple way to do that:

def getS3ResultsAsIterator(self, aws_access_info, key, prefix):        
    s3_conn = S3Connection(**aws_access)
    bucket_obj = s3_conn.get_bucket(key)
    # go through the list of files in the key
    for f in bucket_obj.list(prefix=prefix):
        unfinished_line = ''
        for byte in f:
            byte = unfinished_line + byte
            #split on whatever, or use a regex with re.split()
            lines = byte.split('\n')
            unfinished_line = lines.pop()
            for line in lines:
                yield line

@garnaat's answer above is still great and 100% true. Hopefully mine still helps someone out.

夏天碎花小短裙 2024-12-14 18:32:23

更新的答案:

S3Fs 似乎是一个更好的解决方案:

import s3fs

s3file = s3fs.S3FileSystem().open(f'{bucket}/{key}', 'rb')

# Then any of these are valid:

textfile = TextIOWrapper(s3file)
...
zipfile = ZipFile(s3file)
...
gzipfile = gzip.open(s3file, mode='rt')
...
csvfile = csv.reader(s3file)
...

旧答案:

Botocore 的 StreamingBody 有一个 iter_lines() 方法:

https://botocore.amazonaws。 com/v1/documentation/api/latest/reference/response.html#botocore.response.StreamingBody.iter_lines

所以:

import boto3
s3r = boto3.resource('s3')
iterator = s3r.Object(bucket, key).get()['Body'].iter_lines()

for line in iterator:
    print(line)

Updated answer:

S3Fs appears to be a better solution:

import s3fs

s3file = s3fs.S3FileSystem().open(f'{bucket}/{key}', 'rb')

# Then any of these are valid:

textfile = TextIOWrapper(s3file)
...
zipfile = ZipFile(s3file)
...
gzipfile = gzip.open(s3file, mode='rt')
...
csvfile = csv.reader(s3file)
...

Old answer:

Botocore's StreamingBody has an iter_lines() method:

https://botocore.amazonaws.com/v1/documentation/api/latest/reference/response.html#botocore.response.StreamingBody.iter_lines

So:

import boto3
s3r = boto3.resource('s3')
iterator = s3r.Object(bucket, key).get()['Body'].iter_lines()

for line in iterator:
    print(line)
不奢求什么 2024-12-14 18:32:23

这是我包装流媒体主体的解决方案:

import io
class S3ObjectInterator(io.RawIOBase):
    def __init__(self, bucket, key):
        """Initialize with S3 bucket and key names"""
        self.s3c = boto3.client('s3')
        self.obj_stream = self.s3c.get_object(Bucket=bucket, Key=key)['Body']

    def read(self, n=-1):
        """Read from the stream"""
        return self.obj_stream.read() if n == -1 else self.obj_stream.read(n)

示例用法:

obj_stream = S3ObjectInterator(bucket, key)
for line in obj_stream:
    print line

This is my solution of wrapping streaming body:

import io
class S3ObjectInterator(io.RawIOBase):
    def __init__(self, bucket, key):
        """Initialize with S3 bucket and key names"""
        self.s3c = boto3.client('s3')
        self.obj_stream = self.s3c.get_object(Bucket=bucket, Key=key)['Body']

    def read(self, n=-1):
        """Read from the stream"""
        return self.obj_stream.read() if n == -1 else self.obj_stream.read(n)

Example usage:

obj_stream = S3ObjectInterator(bucket, key)
for line in obj_stream:
    print line
情绪失控 2024-12-14 18:32:23

如果您愿意接受其他选项,smart_open 是一个用于在 Python 中传输大文件的实用程序,并且它使工作变得极其容易。

以下是两个示例:

import boto3
from smart_open import open

session = boto3.Session(
    aws_access_key_id="xxx",
    aws_secret_access_key="xxx",
)
client = session.client('s3')

for line in open(
    "s3://my-bucket/my-file.txt",
    transport_params=dict(client=client),
):
    print(line)

对于压缩文件:

import boto3
from smart_open import open

session = boto3.Session(
    aws_access_key_id="xxx",
    aws_secret_access_key="xxx",
)
client = session.client('s3')

for line in open(
    "s3://my-bucket/my-file.txt.gz",
    encoding="utf-8",
    transport_params=dict(client=client),
):
    print(line)

If you are open to other options, smart_open is a utility for streaming large files in Python, and it makes work extremely easy.

Here are two examples:

import boto3
from smart_open import open

session = boto3.Session(
    aws_access_key_id="xxx",
    aws_secret_access_key="xxx",
)
client = session.client('s3')

for line in open(
    "s3://my-bucket/my-file.txt",
    transport_params=dict(client=client),
):
    print(line)

For compressed file:

import boto3
from smart_open import open

session = boto3.Session(
    aws_access_key_id="xxx",
    aws_secret_access_key="xxx",
)
client = session.client('s3')

for line in open(
    "s3://my-bucket/my-file.txt.gz",
    encoding="utf-8",
    transport_params=dict(client=client),
):
    print(line)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文