psycopg2 AttributeError:' Cursor'对象没有属性' mogrify'

发布于 2025-01-21 01:36:06 字数 4105 浏览 3 评论 0 原文

我有一个课程,可以帮助我在Postgres中进行SQL查询和插入。我现在正在使用 psycopg2 == 2.7.5 。我正在使用的方法之一:

import pandas as pd    
import psycopg2.extensions as ps_ext
from typing import List

def insert_with_open_connection(self, df: pd.DataFrame, table_name: str, cursor: ps_ext.cursor,
                                conn: ps_ext.connection,
                                success_msg: str = 'Success',
                                conflict_cols: List[str] = None):
    try:
        # Format the INSERT SQL query
        cols = str(tuple(df.columns)).replace("'", '')
        nc = df.shape[1]
        ss = "(" + ''.join('%s,' for _ in range(nc))[:-1] + ")"
        try:
            args_str = str(b','.join(cursor.mogrify(ss, x) for x in df.values), 'utf-8')
        except psycopg2.ProgrammingError:
            args_str = str(b','.join(cursor.mogrify(ss, x) for x in self.clean_numpy_int_for_mogrify(df.values)),
                           'utf-8')
        args_str = args_str.replace("\'NaN\'::float", 'NULL')
        insert_sql = f'INSERT INTO {table_name} {cols} VALUES {args_str}'
        if conflict_cols is not None:
            conf_cols = str(tuple(conflict_cols)).replace("'", '').replace(',)', ')')
            insert_sql += f"\nON CONFLICT {conf_cols} DO NOTHING"
        insert_sql += ';'
        cursor.execute(insert_sql)
        conn.commit()
        return success_msg, 200
    except Exception:
        return traceback.format_exc(), 400

conn 光标参数是从sqlalchemy 引擎的代码生成的:

def create_pool(self, **db_config):
    db_user = self.user
    db_pass = self.password
    db_name = self.database

    # e.g. "/cloudsql"
    db_socket_dir = os.environ.get("DB_SOCKET_DIR", "/cloudsql")

    # i.e "<PROJECT-NAME>:<INSTANCE-REGION>:<INSTANCE-NAME>"
    cloud_sql_connection_name = os.environ.get("CLOUD_SQL_CONNECTION_NAME",
                                               '<PROJECT-NAME>:<INSTANCE-REGION>:<INSTANCE-NAME>')

    self.pool = sqlalchemy.create_engine(

        # Equivalent URL:
        # postgresql+pg8000://<db_user>:<db_pass>@/<db_name>
        #                     ?unix_sock=<socket_path>/<cloud_sql_instance_name>/.s.PGSQL.5432
        sqlalchemy.engine.url.URL.create(drivername="postgresql+pg8000",
                                         username=db_user,  # e.g. "my-database-user"
                                         password=db_pass,  # e.g. "my-database-password"
                                         database=db_name,  # e.g. "my-database-name"
                                         query={"unix_sock":
                                                    f"{db_socket_dir}/{cloud_sql_connection_name}/.s.PGSQL.5432"}),
        **db_config
    )

def get_db_connection(self) -> Connection:
    if self.pool is None:
        self.create_pool()

    assert isinstance(self.pool, Engine)
    try:
        return self.pool.raw_connection()
    except psycopg2.OperationalError:
        self.create_pool()
        return self.pool.raw_connection()

@contextlib.contextmanager
def db_connect(self):
    db = self.get_db_connection()
    cur = db.cursor()
    try:
        yield db, cur
    finally:
        db.close()

i '我尝试在Google Cloud函数(Linux)中使用此代码,并且在运行 insert_with_open_connection 方法时,我会获得以下错误/追溯:

Traceback (most recent call last):
  File "/workspace/db/sql_helper.py", line 221, in insert_with_open_connection
    args_str = str(b','.join(cursor.mogrify(ss, x) for x in df.values), 'utf-8')
  File "/workspace/db/sql_helper.py", line 221, in <genexpr>
    args_str = str(b','.join(cursor.mogrify(ss, x) for x in df.values), 'utf-8')
AttributeError: 'Cursor' object has no attribute 'mogrify'

很明显, Cursor 在代码中,似乎没有属性 mogrify ,而是基于文档在此 mogrify 方法应该存在。

I have a class that helps me with SQL queries and inserts in Postgres. I'm using psycopg2==2.7.5 right now. One of the methods I'm using looks like this:

import pandas as pd    
import psycopg2.extensions as ps_ext
from typing import List

def insert_with_open_connection(self, df: pd.DataFrame, table_name: str, cursor: ps_ext.cursor,
                                conn: ps_ext.connection,
                                success_msg: str = 'Success',
                                conflict_cols: List[str] = None):
    try:
        # Format the INSERT SQL query
        cols = str(tuple(df.columns)).replace("'", '')
        nc = df.shape[1]
        ss = "(" + ''.join('%s,' for _ in range(nc))[:-1] + ")"
        try:
            args_str = str(b','.join(cursor.mogrify(ss, x) for x in df.values), 'utf-8')
        except psycopg2.ProgrammingError:
            args_str = str(b','.join(cursor.mogrify(ss, x) for x in self.clean_numpy_int_for_mogrify(df.values)),
                           'utf-8')
        args_str = args_str.replace("\'NaN\'::float", 'NULL')
        insert_sql = f'INSERT INTO {table_name} {cols} VALUES {args_str}'
        if conflict_cols is not None:
            conf_cols = str(tuple(conflict_cols)).replace("'", '').replace(',)', ')')
            insert_sql += f"\nON CONFLICT {conf_cols} DO NOTHING"
        insert_sql += ';'
        cursor.execute(insert_sql)
        conn.commit()
        return success_msg, 200
    except Exception:
        return traceback.format_exc(), 400

