如何在Python中运行以“DO $$”开头的PostgreSQL查询
我有一个以DO开头的PostgreSQL查询:
-- Anonymous PL/pgSQL block.
-- Loops over the even numbers produced by generate_series(1, 100); for each
-- one it wraps the value in a one-row CTE (cte_input) and inserts the value
-- together with value / 100 (as float) into tmp_table(input_value, value_100).
-- A COMMIT is issued after every inserted row.
do
$$
DECLARE
temprow record;
BEGIN
for temprow in
select *
from generate_series(1, 100)
where generate_series % 2 = 0
loop
with cte_input(val) as (select val from (values (temprow.generate_series)) as t(val))
insert
into tmp_table(input_value, value_100)
select cte_input.val as input_value, cte_input.val::float / 100 as value_100
from cte_input;
commit;
end loop;
END
$$ LANGUAGE plpgsql;
如何使用Python和psycopg2运行此查询? 如果我需要在做一些动态修改后多次运行此查询,使用临时函数是正确的做法吗?
upd
感谢 @erwin-brandstetter 提供有关COMMIT的信息。 我从查询块中删除了COMMIT,并将其添加到Python代码中:ps_cursor.execute('COMMIT')。
我以这种方式编写代码:
import concurrent.futures
import psycopg2 as pg
from psycopg2 import pool
def _build_feature_query(feature_id, name_feature):
    """Return the SQL script for one feature.

    The script creates (or replaces) a session-local procedure in ``pg_temp``
    named after ``feature_id`` and immediately calls it. ``name_feature`` is
    interpolated as a column of ``t_rank``.
    """
    return f"""--Feature:{feature_id}
create or replace procedure pg_temp.proc_feature_{feature_id}_values()
language plpgsql
as
$$
DECLARE
temprow record;
BEGIN
for temprow in
select *
from tmp_maternal_sample
where maternal_sample = 1000
loop
insert
into tmp_feature_values(feature_id,
feature_values_array,
maternal_sample)
select feature_id,
array_agg(t_rank.{name_feature}) f_values,
temprow.maternal_sample
from t_rank
....
....
end loop;
end
$$;
call pg_temp.proc_feature_{feature_id}_values();
"""


features = [(1, name_of_feature_1), ...] # list of features
list_query = []
# One SQL script per (feature_id, column name) pair.
for feature in features:
    feature_id, name_feature = feature[0], feature[1]
    list_query.append(_build_feature_query(feature_id, name_feature))
def load_query(query):
    """Run one SQL script on a pooled connection and commit it.

    Borrows a connection from the module-level ``threaded_postgreSQL_pool``,
    executes ``query``, commits, and always hands the connection back to the
    pool, even when execution fails.

    Args:
        query: SQL text to execute.

    Returns:
        A short status string containing the first 15 characters of ``query``.

    Raises:
        Any exception raised by the pool or by executing ``query``; the
        connection is still returned to the pool in that case.
    """
    # getconn() raises when no connection is available rather than returning
    # a falsy value, so no truthiness guard is needed here.
    ps_connection = threaded_postgreSQL_pool.getconn()
    try:
        print(f"Successfully recived connection from connection pool for Query {query[:15]} ")
        # The context manager closes the cursor even if execute() raises.
        with ps_connection.cursor() as ps_cursor:
            ps_cursor.execute(query)
        ps_connection.commit()
    finally:
        # Without putconn() every call keeps its connection checked out and
        # the pool runs dry once maxconn queries have been started.
        threaded_postgreSQL_pool.putconn(ps_connection)
    result = f'Query {query[:15]} finished'
    print(result)
    return result
# Create the connection pool, run every query in list_query concurrently on a
# thread pool, report each outcome, and finally close all pooled connections.
# Bound up front so the ``finally`` clause is safe if pool creation fails.
threaded_postgreSQL_pool = None
try:
    # Connection parameters must be keywords: positional extras are forwarded
    # to psycopg2.connect(), whose leading positional parameters are
    # dsn / connection_factory / cursor_factory.
    threaded_postgreSQL_pool = pool.ThreadedConnectionPool(
        1, 32,
        user=user, password=password, host=host, port=port, database=database,
    )
    if (threaded_postgreSQL_pool):
        print("Connection pool created successfully using ThreadedConnectionPool")
    with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor:
        # Map each future back to its SQL text for reporting.
        future_to_sql = {executor.submit(load_query, query): query for query in list_query}
        for future in concurrent.futures.as_completed(future_to_sql):
            sql = future_to_sql[future]
            try:
                # data is the status string returned by load_query.
                data = future.result()
            except Exception as exc:
                print('%s generated an exception: %s' % (sql[:15], exc))
            else:
                print('%s page is %s bytes' % (sql[:15], data))
except (Exception, pg.DatabaseError) as error:
    print("Error while connecting to PostgreSQL", error)
finally:
    if threaded_postgreSQL_pool:
        # closeall must be *called*; the bare attribute reference was a no-op.
        threaded_postgreSQL_pool.closeall()
        print('Threaded PG connection pool is closed')
I have PostgreSQL query which starts with DO:
-- Anonymous PL/pgSQL block.
-- For every even number in 1..100, insert the number and number / 100 (as
-- float) into tmp_table(input_value, value_100), committing after each row.
-- The block body is delimited by $$ dollar quotes; a single $ is not a
-- valid delimiter.
do
$$
DECLARE
    temprow record;
