Extracting SQL Server table data to a Parquet file

Posted 2025-01-25 10:59:41

I'm trying to extract the data of a SQL Server table to a Parquet file using the sqlalchemy, pandas and fastparquet modules, but I end up with an exception. I would appreciate some help with this. I'm testing on a simple table with a single column of non-null integer type.

Code:

import sqlalchemy as sa
import pandas as pd
import urllib as ul
import fastparquet as fp

def main():
    sqlInstance = 'sqlInstance'
    database = 'database'
    tableName = 'Numbers'
    props = ul.parse.quote_plus("DRIVER={SQL Server Native Client 11.0};"
                                    "SERVER=" + sqlInstance + ";"
                                    "DATABASE=" + database + ";"
                                    "Trusted_Connection=yes;")
    con = sa.create_engine("mssql+pyodbc:///?odbc_connect={}".format(props))
    fetch_batch_size = 1000
    metadata = sa.schema.MetaData(bind=con)
    table = sa.Table(tableName, metadata, autoload=True)

    # Generate pandas/python compatible datatype mapping
    map = {}
    data_type_map_lookup = {
        'int64': ['smallint', 'tinyint', 'integer'],
        'float': ['bigint', 'float', 'real'],
        'str': ['char', 'nchar', 'nvarchar', 'nvarchar(max)', 'uniqueidentifier', 'varchar(n)', 'varchar(max)'],
        'datetime64[ns]': ['date', 'datetime', 'smalldatetime'],
        'bytes': ['binary', 'varbinary', 'varbinary(max)'],
        'bool': ['bit']
    }
    for col in table.columns:
        for key, value in data_type_map_lookup.items():
            types = data_type_map_lookup[key]
            if list(filter(str(col.type).lower().startswith, types)):
                if col.nullable and key == 'int64':
                    map[col.name] = 'float'
                else:
                    map[col.name] = key

    #Fetch data
    output = table.select().execution_options(stream_results=True).execute()
    append = False
    while True:
        batch = output.fetchmany(fetch_batch_size)
        if not len(batch) > 0:
            break
        else:
            df = (pd.DataFrame(data=batch, columns=map)).astype(dtype=map)
            print(df.to_string())  # Prints good
            fp.write("C:\\temp\\test.parquet", df, write_index=False, compression=False, append=append)
        append = True


if __name__ == "__main__":
    main()

Exception:

TypeError: Expected unicode, got quoted_name
Exception ignored in: 'fastparquet.cencoding.write_list'
Traceback (most recent call last):
  File "C:\...lib\site-packages\fastparquet\writer.py", line 1488, in write_thrift
    return f.write(obj.to_bytes())
TypeError: Expected unicode, got quoted_name
TypeError: Expected unicode, got quoted_name
Exception ignored in: 'fastparquet.cencoding.write_list'
Traceback (most recent call last):
  File "C:\...lib\site-packages\fastparquet\writer.py", line 1488, in write_thrift
    return f.write(obj.to_bytes())
TypeError: Expected unicode, got quoted_name

Comments (2)

故事未完 2025-02-01 10:59:41

I experienced the same issue and found a workaround.

Here is a minimal reproducible example (using python==3.10.7, pandas==1.5.3, fastparquet==2023.4.0, SQLAlchemy==1.4.47).

import tempfile
import numpy as np
import pandas as pd
import fastparquet
from sqlalchemy.sql.expression import quoted_name

with tempfile.TemporaryDirectory() as tempdir:

    # Test data
    df = pd.DataFrame({"col_i": np.arange(10), "col_f": np.random.random(10)})

    outpath = f"{tempdir}/test_data.parquet"

    # Base case: fastparquet IO works no problem with the test data
    fastparquet.write(outpath, df)
    df_out = fastparquet.ParquetFile(outpath).to_pandas()
    assert df.equals(df_out)

    # Write the data to SQL, and then load back
    con, table_name = f"sqlite:///{tempdir}/test_data.sqlite3", "table123"
    df.to_sql(table_name, con, index=False)
    df_sql = pd.read_sql_table(table_name, con)
    # ...and it still looks to be fine
    assert df.equals(df_sql)

    # However, column names are not pure `str`.
    # They are actually instances of :class:`quoted_name`, which is a
    # subclass of `str`.
    for column_name in df_sql.columns:
        assert isinstance(column_name, quoted_name)

    # This will cause failure while writing to the file.
    # The exception will be caught and output to stderr, but will be ignored.
    fastparquet.write(outpath, df_sql)

    # This line will raise an error since the Parquet file is broken.
    df_out = fastparquet.ParquetFile(outpath).to_pandas()

This appears to happen because the DataFrame column names are not plain str when the data is read through SQLAlchemy.
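
The distinction can be seen without a database. The following is a minimal sketch that uses a hypothetical `QuotedLike` class as a stand-in for SQLAlchemy's `quoted_name` (the stand-in and the helper `non_plain_names` are assumptions for illustration, not SQLAlchemy's implementation). It shows why `isinstance` checks pass while an exact-type check, which is what the "Expected unicode" message in the traceback suggests, does not.

```python
class QuotedLike(str):
    """Hypothetical stand-in for sqlalchemy's quoted_name (a str subclass)."""


def non_plain_names(names):
    """Return the names whose exact type is not the built-in str."""
    return [name for name in names if type(name) is not str]


columns = ["col_a", QuotedLike("col_i"), QuotedLike("col_f")]

# Every name passes an isinstance check, so the DataFrame looks normal.
all_are_str_instances = all(isinstance(name, str) for name in columns)

# Only an exact-type check reveals the subclass instances.
offenders = non_plain_names(columns)
print(all_are_str_instances, offenders)
```

A check like this can be run on `df.columns` before writing to see whether any column name is a subclass instance.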

Here is the workaround that worked for me.

# Make sure that the column names are plain str
# (df_sql is the DataFrame that was read back through SQLAlchemy)
df_sql.columns = [str(name) for name in df_sql.columns]

# Now, fastparquet can write the DataFrame correctly
fastparquet.write(outpath, df_sql)
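
Applied to the code in the question, the same idea would mean converting `col.name` to a plain `str` when the dtype mapping is built, since the keys of that mapping become the DataFrame's column names. The sketch below is a simplification under stated assumptions: `FakeColumn` and `QuotedLike` are hypothetical stand-ins for SQLAlchemy's column objects and `quoted_name`, `build_dtype_map` is an illustrative name, and the lookup table is abbreviated from the question.

```python
from dataclasses import dataclass


class QuotedLike(str):
    """Hypothetical stand-in for sqlalchemy's quoted_name (a str subclass)."""


@dataclass
class FakeColumn:
    """Hypothetical stand-in exposing the attributes the question's loop reads."""
    name: str
    type: str
    nullable: bool


# Abbreviated version of the lookup table from the question.
DTYPE_LOOKUP = {
    "int64": ["smallint", "tinyint", "integer"],
    "str": ["char", "nchar", "nvarchar", "varchar"],
    "bool": ["bit"],
}


def build_dtype_map(columns):
    """Map plain-str column names to pandas dtype strings."""
    dtype_map = {}
    for col in columns:
        sql_type = str(col.type).lower()
        for dtype, prefixes in DTYPE_LOOKUP.items():
            if any(sql_type.startswith(prefix) for prefix in prefixes):
                # Nullable integers fall back to float, as in the question.
                chosen = "float" if col.nullable and dtype == "int64" else dtype
                # str() drops the subclass so the key is an exact built-in str.
                dtype_map[str(col.name)] = chosen
                break
    return dtype_map


columns = [
    FakeColumn(QuotedLike("Number"), "INTEGER", nullable=False),
    FakeColumn(QuotedLike("Label"), "NVARCHAR(50)", nullable=True),
]
dtype_map = build_dtype_map(columns)
print(dtype_map)
```

In the question's code, the equivalent change would be writing `map[str(col.name)] = ...` in the mapping loop, so that `pd.DataFrame(data=batch, columns=map)` receives plain `str` column names.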

十年不长 2025-02-01 10:59:41

You can also do this with Sling (https://slingdata.io).
See below.

# set connection via env var
export mssql='sqlserver://...'

# test connection
sling conns test mssql

# run export for one table
sling run --src-conn mssql --src-stream 'my_schema.my_table' --tgt-object 'file://my_folder/my_table.parquet'

# run export for many tables
sling run --src-conn mssql --src-stream 'my_schema.*' --tgt-object 'file://{stream_schema}/{stream_table}.parquet'