如何运行以“做$$”开头的PostgreSQL查询在Python

发布于 2025-01-19 20:14:12 字数 3654 浏览 4 评论 0原文

我有postgresql查询,该查询以:

do
$$
DECLARE
    temprow record;
BEGIN
    for temprow in
        select *
        from generate_series(1, 100)
        where generate_series % 2 = 0
    loop
        with cte_input(val) as (select val from (values (temprow.generate_series)) as t(val))
        insert
        into tmp_table(input_value, value_100)
        select cte_input.val as input_value, cte_input.val::float / 100 as value_100
        from cte_input;
        commit;
    end loop;
END
$$ LANGUAGE plpgsql;

如何使用Python和psycopg2运行此查询? 如果我需要在几次动态更改中运行此查询,使用临时功能是正确的吗?

upd

感谢 @erwin-brandstetter提供有关提交的信息。 我从查询块中删除了提交,并将其添加到Python代码中:ps_cursor.execute('commit')。

我以这种方式编写代码:


    import concurrent.futures
    import psycopg2 as pg
    from psycopg2 import pool
    
    features = [(1, name_of_feature_1), ...] # list of features
    
    list_query = []
    
    for feature in features:
        feature_id = feature[0]
        name_feature = feature[1]
        query = f"""--Feature:{feature_id}
        create or replace procedure pg_temp.proc_feature_{feature_id}_values()
        language plpgsql
        as
        
    $$
        DECLARE
            temprow record;
        BEGIN
            for temprow in
                select *
                from tmp_maternal_sample
                where maternal_sample = 1000
                loop
                     insert
                     into tmp_feature_values(feature_id, 
                                             feature_values_array,
                                             maternal_sample)
                     select feature_id,
                           array_agg(t_rank.{name_feature}) f_values,
                           temprow.maternal_sample
                     from t_rank
                             ....
                             ....
    
                end loop;
    end
    $$;
    call pg_temp.proc_feature_{feature_id}_values();
    """

        list_query.append(query)
    
    def load_query(query):
        ps_connection = threaded_postgreSQL_pool.getconn()
        if (ps_connection):
            print(f"Successfully recived connection from connection pool for Query {query[:15]} ")
            ps_cursor = ps_connection.cursor()
            ps_cursor.execute(query)
            ps_cursor.execute('COMMIT')
            ps_cursor.close()
            result = f'Query {query[:15]} finished'
            print(result)
            return result
    
    try:
        threaded_postgreSQL_pool = pool.ThreadedConnectionPool(1, 32, user, password, host, port, database)
        if (threaded_postgreSQL_pool):
            print("Connection pool created successfully using ThreadedConnectionPool")
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor:
            future_to_sql = {executor.submit(load_query, query): query for query in list_query}
            for future in concurrent.futures.as_completed(future_to_sql):
                sql = future_to_sql[future]
                try:
                    data = future.result()
                    
                except Exception as exc:
                    print('%s generated an exception: %s' % (sql[:15], exc))
                else:
                    print('%s page is %s bytes' % (sql[:15], data))
    
    except (Exception, pg.DatabaseError) as error:
        print("Error while connecting to PostgreSQL", error)
    
    finally:
        if threaded_postgreSQL_pool:
            threaded_postgreSQL_pool.closeall
        print('Threaded PG connection pool is closed')

I have PostgreSQL query which starts with DO:

do
$
DECLARE
    temprow record;
BEGIN
    for temprow in
        select *
        from generate_series(1, 100)
        where generate_series % 2 = 0
    loop
        with cte_input(val) as (select val from (values (temprow.generate_series)) as t(val))
        insert
        into tmp_table(input_value, value_100)
        select cte_input.val as input_value, cte_input.val::float / 100 as value_100
        from cte_input;
        commit;
    end loop;
END
$ LANGUAGE plpgsql;

How I can run this query with Python and psycopg2?
Is it right way to use temporary function if I need to run this query with some dynamic changes few times?

UPD

Thank you @erwin-brandstetter for information about COMMIT.
I deleted COMMIT from query block and add it in Python code: ps_cursor.execute('COMMIT').