The conn and cursor parameters are generated from a SqlAlchemy Engine with code like this:

def create_pool(self, **db_config):
    db_user = self.user
    db_pass = self.password
    db_name = self.database

    # e.g. "/cloudsql"
    db_socket_dir = os.environ.get("DB_SOCKET_DIR", "/cloudsql")

    # i.e "<PROJECT-NAME>:<INSTANCE-REGION>:<INSTANCE-NAME>"
    cloud_sql_connection_name = os.environ.get("CLOUD_SQL_CONNECTION_NAME",
                                               '<PROJECT-NAME>:<INSTANCE-REGION>:<INSTANCE-NAME>')

    self.pool = sqlalchemy.create_engine(

        # Equivalent URL:
        # postgresql+pg8000://<db_user>:<db_pass>@/<db_name>
        #                     ?unix_sock=<socket_path>/<cloud_sql_instance_name>/.s.PGSQL.5432
        sqlalchemy.engine.url.URL.create(drivername="postgresql+pg8000",
                                         username=db_user,  # e.g. "my-database-user"
                                         password=db_pass,  # e.g. "my-database-password"
                                         database=db_name,  # e.g. "my-database-name"
                                         query={"unix_sock":
                                                    f"{db_socket_dir}/{cloud_sql_connection_name}/.s.PGSQL.5432"}),
        **db_config
    )

def get_db_connection(self) -> Connection:
    if self.pool is None:
        self.create_pool()

    assert isinstance(self.pool, Engine)
    try:
        return self.pool.raw_connection()
    except psycopg2.OperationalError:
        self.create_pool()
        return self.pool.raw_connection()

@contextlib.contextmanager
def db_connect(self):
    db = self.get_db_connection()
    cur = db.cursor()
    try:
        yield db, cur
    finally:
        db.close()

I'm trying to use this code inside a Google Cloud Function (Linux) and I get the following error/traceback when I run the insert_with_open_connection method there:

Traceback (most recent call last):
  File "/workspace/db/sql_helper.py", line 221, in insert_with_open_connection
    args_str = str(b','.join(cursor.mogrify(ss, x) for x in df.values), 'utf-8')
  File "/workspace/db/sql_helper.py", line 221, in <genexpr>
    args_str = str(b','.join(cursor.mogrify(ss, x) for x in df.values), 'utf-8')
AttributeError: 'Cursor' object has no attribute 'mogrify'

It's obvious that the cursor in the code doesn't seem to have the attribute mogrify, but based on the docs here, the mogrify method should exist.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

寂寞陪衬 2025-01-28 01:36:06

我查看了代码,并注意到您正在使用导入psycopg2.extensions作为ps_ext ;显然,在
文档。

然后我遇到了这条线:

self.pool = sqlalchemy.create_engine(

        # Equivalent URL:
        # postgresql+pg8000://<db_user>:<db_pass>@/<db_name>
        #                     ?unix_sock=<socket_path>/<cloud_sql_instance_name>/.s.PGSQL.5432
        sqlalchemy.engine.url.URL.create(drivername="postgresql+pg8000",
                                         username=db_user,  # e.g. "my-database-user"
                                         password=db_pass,  # e.g. "my-database-password"
                                         database=db_name,  # e.g. "my-database-name"
                                         query={"unix_sock":
                                                    f"{db_socket_dir}/{cloud_sql_connection_name}/.s.PGSQL.5432"}),
        **db_config
    )

您不使用psycopg2驱动程序;但是PG8000和跟踪
生成的方式,由 db.cursor()返回的光标又由self.pool.raw_connection(),
我得出的结论是,光标不是PS_EXT光标,但是
PG8000光标,该光标没有 mogrify 方法为
如下所示:

这就是为什么您遇到此错误的可能性。我认为
解决方案是更改为使用Psycopg2驱动程序。

就是说,这个答案可能是错误的,我在吠叫错误的树。

I took a look at the code and noticed that you were using import psycopg2.extensions as ps_ext; and clearly that had mogrify in
the docs.

Then I came across this line:

self.pool = sqlalchemy.create_engine(

        # Equivalent URL:
        # postgresql+pg8000://<db_user>:<db_pass>@/<db_name>
        #                     ?unix_sock=<socket_path>/<cloud_sql_instance_name>/.s.PGSQL.5432
        sqlalchemy.engine.url.URL.create(drivername="postgresql+pg8000",
                                         username=db_user,  # e.g. "my-database-user"
                                         password=db_pass,  # e.g. "my-database-password"
                                         database=db_name,  # e.g. "my-database-name"
                                         query={"unix_sock":
                                                    f"{db_socket_dir}/{cloud_sql_connection_name}/.s.PGSQL.5432"}),
        **db_config
    )

You aren't using the psycopg2 driver; but the pg8000 one and tracing
the way things are generated, the cursor as returned by the db.cursor() which in turn was created by the self.pool.raw_connection(),
I came to the conclusion that the cursor wasn't a ps_ext cursor but
a pg8000 cursor, which doesn't have the mogrify method as
shown in: https://github.com/tlocke/pg8000/blob/main/pg8000/dbapi.py

This is the likelihood of why you're having this error. I think
the solution is to change to using psycopg2 driver instead.

That said, this answer could be wrong and I'm barking up the wrong tree.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文