8.4 数据清洗脚本化
随着你的 Python 知识的逐步深化与丰富,你编写的 Python 代码也会逐渐变得复杂。现在你可以编写函数、解析文件、导入并使用多个 Python 库,甚至还可以存储数据。是时候开始将代码脚本化了。脚本化(scripting)的意思是,确定代码的结构,用于后续使用、学习和分享。
以 UNICEF 数据为例。我们知道,UNICEF 每隔几年会发布这些数据集,其中许多数据是不变的。调查不太可能发生较大变化——它是建立在多年经验的基础之上。考虑到这些事实,我们可以信任这些数据集有相当高的一致性。如果我们需要再次用到 UNICEF 数据,可能至少可以复用第一次写的脚本中的一部分代码。
Python 之禅不仅适用于编写代码,还适用于组织代码,函数、变量和类的命名,等等。最好在选择命名上花点时间,判断哪些名字可以让你和他人都一目了然。注释和文档可以帮助理解,但代码本身也应该具有较强的可读性。
经常有人称赞 Python 是最容易读懂的语言之一,即使是看不懂代码的人也能读懂!保持代码语法简洁可读,这样解释代码功能的文档也不需要太长。
Python 之禅
Python 之禅(https://www.python.org/dev/peps/pep-0020/)总是非常值得参考的(还可以输入 import this 来轻松查看)。它的要点是,对于Python(和许多语言)来说,尽可能保持明确、简洁和实用总是最好的。1
1中文版 Python 之禅可参见:https://wiki.python.org/moin/PythonZenChineseTranslate。——译者注
通读 PEP-8 Python 风格指南(https://www.python.org/dev/peps/pep-0008/),并遵守里面的规则。有许多 PEP-8 的检查工具(linter),可以通读你的代码,并指出其中不符合 PEP-8 的地方。
除了风格标准和用法,你还可以用检查工具评估代码的复杂度。有些是根据 McCabe 关于循环复杂度的理论和计算方法(https://en.wikipedia.org/wiki/Cyclomatic_complexity)来对代码进行分析。虽然不是每次都能将代码分割成简单的代码块,但你应该尽量将复杂任务拆分成更小、更简单的任务,降低代码复杂度,使代码更明确。
在使代码更加清晰明确的同时,另一个很有用的做法是,让可复用的代码块更加通用。但注意不要过于一般化(def foo 这样的定义毫无用处),但如果你创建通用的辅助函数,你将会经常用到它们(例如用一个列表创建 CSV,或者用包含重复值的列表创建一个集合),你的代码也会更加有序、简洁和简单。
表 8-1 汇总了一些编程的最佳实践,你可以在以后的编程中考虑这些做法。这些最佳实践并没有包含关于 Python 和编程的所有内容,但可以为今后的学习和编程打下良好的基础。
实践 | 说明 |
文档 | 包括代码中的注释、函数说明和脚本说明,以及 README.md 文件和仓库中其他必要的说明文件 |
命名清晰 | 所有函数、变量和文件都应该有清晰的命名,从名字中就可以看出其内容或功能 |
语法正确 | 变量和函数应该遵守正确的 Python 语法(一般用小写字母,单词之间加下划线,对于类名采用驼峰式大小写(CamelCase,https://en.wikipedia.org/wiki/CamelCase),代码应遵守 PEP-8 标准 |
导入 | 只导入需要使用的内容,导入方式遵守 PEP-8 的原则 |
辅助函数 | 创建抽象的辅助函数,使代码变得清晰、可复用(例如,export_to_csv 函数将列表内容导入 CSV 文件) |
仓库管理 | 用逻辑结构和层级结构管理仓库,共用的代码放在一起,符合一般的逻辑规律 |
版本控制 | 所有代码都应该有版本控制,这样你或你的同事可以创建新分支、尝试新特性,而不会影响仓库主分支的运行 |
快速,但是更要清晰 | 利用 Python 语法糖写出快速高效的代码,但当速度和清晰只能二选一时,选择清晰的代码 |
利用现成的库 | 当你想做点什么,而前人已经用 Python 做过了,不要重复造轮子。善于利用优秀的 Python 库,对这些库做贡献来帮助开源社区 |
代码测试 | 在适当可行的时候,为单个函数编写测试,并利用测试数据来测试代码 |
详实准确 | 在 try 代码块中正确地编写例外(exception),代码文档要详实,变量名要准确 |
为代码编写文档是编写脚本的一个重要步骤。正如 Eric Holscher(Python 主义者,Write the Docs 的创始人之一)恰如其分地总结(http://www.writethedocs.org/guide/writing/beginners-guide-to-docs/):为代码编写文档的原因有很多,最重要的原因就是你可能会再次用到这些代码——或者其他人可能会阅读并使用这些代码,或者你想发布到 GitHub 上,或者你想在以后的面试中用到,或者你想将代码发给你母亲。无论什么原因,为代码编写完备的文档,可以在未来减少数小时的痛苦。如果你是团队的一员,还会减少整个团队数百小时的痛苦。想到未来会有这些好处,现在值得花精力坐下来分析代码的用途,以及这么编写的原因。
类似 Read the Docs(https://readthedocs.org/)或者 Write the Docs(http://www.writethedocs.org/)之类的机构给出了许多好的建议和帮助,使编写文档变得更加轻松。一个好的经验做法是,在项目根目录里创建一个 README.md,简要说明代码的作用、安装方法和运行方法、基本要求以及在哪里可以找到更多信息。
有时在 README.md 里放一个简短的代码示例也是很有用的,这取决于用户(读者)与核心组件的交互次数多少。
除了 README.md 文件,你还需要添加代码注释。第 5 章中说过,注释可以是只给自己看的快速笔记,也可以是说明脚本和函数用法的长注释。
Python 中各种注释的语法和用法在 PEP-350(https://www.python.org/dev/peps/pep-0350/)中有详细说明。遵循这些标准,任何人都可以轻松看懂你写的注释。
· 从 UNICEF 数据文件中导入数据。
· 找到数据行对应的标题。
· 将我们可以读懂的标题与内置缩写标题正确匹配。
· 解析数据,检查是否有重复值。
· 解析数据,检查数据是否有缺失。
· 将同一家庭的多行数据合并。
· 保存数据。
from csv import reader import dataset data_rdr = reader(open('../../../data/unicef/mn.csv', 'rb')) header_rdr = reader(open('../../../data/unicef/mn_headers_updated.csv', 'rb')) data_rows = [d for d in data_rdr] header_rows = [h for h in header_rdr if h[0] in data_rows[0]] all_short_headers = [h[0] for h in header_rows] skip_index = [] final_header_rows = [] for header in data_rows[0]: if header not in all_short_headers: print header index = data_rows[0].index(header) if index not in skip_index: skip_index.append(index) else: for head in header_rows: if head[0] == header: final_header_rows.append(head) break new_data = [] for row in data_rows[1:]: new_row = [] for i, d in enumerate(row): if i not in skip_index: new_row.append(d) new_data.append(new_row) zipped_data = [] for drow in new_data: zipped_data.append(zip(final_header_rows, drow)) # 检查数据是否有缺失 for x in zipped_data[0]: if not x[1]: print x # 检查是否有重复值 set_of_keys = set([ '%s-%s-%s' % (x[0][1], x[1][1], x[2][1]) for x in zipped_data]) uniques = [x for x in zipped_data if not set_of_keys.remove('%s-%s-%s' % (x[0][1], x[1][1], x[2][1]))] print len(set_of_keys) # 保存到数据库 db = dataset.connect('sqlite:///../../data_wrangling.db') table = db['unicef_survey'] for row_num, data in enumerate(zipped_data): for question, answer in data: data_dict = { 'question': question[1], 'question_code': question[0], 'answer': answer, 'response_number': row_num, 'survey': 'mn', } table.insert(data_dict)
def get_rows(file_name): rdr = reader(open(file_name, 'rb')) return [row for row in rdr]
我们修改 header_rows 使其与 data_rows 里的标题对齐,花了不少时间,但现在已经不需要这段代码了。我们创建了 final_header_rows,里面的 header_rows 和 data_rows 已经匹配好了,所以我们无需担心二者不匹配的问题。我们可以删除这行代码。
14~27 行的作用是创建 final_header_rows 和 skip_index 两个列表。我们可以将这两个列表的用途总结一下,就是用于删除不匹配的元素,这样我们才能合并最终列表。我们把两个列表放在同一个方法中:
def eliminate_mismatches(header_rows, data_rows): all_short_headers = [h[0] for h in header_rows] skip_index = [] final_header_rows = [] for header in data_rows[0]: if header not in all_short_headers: index = data_rows[0].index(header) if index not in skip_index: skip_index.append(index) else: for head in header_rows: if head[0] == header: final_header_rows.append(head) break return skip_index, final_header_rows
def zip_data(headers, data): zipped_data = [] for drow in data: zipped_data.append(zip(headers, drow)) return zipped_data def create_zipped_data(final_header_rows, data_rows, skip_index): new_data = [] for row in data_rows[1:]: new_row = [] for index, data in enumerate(row): if index not in skip_index: new_row.append(data) new_data.append(new_row) zipped_data = zip_data(final_header_rows, new_data) return zipped_data
from csv import reader import dataset def get_rows(file_name): rdr = reader(open(file_name, 'rb')) return [row for row in rdr] def eliminate_mismatches(header_rows, data_rows): all_short_headers = [h[0] for h in header_rows] skip_index = [] final_header_rows = [] for header in data_rows[0]: if header not in all_short_headers: index = data_rows[0].index(header) if index not in skip_index: skip_index.append(index) else: for head in header_rows: if head[0] == header: final_header_rows.append(head) break return skip_index, final_header_rows def zip_data(headers, data): zipped_data = [] for drow in data: zipped_data.append(zip(headers, drow)) return zipped_data def create_zipped_data(final_header_rows, data_rows, skip_index): new_data = [] for row in data_rows[1:]: new_row = [] for index, data in enumerate(row): if index not in skip_index: new_row.append(data) new_data.append(new_row) zipped_data = zip_data(final_header_rows, new_data) return zipped_data def find_missing_data(zipped_data): missing_count = 0 for question, answer in zipped_data: if not answer: missing_count += 1 return missing_count def find_duplicate_data(zipped_data): set_of_keys = set([ '%s-%s-%s' % (row[0][1], row[1][1], row[2][1]) for row in zipped_data]) uniques = [row for row in zipped_data if not set_of_keys.remove('%s-%s-%s' % (row[0][1], row[1][1], row[2][1]))] return uniques, len(set_of_keys) def save_to_sqlitedb(db_file, zipped_data, survey_type): db = dataset.connect(db_file) table = db['unicef_survey'] all_rows = [] for row_num, data in enumerate(zipped_data): for question, answer in data: data_dict = { 'question': question[1], 'question_code': question[0], 'answer': answer, 'response_number': row_num, 'survey': survey_type, } all_rows.append(data_dict) table.insert_many(all_rows)
现在我们要在一个 main 函数中说明使用这些函数的方法。Python 开发者一般会将通过命令行运行的代码放到 main 函数里。下面我们添加 main 函数的代码,用于清洗数据集:
""" 这部分代码放在已写脚本的下面。 """ def main(): data_rows = get_rows('data/unicef/mn.csv') header_rows = get_rows('data/unicef/mn_headers_updated.csv') skip_index, final_header_rows = eliminate_mismatches(header_rows, data_rows) zipped_data = create_zipped_data(final_header_rows, data_rows, skip_index) num_missing = find_missing_data(zipped_data) uniques, num_dupes = find_duplicate_data(zipped_data) if num_missing == 0 and num_dupes == 0: save_to_sqlitedb('sqlite:///data/data_wrangling.db', zipped_data) else: error_msg = '' if num_missing: error_msg += 'We are missing {} values. '.format(num_missing) if num_dupes: error_msg += 'We have {} duplicates. '.format(num_dupes) error_msg += 'Please have a look and fix!' print error_msg if __name__ == '__main__': main()
现在我们有了一个可以从命令行运行的可执行文件。运行此文件会发生什么?你会得到我们刚刚创建的错误信息,还是将数据保存到本地的 SQLite 数据库中?
大多数可以在命令行中运行的 Python 文件都有一些相同的属性。它们一般都有一个 main 函数,里面再调用小型函数或辅助函数,和我们上面的清洗脚本类似。
main 函数一般会在文件的主缩进级别的代码块中进行调用。调用的语法是 if __name__ == '__main__':。这个语法用到了全局的私有变量(所以变量名两边才有双下划线),当你在命令行运行文件时会返回 True。
如果不是在命令行中运行脚本,那么 if 语句中的代码不会运行。如果我们将这些函数导入另一个脚本中,__name__ 变量不等于 '__main__',代码就不会运行。这是 Python 脚本常用的约定。
遇到任何错误,检查你的代码和上述代码是否完全相同,检查仓库中数据的文件路径是否正确,还要检查第 6 章创建的本地数据库的文件路径是否正确。
下面我们来为代码编写文档。我们要给函数添加一些文档字符串和行内注释,方便我们理解脚本中比较复杂的代码段,还要在脚本开头添加一大段说明文字,这些文字以后可以放到 README.md 文件中:
""" Usage: python our_cleanup_script.py This script is used to intake the male survey data from UNICEF and save it to a simple database file after it has been checked for duplicates and missing data and after the headers have been properly matched with the data. It expects there to be a 'mn.csv' file with the data and the 'mn_updated_headers.csv' file in a subfolder called 'unicef' within a data folder in this directory. It also expects there to be a SQLite file called 'data_wrangling.db' in the root of this directory. Finally, it expects to utilize the dataset library (http://dataset.readthedocs.org/en/latest/). If the script runs without finding any errors, it will save the cleaned data to the 'unicef_survey' table in the SQLite. The saved data will have the following structure: - question: string - question_code: string - answer: string - response_number: integer - survey: string The response number can later be used to join entire responses together (i.e., all of response_number 3 come from the same interview, etc.). If you have any questions, please feel free to contact me via ... """ from csv import reader import dataset def get_rows(file_name): """Return a list of rows from a given csv filename.""" rdr = reader(open(file_name, 'rb')) return [row for row in rdr] def eliminate_mismatches(header_rows, data_rows): """ Return index numbers to skip in a list and final header rows in a list when given header rows and data rows from a UNICEF dataset. This function assumes the data_rows object has headers in the first element. It assumes those headers are the shortened UNICEF form. It also assumes the first element of each header row in the header data is the shortened UNICEF form. It will return the list of indexes to skip in the data rows (ones that don't match properly with headers) as the first element and will return the final cleaned header rows as the second element. """ all_short_headers = [h[0] for h in header_rows] skip_index = [] final_header_rows = [] for header in data_rows[0]: if header not in all_short_headers: index = data_rows[0].index(header) if index not in skip_index: skip_index.append(index) else: for head in header_rows: if head[0] == header: final_header_rows.append(head) break return skip_index, final_header_rows def zip_data(headers, data): """ Return a list of zipped data when given a header list and data list. Assumes the length of data elements per row and the length of headers are the same. example output: [(['question code', 'question summary', 'question text'], 'resp'), ....] """ zipped_data = [] for drow in data: zipped_data.append(zip(headers, drow)) return zipped_data def create_zipped_data(final_header_rows, data_rows, skip_index): """ Returns a list of zipped data rows (matching header and data) when given a list of final header rows, a list of data rows, and a list of indexes on those data rows to skip as they don't match properly. The function assumes the first row in the data rows contains the original data header values, and will remove those values from the final list. """ new_data = [] for row in data_rows[1:]: new_row = [] for index, data in enumerate(row): if index not in skip_index: new_row.append(data) new_data.append(new_row) zipped_data = zip_data(final_header_rows, new_data) return zipped_data def find_missing_data(zipped_data): """ Returns a count of how many answers are missing in an entire set of zipped data. This function assumes all responses are stored as the second element. It also assumes every response is stored in a list of these matched question, answer groupings. It returns an integer. """ missing_count = 0 for response in zipped_data: for question, answer in response: if not answer: missing_count += 1 return missing_count def find_duplicate_data(zipped_data): """ Returns a list of unique elements and a number of duplicates found when given a UNICEF zipped_data list. This function assumes that the first three rows of data are structured to have the house, cluster, and line number of the interview and uses these values to create a unique key that should not be repeated. """ set_of_keys = set([ '%s-%s-%s' % (row[0][1], row[1][1], row[2][1]) for row in zipped_data]) #TODO: this will throw an error if we have duplicates- we should find a way #around this uniques = [row for row in zipped_data if not set_of_keys.remove('%s-%s-%s' % (row[0][1], row[1][1], row[2][1]))] return uniques, len(set_of_keys) def save_to_sqlitedb(db_file, zipped_data, survey_type): """ When given a path to a SQLite file, the cleaned zipped_data, and the UNICEF survey type that was used, saves the data to SQLite in a table called 'unicef_survey' with the following attributes: question, question_code, answer, response_number, survey """ db = dataset.connect(db_file) table = db['unicef_survey'] all_rows = [] for row_num, data in enumerate(zipped_data): for question, answer in data: data_dict = { 'question': question[1], 'question_code': question[0], 'answer': answer, 'response_number': row_num, 'survey': survey_type, } all_rows.append(data_dict) table.insert_many(all_rows) def main(): """ Import all data into rows, clean it, and then if no errors are found, save it to SQlite. If there are errors found, print out details so developers can begin work on fixing the script or seeing if there is an error in the data. """ #TODO: we probably should abstract these files so that we can pass # them in as variables and use the main function with other surveys data_rows = get_rows('data/unicef/mn.csv') header_rows = get_rows('data/unicef/mn_updated_headers.csv') skip_index, final_header_rows = eliminate_mismatches(header_rows, data_rows) zipped_data = create_zipped_data(final_header_rows, data_rows, skip_index) num_missing = find_missing_data(zipped_data) uniques, num_dupes = find_duplicate_data(zipped_data) if num_missing == 0 and num_dupes == 0: #TODO: we probably also want to abstract this # file away, or make sure it exists before continuing save_to_sqlite('sqlite:///data_wrangling.db', zipped_data, 'mn') else: #TODO: eventually we probably want to log this, and # maybe send an email if an error is thrown rather than print it error_msg = '' if num_missing: error_msg += 'We are missing {} values. '.format(num_missing) if num_dupes: error_msg += 'We have {} duplicates. '.format(num_dupes) error_msg += 'Please have a look and fix!' print error_msg if __name__ == '__main__': main()
现在我们的代码文档更详细、结构更合理,还有许多可复用的函数。对于我们的第一个脚本来说,这是一个很好的开始。利用这些代码,希望我们可以导入许多 UNICEF 数据!
我们只用了一个文件来运行代码。但随着代码量的增加,你的仓库也会变得越来越复杂。在初期就要思考你可能需要向仓库中添加的内容,这一点是很重要的。代码和代码结构很相似。如果你认为这个仓库可能的用途不仅仅是解析 UNICEF 数据,你的代码结构可能会大不相同。
在初期不必过分担心这些决策。随着你 Python 编程水平的提高和对数据集的理解进一步加深,你会更清楚地认识到应该从哪里开始。
在仓库的结构中,经常会有一个名为 utils 或 common 的文件夹,你可以在里面保存代码之间共享的脚本。许多开发者将数据库连接脚本,常用的 API 代码和通信或 email 脚本等保存在这样的文件夹中,方便导入其他脚本中。
你可能创建了多个目录来保存项目的不同内容,具体取决于仓库的管理结构。其中一个目录只和 UNICEF 数据有关。另一个目录可能包含网络抓取脚本或最终报告代码。如何组织仓库的结构由你自己决定。永远保持清晰、明确、有序。
如果你最后不得不重新组织仓库结构,那么在开始时就尽可能保持仓库有序,后面就不会太过痛苦。相反,如果你的仓库里都是 800 行的文件,而且没有清晰的文档,那么你要做的事情就很多了。最好的经验做法是最开始给出结构框架,随着仓库内容的增加和变化对结构进行临时调整。
除了良好的文件结构,保持目录、文件、函数和类的命名清晰明确也是很有用的。在 utils 文件夹中可能有多个文件。如果你将其命名为 utils1、utils2 等,你可能需要打开文件才能知道它们的具体内容。但如果你将其命名为 email.py、database.py、twitter_api.py 等,文件名本身就包含了更多信息。
在代码中尽量保持明确,对长期而成功的 Python 数据处理事业是一个良好的开端。我们思考一下仓库的结构,看如何找到相应的文件:
data_wrangling_repo/ |-- README.md |-- data_wrangling.db |-- data/ | `-- unicef/ | |-- mn.csv | |-- mn_updated_headers.csv | |-- wm.csv | `-- wm_headers.csv |-- scripts/ | `-- unicef/ | `-- unicef_cleanup.py (本章的脚本) `-- utils/ |-- databases.py `-- emailer.py
我们还没有编写 databases 或 emailer 文件,但我们或许应该这么做。我们还可以向文件结构中添加哪些内容?我们在仓库中创建了两个不同的 unicef 文件夹,你认为这么做的原因是什么?开发者是否应该将数据文件和脚本文件分开保存?
我们也建议不要将 db 文件或任何 log、config 文件提交到仓库中。仓库结构应尽可能实用。你总是可以将预期的文件结构添加到 README.md 文件中,并详细说明去哪里获取数据文件。
Git 和 .gitignore 文件
如果你还没有用 Git(https://git-scm.com/)做版本控制的话,学完本书就会用了!版本控制可以让你创建仓库来管理和修改代码,并将其分享给团队或其他同事。
在第 14 章中我们将会深入讲解Git,但现在我们在讨论仓库结构,希望重点说一下 .gitignore 文件(https://github.com/github/gitignore)。.gitignore 文件的作用是,让 Git 忽略某些文件,不要将这些文件上传到仓库中。这个文件使用简单模式来匹配文件名,与我们在第 7 章中学过的正则表达式类似。
在我们的仓库结构中,我们可以用一个 .gitignore 文件,这样 Git 就不会将任何数据文件提交到仓库中。然后我们可以在 README.md 中说明仓库的结构,给出获取数据文件的联系信息。这样我们的仓库就比较简洁,且易于下载,还可以保持良好的代码结构。
创建一个符合逻辑的仓库结构,并添加 README.md 和 .gitignore 文件,可以保持模块化代码的项目文件夹有序,并避免将大型数据文件或可能敏感的数据(数据库或登录数据)放在仓库中。