I write code in this way:


    import concurrent.futures
    import psycopg2 as pg
    from psycopg2 import pool
    
    features = [(1, name_of_feature_1), ...] # list of features
    
    list_query = []
    
    for feature in features:
        feature_id = feature[0]
        name_feature = feature[1]
        query = f"""--Feature:{feature_id}
        create or replace procedure pg_temp.proc_feature_{feature_id}_values()
        language plpgsql
        as
        
    $
        DECLARE
            temprow record;
        BEGIN
            for temprow in
                select *
                from tmp_maternal_sample
                where maternal_sample = 1000
                loop
                     insert
                     into tmp_feature_values(feature_id, 
                                             feature_values_array,
                                             maternal_sample)
                     select feature_id,
                           array_agg(t_rank.{name_feature}) f_values,
                           temprow.maternal_sample
                     from t_rank
                             ....
                             ....
    
                end loop;
    end
    $;
    call pg_temp.proc_feature_{feature_id}_values();
    """

        list_query.append(query)
    
    def load_query(query):
        ps_connection = threaded_postgreSQL_pool.getconn()
        if (ps_connection):
            print(f"Successfully recived connection from connection pool for Query {query[:15]} ")
            ps_cursor = ps_connection.cursor()
            ps_cursor.execute(query)
            ps_cursor.execute('COMMIT')
            ps_cursor.close()
            result = f'Query {query[:15]} finished'
            print(result)
            return result
    
    try:
        threaded_postgreSQL_pool = pool.ThreadedConnectionPool(1, 32, user, password, host, port, database)
        if (threaded_postgreSQL_pool):
            print("Connection pool created successfully using ThreadedConnectionPool")
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor:
            future_to_sql = {executor.submit(load_query, query): query for query in list_query}
            for future in concurrent.futures.as_completed(future_to_sql):
                sql = future_to_sql[future]
                try:
                    data = future.result()
                    
                except Exception as exc:
                    print('%s generated an exception: %s' % (sql[:15], exc))
                else:
                    print('%s page is %s bytes' % (sql[:15], data))
    
    except (Exception, pg.DatabaseError) as error:
        print("Error while connecting to PostgreSQL", error)
    
    finally:
        if threaded_postgreSQL_pool:
            threaded_postgreSQL_pool.closeall
        print('Threaded PG connection pool is closed')

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

甜点 2025-01-26 20:14:12

假设Postgres 11或更高版本是可以肯定的,因为:

您的do语句是无明显原因的。更简单:

DO
LANGUAGE plpgsql
$do$
DECLARE
   i int;
BEGIN
   FOR i IN
      SELECT generate_series(2, 100, 2)
   LOOP
      INSERT INTO tmp_table(input_value, value_100)
      VALUES (i, i::float / 100);
      -- COMMIT;  -- ?
   END LOOP;
END
$do$;

归结为仅此而已 - 甚至包括创建该临时表:

CREATE TEMP TABLE tmp_table AS
SELECT g AS input_value, g::float / 100 AS value_100
FROM   generate_series(2, 100, 2) g;

db< “ nofollow noreferrer”>此处

某些设置(例如dbfiddle.uk)仍然不允许使用commit进行交易处理。不确定您是否需要吗?

无论哪种方式,只需执行RAW SQL即可。

It's safe to assume Postgres 11 or later, because:

Your DO statement is convoluted without obvious reason. Simpler:

DO
LANGUAGE plpgsql
$do$
DECLARE
   i int;
BEGIN
   FOR i IN
      SELECT generate_series(2, 100, 2)
   LOOP
      INSERT INTO tmp_table(input_value, value_100)
      VALUES (i, i::float / 100);
      -- COMMIT;  -- ?
   END LOOP;
END
$do$;

Which boils down to just this - even including the creation of that temp table:

CREATE TEMP TABLE tmp_table AS
SELECT g AS input_value, g::float / 100 AS value_100
FROM   generate_series(2, 100, 2) g;

db<>fiddle here

Some setups (like dbfiddle.uk) still don't allow transaction handling with COMMIT. Not sure you even need that?

Either way, just execute the raw SQL.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文