BEGIN
    -- iterate over the even values of generate_series(1, 100)
    for temprow in
        select *
        from generate_series(1, 100)
        where generate_series % 2 = 0
    loop
        -- expose the current loop value as a one-row CTE and insert it
        with cte_input(val) as (select val from (values (temprow.generate_series)) as t(val))
        insert
        into tmp_table(input_value, value_100)
        select cte_input.val as input_value, cte_input.val::float / 100 as value_100
        from cte_input;
        -- per-row commit; presumably requires running outside an explicit
        -- client-side transaction -- confirm against the calling code
        commit;
    end loop;
END
$$ LANGUAGE plpgsql;
How can I run this query with Python and psycopg2?
Is using a temporary function the right approach if I need to run this query several times with some dynamic changes?
UPD
Thank you @erwin-brandstetter for the information about COMMIT.
I removed COMMIT from the query block and added it to the Python code: ps_cursor.execute('COMMIT').
I wrote the code this way:
import concurrent.futures
import psycopg2 as pg
from psycopg2 import pool
# Build one SQL script per feature. Each script creates (or replaces) a
# session-local procedure in pg_temp named after the feature id and then
# calls it. name_feature is interpolated as a column of t_rank.
features = [(1, name_of_feature_1), ...] # list of features
list_query = []
for feature in features:
    feature_id = feature[0]
    name_feature = feature[1]
    # The procedure body must be wrapped in $$ dollar quotes; a single $ is
    # not a valid delimiter and makes the statement fail to parse.
    query = f"""--Feature:{feature_id}
create or replace procedure pg_temp.proc_feature_{feature_id}_values()
language plpgsql
as
$$
DECLARE
temprow record;
BEGIN
for temprow in
select *
from tmp_maternal_sample
where maternal_sample = 1000
loop
insert
into tmp_feature_values(feature_id,
feature_values_array,
maternal_sample)
select feature_id,
array_agg(t_rank.{name_feature}) f_values,
temprow.maternal_sample
from t_rank
....
....
end loop;
end
$$;
call pg_temp.proc_feature_{feature_id}_values();
"""
    list_query.append(query)
def load_query(query):
    """Run one SQL script on a pooled connection and commit it.

    Borrows a connection from the module-level ``threaded_postgreSQL_pool``,
    executes ``query``, commits, and always hands the connection back to the
    pool, even when execution fails.

    Args:
        query: SQL text to execute.

    Returns:
        A short status string containing the first 15 characters of ``query``.

    Raises:
        Any exception raised by the pool or by executing ``query``; the
        connection is still returned to the pool in that case.
    """
    # getconn() raises when no connection is available rather than returning
    # a falsy value, so no truthiness guard is needed here.
    ps_connection = threaded_postgreSQL_pool.getconn()
    try:
        print(f"Successfully recived connection from connection pool for Query {query[:15]} ")
        # The context manager closes the cursor even if execute() raises.
        with ps_connection.cursor() as ps_cursor:
            ps_cursor.execute(query)
        ps_connection.commit()
    finally:
        # Without putconn() every call keeps its connection checked out and
        # the pool runs dry once maxconn queries have been started.
        threaded_postgreSQL_pool.putconn(ps_connection)
    result = f'Query {query[:15]} finished'
    print(result)
    return result
# Create the connection pool, run every query in list_query concurrently on a
# thread pool, report each outcome, and finally close all pooled connections.
# Bound up front so the ``finally`` clause is safe if pool creation fails.
threaded_postgreSQL_pool = None
try:
    # Connection parameters must be keywords: positional extras are forwarded
    # to psycopg2.connect(), whose leading positional parameters are
    # dsn / connection_factory / cursor_factory.
    threaded_postgreSQL_pool = pool.ThreadedConnectionPool(
        1, 32,
        user=user, password=password, host=host, port=port, database=database,
    )
    if (threaded_postgreSQL_pool):
        print("Connection pool created successfully using ThreadedConnectionPool")
    with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor:
        # Map each future back to its SQL text for reporting.
        future_to_sql = {executor.submit(load_query, query): query for query in list_query}
        for future in concurrent.futures.as_completed(future_to_sql):
            sql = future_to_sql[future]
            try:
                # data is the status string returned by load_query.
                data = future.result()
            except Exception as exc:
                print('%s generated an exception: %s' % (sql[:15], exc))
            else:
                print('%s page is %s bytes' % (sql[:15], data))
except (Exception, pg.DatabaseError) as error:
    print("Error while connecting to PostgreSQL", error)
finally:
    if threaded_postgreSQL_pool:
        # closeall must be *called*; the bare attribute reference was a no-op.
        threaded_postgreSQL_pool.closeall()
        print('Threaded PG connection pool is closed')
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
假设Postgres 11或更高版本是可以肯定的,因为:
您的
do
语句无明显理由地过于复杂。更简单的写法:可归结为仅此而已——甚至包括创建该临时表:
db<>fiddle 此处
某些环境(例如dbfiddle.uk)仍然不允许使用
commit
进行事务处理。不确定您是否真的需要它?无论哪种方式,直接执行原始SQL即可。
It's safe to assume Postgres 11 or later, because:
Your
DO
statement is convoluted without obvious reason. Simpler: Which boils down to just this - even including the creation of that temp table:
db<>fiddle here
Some setups (like dbfiddle.uk) still don't allow transaction handling with
COMMIT
. Not sure you even need that? Either way, just execute the raw SQL.