Extracting SQL Server table data to a Parquet file
I'm trying to extract data from one of my SQL Server tables into the Parquet file format using the sqlalchemy, pandas and fastparquet modules, but I end up with an exception. I'd appreciate some help with this; I'm testing it on a simple table with a single non-null integer column.
Code:
import sqlalchemy as sa
import pandas as pd
import urllib as ul
import fastparquet as fp


def main():
    sqlInstance = 'sqlInstance'
    database = 'database'
    tableName = 'Numbers'
    props = ul.parse.quote_plus("DRIVER={SQL Server Native Client 11.0};"
                                "SERVER=" + sqlInstance + ";"
                                "DATABASE=" + database + ";"
                                "Trusted_Connection=yes;")
    con = sa.create_engine("mssql+pyodbc:///?odbc_connect={}".format(props))
    fetch_batch_size = 1000
    metadata = sa.schema.MetaData(bind=con)
    table = sa.Table(tableName, metadata, autoload=True)

    # Generate pandas/python compatible datatype mapping
    map = {}
    data_type_map_lookup = {
        'int64': ['smallint', 'tinyint', 'integer'],
        'float': ['bigint', 'float', 'real'],
        'str': ['char', 'nchar', 'nvarchar', 'nvarchar(max)', 'uniqueidentifier', 'varchar(n)', 'varchar(max)'],
        'datetime64[ns]': ['date', 'datetime', 'smalldatetime'],
        'bytes': ['binary', 'varbinary', 'varbinary(max)'],
        'bool': ['bit']
    }
    for col in table.columns:
        for key, value in data_type_map_lookup.items():
            types = data_type_map_lookup[key]
            if list(filter(str(col.type).lower().startswith, types)):
                if col.nullable and key == 'int64':
                    map[col.name] = 'float'
                else:
                    map[col.name] = key

    # Fetch data
    output = table.select().execution_options(stream_results=True).execute()
    append = False
    while True:
        batch = output.fetchmany(fetch_batch_size)
        if not len(batch) > 0:
            break
        else:
            df = (pd.DataFrame(data=batch, columns=map)).astype(dtype=map)
            print(df.to_string())  # Prints good
            fp.write("C:\\temp\\test.parquet", df, write_index=False, compression=False, append=append)
            append = True


if __name__ == "__main__":
    main()
Exception:
TypeError: Expected unicode, got quoted_name
Exception ignored in: 'fastparquet.cencoding.write_list'
Traceback (most recent call last):
File "C:\...lib\site-packages\fastparquet\writer.py", line 1488, in write_thrift
return f.write(obj.to_bytes())
TypeError: Expected unicode, got quoted_name
TypeError: Expected unicode, got quoted_name
Exception ignored in: 'fastparquet.cencoding.write_list'
Traceback (most recent call last):
File "C:\...lib\site-packages\fastparquet\writer.py", line 1488, in write_thrift
return f.write(obj.to_bytes())
TypeError: Expected unicode, got quoted_name
Comments (2)
I experienced the same issue and found a workaround (using python==3.10.7, pandas==1.5.3, fastparquet==2023.4.0, SQLAlchemy==1.4.47). It turned out to be caused by the DataFrame column names not being plain str when the table is read through SQLAlchemy. Below is a minimal reproduction together with the workaround that worked for me.
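A minimal sketch of the reproduction and fix, assuming the versions listed above; the quoted_name label is constructed by hand here to stand in for what SQLAlchemy reflection yields as col.name:

import pandas as pd
import fastparquet as fp
from sqlalchemy.sql.elements import quoted_name

# A column label of type quoted_name (a str subclass), which is what
# SQLAlchemy table reflection returns for col.name.
df = pd.DataFrame({quoted_name("Number", None): [1, 2, 3]})

# Writing this directly triggers "TypeError: Expected unicode, got quoted_name"
# inside fastparquet's thrift writer:
# fp.write("test.parquet", df, write_index=False)

# Workaround: cast every column label to a plain Python str before writing.
df.columns = [str(c) for c in df.columns]
fp.write("test.parquet", df, write_index=False)

Applied to the question's loop, the same effect can be had by keying the dtype map with str(col.name), so the DataFrame is created with plain-str column names from the start.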
You can also do this with Sling (https://slingdata.io). See below.