Parallel execution in cx-oracle

Published 2024-12-06 04:54:49


I have recently joined a new company and am new to python (their preferred scripting language) and have been working with cx_oracle to create some ETL processes. The scripts I have built so far have been single-threaded jobs that select the subset of columns I need from an Oracle source DB and write the output to a named pipe where an external process is waiting to read that data and insert it into the target.

This has worked fine until I got to some tables that are in the 500 million to 2 billion row range. The job still works, but it is taking many hours to complete. These large source tables are partitioned, so I have been researching ways to coordinate parallel reads of different partitions so I can get two or more threads working concurrently, each writing to a separate named pipe.

Is there an elegant way in cx-oracle to handle multiple threads reading from different partitions of the same table?

Here's my current (simple) code:

import cx_Oracle
import csv

# connect via SQL*Net string or by each segment in a separate argument
connection = cx_Oracle.connect("user/password@TNS")


csv.register_dialect('pipe_delimited', escapechar='\\', delimiter='|', quoting=csv.QUOTE_NONE)

cursor = connection.cursor()
f = open("<path_to_named_pipe>", "w")

writer = csv.writer(f, dialect='pipe_delimited', lineterminator="\n")
cursor.execute("""SELECT <column_list> FROM <SOURCE_TABLE>""")
for row in cursor:
    writer.writerow(row)
f.close()

Some of my source tables have over 1000 partitions so hard-coding the partition names in isn't the preferred option. I have been thinking about setting up arrays of partition names and iterating through them, but if folks have other ideas I'd love to hear them.
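
One way to avoid hard-coding them, sketched below as an untested outline, is to pull the partition names from the data dictionary at run time and start one worker thread per partition, each with its own connection and its own named pipe (with 1000+ partitions you would cap how many workers run at once, e.g. by working through the list in batches). The <column_list>, <SOURCE_TABLE> and <path_to_named_pipe_N> names are the same placeholders as above; user_tab_partitions and the PARTITION (...) clause are standard Oracle, and threaded=True asks cx_Oracle for an OCI-thread-safe connection.

import csv
import threading

import cx_Oracle

csv.register_dialect('pipe_delimited', escapechar='\\', delimiter='|',
                     quoting=csv.QUOTE_NONE)

def partition_names(table_name):
    # Read the table's partition names from the data dictionary.
    conn = cx_Oracle.connect("user/password@TNS")
    cursor = conn.cursor()
    cursor.execute(
        "SELECT partition_name FROM user_tab_partitions "
        "WHERE table_name = :t ORDER BY partition_position",
        t=table_name.upper())
    names = [row[0] for row in cursor]
    conn.close()
    return names

def dump_partition(partition, pipe_path):
    # Each worker gets its own connection and cursor.
    conn = cx_Oracle.connect("user/password@TNS", threaded=True)
    cursor = conn.cursor()
    # Partition names cannot be bind variables, so they are interpolated;
    # they come straight from the data dictionary query above.
    cursor.execute(
        "SELECT <column_list> FROM <SOURCE_TABLE> PARTITION ({0})".format(partition))
    f = open(pipe_path, "w")
    writer = csv.writer(f, dialect='pipe_delimited', lineterminator="\n")
    for row in cursor:
        writer.writerow(row)
    f.close()
    conn.close()

threads = []
for i, part in enumerate(partition_names("<SOURCE_TABLE>")):
    t = threading.Thread(target=dump_partition,
                         args=(part, "<path_to_named_pipe_{0}>".format(i)))
    t.start()
    threads.append(t)
for t in threads:
    t.join()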


Comments (1)

怪我入戏太深 2024-12-13 04:54:49


First of all, you need to make sure that *cx_Oracle* is thread-safe. Since it implements the Python DB API Spec v2.0, all you need to do is check the threadsafety module global.
Values 2 or 3 mean that you can open multiple connections to the DB and run multiple queries at the same time. The best way to do this is to use the threading module, which is pretty easy to use. This is a short and sweet article on how to get started with it.

Of course, there are no guarantees that pipelining your queries will result in significant performance gains (DB engine, I/O, etc.), but it's definitely worth a try. Good luck!
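
A rough, untested sketch of that check and of the threading pattern (the connect string and the two example queries are placeholders; each thread uses its own connection):

import threading

import cx_Oracle

# 2 = threads may share the module and connections (per the DB API spec).
print(cx_Oracle.threadsafety)

def run_query(sql, results, key):
    # One connection per thread; threaded=True enables OCI thread safety.
    conn = cx_Oracle.connect("user/password@TNS", threaded=True)
    cursor = conn.cursor()
    cursor.execute(sql)
    results[key] = cursor.fetchall()
    conn.close()

queries = ("SELECT COUNT(*) FROM user_tables",
           "SELECT COUNT(*) FROM user_objects")
results = {}
threads = [threading.Thread(target=run_query, args=(sql, results, i))
           for i, sql in enumerate(queries)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)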
