Invalid parameter type (numpy.int64) when using pandas df.to_sql with SQLAlchemy and pyodbc on MSSQL

Posted 2025-01-11 08:05:45

I have a script where I need to append the contents of a dataframe to a SQL database table I created. I need to do this many times to several tables with several dataframes as sources.

I am using pandas with a SQLAlchemy engine on a pyodbc connection to an MSSQL database.

To ensure that I am only appending the data from the dataframe which has a corresponding column in the database, I have an "append data to sql" function:

import logging

import pandas as pd
import sqlalchemy


def append_data_to_sql(db_connection, new_rows: pd.DataFrame, table_name: str) -> bool:
    """Append new_rows to table_name; return True if an error occurred, else False."""
    # Get column names and types for the destination table
    query = 'SELECT column_name, data_type ' \
            'FROM information_schema.columns ' \
            'WHERE table_name=?'
    result = db_connection.execute(query, table_name).fetchall()
    columns_in_sql = pd.DataFrame(data=result, columns=['COLUMN_NAME', 'DATA_TYPE'])
    new_table = pd.DataFrame(columns=list(columns_in_sql['COLUMN_NAME']))
    # Compare column names case-insensitively
    new_rows.columns = new_rows.columns.str.lower()
    new_table.columns = new_table.columns.str.lower()

    # Only keep the columns that are in the destination; if a destination
    # column is missing from the data to be appended, create an empty column
    for column in new_table.columns:
        if column in new_rows.columns:
            new_table[column] = new_rows[column]
        else:
            new_table[column] = pd.NA

    try:
        new_table.to_sql(table_name, db_connection, if_exists='append', index=False)
    except sqlalchemy.exc.DBAPIError:
        # logging.exception already records the active exception and traceback
        logging.exception('Error while appending to %s', table_name)
        return True

    return False
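
For comparison, the column-alignment step in the function above can also be sketched with DataFrame.reindex, which keeps only the requested columns and creates any missing ones as empty. This is a minimal sketch, not the function's actual implementation: align_to_destination is a hypothetical helper name, it assumes all column names are strings, and it fills missing columns with NaN rather than pd.NA.

```python
import pandas as pd


def align_to_destination(new_rows: pd.DataFrame, destination_columns: list) -> pd.DataFrame:
    """Return a copy of new_rows restricted to the destination's columns.

    Column names are compared case-insensitively by lower-casing both sides.
    Destination columns absent from new_rows are created and filled with NaN.
    """
    lowered = new_rows.rename(columns=str.lower)
    wanted = [name.lower() for name in destination_columns]
    return lowered.reindex(columns=wanted)


if __name__ == "__main__":
    source = pd.DataFrame({"System": ["the_system_name"], "Unused": [1]})
    print(align_to_destination(source, ["system", "bu"]))
```

Unlike the loop version, this does not rename the columns of the caller's DataFrame in place.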

The context data I'm passing to my function is:

# A single record, wrapped in a list so that from_records builds one row
new_rows = pd.DataFrame.from_records([{
    'system': 'the_system_name',
    'data_update_time': pd.Timestamp('2022-03-02 10:00:48.958701'),
    'first_available_data_point': None,
    'last_available_data_point': None,
    'line_name': 'the_line_name',
    'server': 'the_server_name',
    'day_start_hours': 0.0,
    'bu': 'the_bu_name',
    'number_of_constraints': 3,
}])

columns_in_sql = pd.DataFrame(
    data=[
        ('system', 'varchar'),
        ('data_update_time', 'datetime'),
        ('first_available_data_point', 'datetime'),
        ('last_available_data_point', 'datetime'),
        ('line_name', 'varchar'),
        ('server', 'varchar'),
        ('day_start_hours', 'numeric'),
        ('bu', 'varchar'),
        ('number_of_constraints', 'int'),
    ],
    columns=['COLUMN_NAME', 'DATA_TYPE'],
)

The error that I am getting is:

sqlalchemy.exc.ProgrammingError: (pyodbc.ProgrammingError) ('Invalid parameter type.  param-index=8 param-type=numpy.int64', 'HY105')
[SQL: INSERT INTO my_table (system, data_update_time, first_available_data_point, last_available_data_point, line_name, server, day_start_hours, bu, number_of_constraints) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)]
[parameters: ('the_system_name', Timestamp('2022-03-02 10:00:48.958701'), None, None, 'the_line_name', 'the_server_name', 0.0, 'the_bu_name', 3)]
(Background on this error at: https://sqlalche.me/e/14/f405)

The issue is clearly that the 3 at the end is a numpy.int64, which pyodbc rejects as a parameter type. I found a similar question that addresses this exact error, but for a direct executemany() call through pyodbc.

The problem is that I'm using SQLAlchemy through pandas, so I'm not actually creating the insert statement myself.
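
One approach that does not require writing the insert statement is to convert NumPy scalars to native Python values in the DataFrame itself before handing it to to_sql. The following is a minimal sketch of that idea, not a confirmed fix: to_native and with_native_scalars are hypothetical helper names, unique column names are assumed, and the code has not been checked against a live SQL Server.

```python
import numpy as np
import pandas as pd


def to_native(value):
    """Return a plain Python scalar for a NumPy scalar; pass anything else through."""
    if isinstance(value, np.generic):
        return value.item()
    return value


def with_native_scalars(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of df whose columns are object dtype without NumPy scalars."""
    converted = {
        # dtype=object stops pandas from re-inferring a NumPy dtype for the column
        column: pd.Series(
            [to_native(value) for value in df[column]],
            index=df.index,
            dtype=object,
        )
        for column in df.columns
    }
    return pd.DataFrame(converted)
```

Under these assumptions, the result of with_native_scalars(new_table) could be passed to to_sql in place of new_table.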

I've tried specifying the dtype of each column by adding:

from sqlalchemy import types

sql_dtypes = {'varchar': types.String(),
              'int': types.SmallInteger(),
              'datetime': types.DateTime(),
              'date': types.Date(),
              'nvarchar': types.String(),
              'numeric': types.Numeric(),
              'float': types.Float(),
              'real': types.Float(),
              'bool': types.Boolean(),
              }
    
# Map each destination column name to its SQLAlchemy type
new_dtypes = {}
for index, row in columns_in_sql.iterrows():
    new_dtypes[row['COLUMN_NAME']] = sql_dtypes[row['DATA_TYPE']]

and adding the dtype arg to to_sql:

new_table.to_sql(table_name, db_connection, if_exists='append', index=False, dtype=new_dtypes)

I then tried each of the integer types on the SQLAlchemy docs page (Integer(), BigInteger(), SmallInteger()) and got the same error every time.
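
Because the error message names the Python type of the parameter (numpy.int64) rather than the SQL column type, it may help to check which Python types the cells of each column actually hold. A small diagnostic sketch follows; python_types_by_column is a hypothetical helper name, and converting each column to an object array is only an approximation of what ends up being passed to the driver.

```python
import numpy as np
import pandas as pd


def python_types_by_column(df: pd.DataFrame) -> dict:
    """Map each column name to the sorted type names of the values it holds."""
    report = {}
    for column in df.columns:
        # Converting to an object array boxes NumPy-typed columns into Python
        # objects, while object columns keep whatever objects they already hold.
        cells = df[column].to_numpy(dtype=object)
        report[column] = sorted({type(cell).__name__ for cell in cells})
    return report


if __name__ == "__main__":
    boxed = np.empty(1, dtype=object)
    boxed[0] = np.int64(3)  # a NumPy scalar stored inside an object column
    frame = pd.DataFrame({"as_object": boxed, "as_int64": [3]})
    print(python_types_by_column(frame))
```

A column that reports int64 here is holding NumPy scalars inside an object-dtype column, which is one way such values can reach the driver unconverted.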

I'm hoping to find a solution here before I rewrite the function to do all the things pandas and SQLAlchemy should (I think) already be taking care of.
