Piping a Postgres COPY in Python with psycopg2

I'm writing a script that uses psycopg2 to copy some data between two machines on the same network. I'm replacing some old, ugly bash that does the copy:

psql -h remote.host -c "COPY table TO STDOUT" | psql -c "COPY table FROM STDIN"

This seems like both the simplest and the most efficient way to do the copy. It's easy to replicate in Python with a StringIO or a temp file, like so:

import os
from io import StringIO

buf = StringIO()

from_curs   = from_conn.cursor()
to_curs     = to_conn.cursor()

# Buffer the entire table in memory, then replay it into the target.
from_curs.copy_expert("COPY table TO STDOUT", buf)
buf.seek(0, os.SEEK_SET)
to_curs.copy_expert("COPY table FROM STDIN", buf)

...but that involves holding all of the data on disk or in memory.

Has anyone figured out a way to mimic the behavior of a Unix pipe in a copy like this? I can't seem to find a unix-pipe object that doesn't involve Popen; maybe the best solution is just to use Popen and subprocess after all.
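
For reference, a minimal sketch of that subprocess fallback, assuming the same placeholder host and table names as the bash above:

import subprocess

# Pipe one psql COPY straight into another, as the original bash does.
dump = subprocess.Popen(
    ["psql", "-h", "remote.host", "-c", "COPY table TO STDOUT"],
    stdout=subprocess.PIPE,
)
load = subprocess.Popen(
    ["psql", "-c", "COPY table FROM STDIN"],
    stdin=dump.stdout,
)
dump.stdout.close()  # let the loader see EOF when the dump finishes
load.wait()
dump.wait()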

Comments (2)

吲‖鸣 2024-12-02 20:41:13

You will have to put one of your calls in a separate thread. I just realized you can use os.pipe(), which makes the rest quite straightforward:

#!/usr/bin/python
import os
import threading

import psycopg2

fromdb = psycopg2.connect("dbname=from_db")
todb = psycopg2.connect("dbname=to_db")

# os.pipe() returns a connected (read, write) fd pair: a real Unix pipe.
r_fd, w_fd = os.pipe()

def copy_from():
    # Consumer: feed the read end of the pipe into the target table.
    cur = todb.cursor()
    cur.copy_from(os.fdopen(r_fd), 'table')
    cur.close()
    todb.commit()

to_thread = threading.Thread(target=copy_from)
to_thread.start()

# Producer: dump the source table into the write end of the pipe.
cur = fromdb.cursor()
write_f = os.fdopen(w_fd, 'w')
cur.copy_to(write_f, 'table')
write_f.close()   # or deadlock: the reader only stops at EOF

to_thread.join()
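
The same pipe-and-thread pattern also works with copy_expert() when you need explicit COPY options; a hedged variant sketch, with the connection strings and 'table' again as placeholders:

import os
import threading

import psycopg2

fromdb = psycopg2.connect("dbname=from_db")
todb = psycopg2.connect("dbname=to_db")

r_fd, w_fd = os.pipe()

def load():
    # The read end of the pipe feeds COPY ... FROM STDIN on the target.
    with os.fdopen(r_fd) as read_f:
        cur = todb.cursor()
        cur.copy_expert("COPY table FROM STDIN WITH CSV", read_f)
        cur.close()
    todb.commit()

t = threading.Thread(target=load)
t.start()

cur = fromdb.cursor()
# Closing the write end signals EOF to the reader; skip it and you deadlock.
with os.fdopen(w_fd, "w") as write_f:
    cur.copy_expert("COPY table TO STDOUT WITH CSV", write_f)
t.join()
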
吝吻 2024-12-02 20:41:13

You could use a deque that you've subclassed to support reading and writing:

from collections import deque

# IndexError is a builtin, so no import is needed for it.

class DequeBuffer(deque):
    def write(self, data):
        self.append(data)

    def read(self, size=-1):
        # psycopg2 passes a size hint, which we ignore: each call just
        # returns one previously written chunk, and '' signals EOF.
        try:
            return self.popleft()
        except IndexError:
            return ''

buf = DequeBuffer()

If the writer is much faster than the reader and the table is large, the deque will still get big, but it will be smaller than storing the whole thing.

Also, I don't know for sure whether returning '' when the deque is empty is safe, rather than retrying until it's non-empty, but I'd guess it is. Let me know if it works.

Remember to del buf when you're sure the copy is done, especially if the script isn't just exiting at that point.
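
For reference, a minimal sequential usage sketch under the same assumptions as the question (from_conn, to_conn, and 'table' are placeholders). Used this way the deque still peaks near the full table size, since the second copy only starts after the first finishes; the saving is that popleft() frees each chunk as soon as it is consumed during the drain:

buf = DequeBuffer()

from_curs = from_conn.cursor()
to_curs = to_conn.cursor()

# Fill the buffer from the source, then drain it into the target.
from_curs.copy_expert("COPY table TO STDOUT", buf)
to_curs.copy_expert("COPY table FROM STDIN", buf)
to_conn.commit()

del buf  # drop any chunks the reader never consumed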
