Compare two Pandas DataFrames and print only the mismatched values to a CSV file
Linked question: "Compare two dataframes and get only non matching values with index and column names pandas dataframe python"
Good Day Python Gurus! Thank you for taking the time to review my question.
Use Case:
I want to find the best way to compare two DataFrames sourced from SQL Server and Snowflake Azure for data validation, and then export to a CSV file ONLY the rows in the SQL Server DataFrame that do not match, or are missing from, the Snowflake DataFrame. That way I can take the results back to SQL Server and research why those records did not make it over to Snowflake.
Additionally, I need to remove any columns that do not match or are missing between the Source and Target tables (I did this manually, as you can see in my code below), convert columns and data to uppercase, and fill NaN with zeros.
Lastly, I added in a reindex() of all the columns after they were sorted() to make sure the columns are in alphabetical order for the comparison.
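For readers following along, the preparation steps described above (drop columns that are not shared, upper-case names and data, fill NaN, sort columns) can be sketched roughly as below. This is only an illustration on toy data: the helper names `normalize` and `keep_common_columns` are hypothetical, the shared columns are derived automatically instead of being listed by hand, and NaN is filled with zero in numeric columns only, which is an assumption that differs slightly from filling every column.

```python
import pandas as pd


def normalize(df: pd.DataFrame) -> pd.DataFrame:
    """Upper-case column names and string values, zero-fill numeric NaN, sort columns."""
    out = df.copy()
    out.columns = [str(c).upper() for c in out.columns]
    # Upper-case string values only; numbers, dates and missing values pass through.
    out = out.apply(lambda col: col.map(lambda v: v.upper() if isinstance(v, str) else v))
    # Assumption: zero-fill numeric columns only, so text columns keep their type.
    num_cols = out.select_dtypes(include="number").columns
    out[num_cols] = out[num_cols].fillna(0)
    # Alphabetical column order, as in the question's reindex(sorted(...)) step.
    return out.reindex(sorted(out.columns), axis=1)


def keep_common_columns(left: pd.DataFrame, right: pd.DataFrame):
    """Keep only the columns present in both frames, in the same order."""
    common = sorted(set(left.columns) & set(right.columns))
    return left[common], right[common]


# Toy stand-ins for the SQL Server and Snowflake results (hypothetical data).
sql_df = normalize(pd.DataFrame(
    {"id": [1, 2], "Name": ["ann", "bob"], "amount": [1.5, None], "ROW1": [9, 9]}))
sf_df = normalize(pd.DataFrame(
    {"ID": [1, 2], "NAME": ["Ann", "Bob"], "AMOUNT": [1.5, 0.0], "ROWX": [0, 0]}))
sql_df, sf_df = keep_common_columns(sql_df, sf_df)

print(list(sql_df.columns))
print(sql_df)
```

Deriving the shared columns from the two frames avoids having to maintain the exclusion lists by hand, at the cost of silently ignoring a column that was expected on both sides but is missing from one.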
Question:
Reviewing my code below, and taking into account the code I tried earlier with errors, do you have a more elegant solution or do you see a flaw in my code I can correct and hopefully make this work?
I am attempting the solution linked above in my code, but I keep getting this error:
Traceback (most recent call last):
  File "D:\PythonScripts\Projects\DataCompare\DFCmpr_SQL_ManualSQL.py", line 166, in <module>
    df_diff = sql_df[sf_df != sql_df].dropna(how='all', axis=1).dropna(how='all', axis=0).astype('Int64')
  File "C:\Program Files\Python39\lib\site-packages\pandas\core\ops\common.py", line 69, in new_method
    return method(self, other)
  File "C:\Program Files\Python39\lib\site-packages\pandas\core\arraylike.py", line 36, in __ne__
    return self._cmp_method(other, operator.ne)
  File "C:\Program Files\Python39\lib\site-packages\pandas\core\frame.py", line 6851, in _cmp_method
    self, other = ops.align_method_FRAME(self, other, axis, flex=False, level=None)
  File "C:\Program Files\Python39\lib\site-packages\pandas\core\ops\__init__.py", line 288, in align_method_FRAME
    raise ValueError(
ValueError: Can only compare identically-labeled DataFrame objects
I have manually compared the output of print(df1.columns) and print(df2.columns) and I do not see any difference. The index looks identical to me as well. Am I doing something wrong?
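A note for anyone hitting the same message: the `!=` operator between two DataFrames requires both the column labels and the row index to be identical, so two frames with matching columns but a different number of rows will still raise. A small sketch of how that could be checked programmatically instead of by eye follows. The helper `label_report` and the toy frames are hypothetical and only stand in for the real query results.

```python
import pandas as pd


def label_report(left: pd.DataFrame, right: pd.DataFrame) -> dict:
    """Summarise whether two frames carry identical row and column labels."""
    return {
        "same_columns": left.columns.equals(right.columns),
        "same_index": left.index.equals(right.index),
        "left_shape": left.shape,
        "right_shape": right.shape,
    }


# Toy frames: identical columns, but the "target" is missing one row.
sql_df = pd.DataFrame({"ID": [1, 2, 3], "AMOUNT": [10, 20, 30]})
sf_df = pd.DataFrame({"ID": [1, 2], "AMOUNT": [10, 25]})

report = label_report(sql_df, sf_df)
print(report)

# The element-wise comparison fails because the row labels differ.
try:
    sf_df != sql_df
    raised = False
except ValueError as exc:
    raised = True
    print("ValueError:", exc)
```

If `same_index` comes back false while `same_columns` is true, the row counts or row labels differ, and an element-wise comparison is not possible without first aligning the rows on a key.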
SQL Server is a Table of data, Snowflake is a View of Data. Columns are named exactly the same in each DB.
My Code is below (I renamed some columns for security reasons):
# ############## VERSION 2.0 #############
# ###### NOTES ########
# -------------- Import packages needed ----------------------------
import sys, os, pyodbc, datetime, collections
import pandas as pd
import snowflake.connector as sf
import sqlalchemy as sa
import SNCC_Conn as sfconn
pd.set_option("display.max_rows", 999)
# set params for Snowflake Connection
sncc_db = 'DATABASE'
sncc_sch = 'SCHEMA'
sncc_tbl = 'TABLE_1'
sncc_qry = 'SELECT * FROM '+sncc_sch+'.'+sncc_tbl+''
sf_qry = r'' + sncc_qry
# Can't use a tuple, as a tuple cannot be updated.
# Set params for the SQL Server connection (TST). This is set up as a trusted connection, meaning it will use SSO.
sql_srvr = 'SQLSERVER'
sql_db = 'DATABASE'
sql_sch = 'SCHEMA'
sql_tbl = 'TABLE_1'
ms_sql_qry = 'SELECT * FROM '+sql_sch+'.' +sql_tbl+''
fileName = 'SQL_SF_CombinedPolicy'
# --------------------------- Snowflake Connection ---------------------------
try:
    sf_conn = sfconn.snowflake_connect(schema = sncc_sch, database = sncc_db)
except Exception as e:
    print('Connection Failed. Please try again.')
    print('Error: ' + str(e) )
    quit()
print('Snowflake Connection established!')
print(sf_qry)
try:
    # Execute the query
    sf_conn.execute(sf_qry)
    # Fetch all Snowflake results into a Pandas DataFrame
    sf_df = sf_conn.fetch_pandas_all()
    # Make all DataFrame columns uppercase
    sf_df.columns = map(str.upper, sf_df.columns)
    # Replace NaN (not a number) data values with a zero (0).
    sf_df = sf_df.fillna(0)
    # Remove columns that are not in the source table as needed, OR comment this out with a #.
    sf_df = sf_df.loc[:, ~sf_df.columns.isin(['ROWx','ROWy','ROWz'])]
    # Put the columns in alphabetical order, or change this to order only certain columns.
    sf_df = sf_df.reindex(sorted(sf_df.columns), axis=1)
    # Print out results on screen during development phase.
    print(sf_df)
    print(sf_df.columns)
    print('Snowflake Dataframe Load Successful.')
except Exception as e:
    print('Snowflake Dataframe load Unsuccessful. Please try again.')
    print('Error: ' + str(e) )
# # --------------------------- SQL Server Connection ---------------------------
try:
    # Adjacent string literals are joined, so DRIVER, SERVER, DATABASE and the trusted connection setting form a single connection string.
    sql_conn = pyodbc.connect(
        'DRIVER={SQL Server};'
        'SERVER=' + sql_srvr + ';'
        'DATABASE=' + sql_db + ';'
        'Trusted_Connection=yes;'  # Using Windows User Account for authentication.
    )
    # cursor = sql_conn.cursor()
    print('SQL Connection established!')
except Exception as e:
    print('Connection Failed. Please try again.')
    print('Error: ' + str(e) )
try:
    #SQLquery = input("What is your query for SQL Server?: ") -- Add "IF" statements to check manual input?
    # Query results and place them in variable
    # cursor.execute(sql_qry)
    sql_qry = pd.read_sql_query(ms_sql_qry,sql_conn)
    # Put results into a Data Frame from Pandas
    sql_df = pd.DataFrame(sql_qry)
    # Make all DataFrame columns uppercase
    sql_df.columns = map(str.upper, sql_df.columns)
    # Replace NaN (not a number) data values with a zero (0).
    sql_df = sql_df.fillna(0)
    # Remove columns that are not in the target table as needed, OR comment this out with a #.
    sql_df = sql_df.loc[:, ~sql_df.columns.isin(['ROW1','ROW2','ROW3'])]
    # Put the columns in alphabetical order
    sql_df = sql_df.reindex(sorted(sql_df.columns), axis=1)
    # Print out results during development phase.
    print(sql_df)
    print(sql_df.columns)
    print('SQL Server Dataframe Load Successful')
    print('Comparing SQL to SNCC Dataframes')
    #/********************* COMPARISON SCRIPT **************/
    #sql_df.compare(sncc_df)
    # Compare the two DataFrames and produce results from Source (sql_df) that do not match Target (sf_df).
    # ---------- ERROR: ValueError: Can only compare identically-labeled DataFrame objects
    df_diff = sql_df[sf_df != sql_df].dropna(how='all', axis=1).dropna(how='all', axis=0).astype('Int64')
    # Print out results of differences during development phase.
    print(df_diff)
    # Export out to CSV using a variable for the name of the file, future state.
    df_diff.to_csv(r'D:\PythonResults\DataDiff_' + fileName + '.csv', index = False)
    print('DataFrame output from comparison written to PythonResults folder in Documents as DataDiff_' + fileName + '.csv.')
except pyodbc.Error as e:
    # Message stating export unsuccessful.
    print("MSSQL Dataframe load unsuccessful.")
finally:
    sf_conn.close()
    print("Connection to Snowflake closed")
    sql_conn.commit()
    sql_conn.close()
    print("Connection to MSSQL Server closed")
EDIT 1:
I wanted to add that the data sets I am bringing in from SQL Server and Snowflake have a mixture of datatypes: Integer, VarChar, Date, DateTime, etc. I am not sure if that makes a difference.
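On the datatype point, mixed types can matter: the same logical value may arrive as an integer from one driver and as text from the other, and such values never compare as equal. A rough sketch of how the dtypes of the two frames could be laid side by side and then brought to a common type is shown below. The toy data and the per-column conversions are assumptions for illustration, not what the real tables contain.

```python
import pandas as pd

# Toy frames: same logical values, but the "Snowflake" side arrives as text.
sql_df = pd.DataFrame({
    "ID": [1, 2],
    "LOADED": pd.to_datetime(["2021-01-01", "2021-01-02"]),
})
sf_df = pd.DataFrame({
    "ID": ["1", "2"],
    "LOADED": ["2021-01-01", "2021-01-02"],
})

# Side-by-side dtype table, indexed by column name.
dtype_table = pd.DataFrame({
    "sql": sql_df.dtypes.astype(str),
    "sf": sf_df.dtypes.astype(str),
})
mismatched = dtype_table[dtype_table["sql"] != dtype_table["sf"]]
print(mismatched)

# Bring the text columns to the types used on the SQL Server side.
sf_fixed = sf_df.copy()
sf_fixed["ID"] = pd.to_numeric(sf_fixed["ID"])
sf_fixed["LOADED"] = pd.to_datetime(sf_fixed["LOADED"])

ids_match = bool((sql_df["ID"] == sf_fixed["ID"]).all())
dates_match = bool((sql_df["LOADED"] == sf_fixed["LOADED"]).all())
print(ids_match, dates_match)
```

Note also that a trailing `.astype('Int64')` on a diff result can only succeed when every remaining column is numeric, so with VarChar or Date columns in the mix that cast is likely to need removing or restricting to the numeric columns.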
Comments (1)
Something like this.
Result:
I'm sure there are several other ways to do the same thing. Please explore other alternatives.
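One such alternative, offered here as a sketch and not as the code this answer originally used, is a left merge with `indicator=True`. Rows that exist in the SQL Server frame but have no exact match in the Snowflake frame are marked `left_only`, which covers both changed and missing records and does not require the two frames to have the same number of rows. The toy frames below are hypothetical, and the CSV is written to an in-memory buffer in place of the real output path.

```python
import io

import pandas as pd

# Toy stand-ins: row 2 differs in NAME, row 3 is missing from the target.
sql_df = pd.DataFrame({"ID": [1, 2, 3], "NAME": ["ANN", "BOB", "CAT"]})
sf_df = pd.DataFrame({"ID": [1, 2], "NAME": ["ANN", "BOBBY"]})

# Left merge on every column; "_merge" records where each row was found.
merged = sql_df.merge(
    sf_df.drop_duplicates(),
    how="left",
    on=list(sql_df.columns),
    indicator=True,
)

# Keep only the source rows with no exact match in the target.
df_diff = merged.loc[merged["_merge"] == "left_only"].drop(columns="_merge")
print(df_diff)

# Write the differences as CSV (a buffer here; a file path works the same way).
buffer = io.StringIO()
df_diff.to_csv(buffer, index=False)
print(buffer.getvalue())
```

This treats a row as matching only when every column is equal, so it assumes both frames already share the same column names and compatible dtypes. It reports whole rows, not the individual cells that differ.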