Transfer a file from FTPS to SFTP using Python

Posted on 2025-01-10 12:01:54


I am doing some performance tests to transfer large files (~4 GB) from an FTPS server to an SFTP server. I did some research and tried a Python script to see whether there is any performance improvement in getting a file from FTPS and transferring it to SFTP.

FTPS connection setup

import ftplib
import socket

def create_connection(self):
    print('Creating session..........')
    ftp = ftplib.FTP_TLS()
    # ftp.set_debuglevel(2)
    ftp.connect(self.host, self.port)
    ftp.login(self.user, self.passwd)
    ftp.prot_p()  # encrypt the data connection as well
    # tune TCP keepalive so long transfers are not dropped by idle timeouts
    print('Optimizing socket..........')
    ftp.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    ftp.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
    ftp.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60)
    print('Session created successfully')
    return ftp

from datetime import datetime
from io import BytesIO

def get_file(self, ftp_session, dst_filename, local_filename):
    print('Starting download........', datetime.now())
    myfile = BytesIO()
    print(myfile.tell())  # 0: the buffer is empty before the download
    ftp_session.retrbinary('RETR %s' % dst_filename, myfile.write)
    print(myfile.tell())  # number of bytes downloaded
    print('Download completed ........', datetime.now())
    return myfile  # the buffer is reused for the SFTP upload below

For the SFTP connection I am using paramiko:

import paramiko

host, port = "abc.com", 22
transport = paramiko.Transport((host, port))
username, password = "user", "pwd"
transport.connect(None, username, password)
# enlarge the default SSH window so the SFTP channel is not throttled
transport.default_window_size = 3 * 1024 * 1024
sftp = paramiko.SFTPClient.from_transport(transport)
myfile.seek(0)  # rewind the buffer filled by get_file()
sftp.putfo(fl=myfile, remotepath='remotepath/' + local_filename)
sftp.close()
transport.close()

I am using BytesIO so that I can keep the file in memory and stream it while copying. The code above does copy the file, but it takes ~20 minutes: it first downloads the whole file into memory and only then uploads it. Is there any way to transfer the file more efficiently?
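For reference, one way to avoid the buffer-then-upload step is to stream each downloaded block straight into the remote SFTP file, by handing the SFTP file's write method to retrbinary as the callback. This is only a minimal sketch; the host names, credentials, and paths are placeholders:

import ftplib
import paramiko

# streaming sketch -- host names, credentials, and paths are placeholders
ftp = ftplib.FTP_TLS()
ftp.connect('ftps.example.com', 21)
ftp.login('user', 'passwd')
ftp.prot_p()

transport = paramiko.Transport(('sftp.example.com', 22))
transport.connect(None, 'user', 'pwd')
sftp = paramiko.SFTPClient.from_transport(transport)

with sftp.open('remotepath/bigfile.bin', 'wb') as remote:
    remote.set_pipelined(True)  # do not wait for a server ack after every write
    # each FTPS block is written to the SFTP file as soon as it arrives,
    # so the 4 GB file is never held in memory in full
    ftp.retrbinary('RETR bigfile.bin', remote.write, blocksize=262144)

sftp.close()
transport.close()
ftp.quit()

This bounds the transfer by the slower of the two links, instead of a full download followed by a full upload.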


Comments (1)

飘过的浮云 2025-01-17 12:01:54


After some quick Google searches (yes, Google really does work =)) I stumbled across this thread:

Paramiko Fails to download large files >1GB

import threading, os, time, paramiko

# connection settings -- left undefined in the original snippet;
# the values below are placeholders
ftp_server, port = "sftp.example.com", 22
username, password = "user", "pwd"
sftp_file = "/remote/path/to/file"  # remote file to download
local_file = "local_file"           # local destination path

# you could make the number of threads relative to file size
NUM_THREADS = 4
MAX_RETRIES = 10

def make_filepart_path(file_path, part_number):
    """creates filepart path from filepath"""
    return "%s.filepart.%s" % (file_path, part_number+1)

def write_chunks(chunks, tnum, local_file_part, username, password, ftp_server, max_retries):
    ssh_conn = sftp_client = None
    for retry in range(max_retries):
        try:
            ssh_conn = paramiko.Transport((ftp_server, port))
            ssh_conn.connect(username=username, password=password)
            sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
            with sftp_client.open(sftp_file, "rb") as infile:
                with open(local_file_part, "wb") as outfile:
                    for chunk in infile.readv(chunks):
                        outfile.write(chunk)
            break
        except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
            retry += 1  # only adjusts the printed attempt number; the for loop drives the retries
            print("%s %s Thread %s - > retrying %s..." % (type(x), x, tnum, retry))
            time.sleep(retry * 10)  # back off longer after each failure
        finally:
            if hasattr(sftp_client, "close") and callable(sftp_client.close):
                sftp_client.close()
            if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
                ssh_conn.close()



start_time = time.time()

ssh_conn = sftp_client = None  # make the finally block safe if the first connect fails
for retry in range(MAX_RETRIES):
    try:
        ssh_conn = paramiko.Transport((ftp_server, port))
        ssh_conn.connect(username=username, password=password)
        sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
        # connect to get the file's size in order to calculate chunks
        filesize = sftp_client.stat(sftp_file).st_size
        sftp_client.close()
        ssh_conn.close()
        chunksize = pow(4, 12)  # 4**12 bytes = 16 MiB per read request
        chunks = [(offset, chunksize) for offset in range(0, filesize, chunksize)]
        thread_chunk_size = (len(chunks) // NUM_THREADS) + 1
        # break the chunks into sub lists to hand off to threads
        thread_chunks = [chunks[i:i+thread_chunk_size] for i in range(0, len(chunks) - 1, thread_chunk_size)]
        threads = []
        fileparts = []
        for thread_num in range(len(thread_chunks)):
            local_file_part = make_filepart_path(local_file, thread_num) 
            args = (thread_chunks[thread_num], thread_num, local_file_part, username, password, ftp_server, MAX_RETRIES)
            threads.append(threading.Thread(target=write_chunks, args=args))
            fileparts.append(local_file_part)
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        # join file parts into one file, remove fileparts
        with open(local_file, "wb") as outfile:
            for filepart in fileparts:
                with open(filepart, "rb") as infile:
                    outfile.write(infile.read())
                os.remove(filepart)
        break
    except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
        retry += 1  # cosmetic only; the enclosing for loop drives the retries
        print("%s %s - > retrying %s..." % (type(x), x, retry))
        time.sleep(retry * 10)
    finally:
        if hasattr(sftp_client, "close") and callable(sftp_client.close):
            sftp_client.close()
        if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
            ssh_conn.close()


print("Loading File %s Took %d seconds " % (sftp_file, time.time() - start_time))