手写一个 JSON 反序列化程序
本篇文章的目的是学习实践,所以我们选择相对简单的 Python 实现,原因在于 JSON 的值类型可以很方便的映射到 Python 的数据类型。下面是二者之间的映射关系:
——————————————————————— JSON Python ——————————————————————— null None true True false False number float string str array list object dict
实现整个反序列化的功能大概需要 200 多行的代码,项目虽小,但是需要考虑的场景还是挺多的,为了提高代码质量,增加了单元测试用例,这样就可以很方便的定位问题。
最终,加上单元测试,整个项目的代码行数在 400 行以内。
入口函数
首先定义反序列化的入口函数:它接受一个字符串,解析并映射成 Python 类型。
def parse(text):
pass
考虑到我们可能需要从前到后挨个读取 text 里面的字符,特殊情况下甚至需要预读后续的若干位字符,为了方便,定义一个类用来保存这些内部状态。
WHITE_SPACES = {"\n", "\t", "\r", " "}
class TextObj:
def __init__(self, text):
self.text = text
self.index = 0 # 保存读取的位置信息
self.line = 1 # 记录行数,在遇到错误方便定位
def read(self):
# 读取下一个字符
self.index += 1
# 判断是否已经读取完毕
if self.index >= len(self.text):
return ""
char = self.text[self.index]
if char == '\n':
self.line += 1
return char
def skip(self):
# 对于解析函数来说,调用 skip 可方便的跳过空白字符
if self.current in WHITE_SPACES:
char = self.read()
while char and char in WHITE_SPACES:
char = self.read()
def read_slice(self, step):
# 方便一次性读取多个字符
start = self.index
end = start + step
self.index = end
if end > len(self.text):
raise UnexpectedCharacterError(self)
return self.slice(start, end)
def slice(self, start, end):
# 跟 read_slice 区别在于,slice 不消耗下标
return self.text[start:end]
@property
def current(self):
# 使用描述符来动态获取当前字符
if self.index >= len(self.text):
return ""
return self.text[self.index]
另外,对于不合法的 text 输入,程序应该能够抛出异常,所以我们定义了 UnexpectedCharacterError :
class UnexpectedCharacterError(ValueError):
def __init__(self, text_obj):
super().__init__("unexpected character at index %s(line %s): %s" % (text_obj.index, text_obj.line, text_obj.text))
有了 TextObj 和 UnexpectedCharacterError,假设我们还有一个函数 parse_value 可以正确解析 JSON 值类型,到这里基本上就可以给出完整的 parse 函数了:
def parse(text):
text_obj = TextObj(text)
text_obj.skip() # 跳过开始的空白符
result = parse_value(text_obj)
text_obj.skip() # 跳过结束的空白符
if text_obj.current != "":
raise UnexpectedCharacterError(text_obj)
return result
剩下的编码工作就是完成 parse_value 函数了,但是在定义 parse_value 之前,我们先来编写单元测试用例。
单元测试
测试模块使用 Python 内置的 unittest,测试用例需要覆盖各种 JSON 值类型的解析,另外还需要覆盖异常情况。
class ZjsonTest(unittest.TestCase): def test_parse_null(self): self.assertIsNone(zjson.parse("null")) with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse("null0") def test_parse_false(self): self.assertFalse(zjson.parse("false")) with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse("flase") def test_parse_true(self): self.assertTrue(zjson.parse("true")) with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse("ture") def test_parse_number(self): self.assertEqual(zjson.parse("-1"), -1.0) self.assertEqual(zjson.parse("1"), 1.0) self.assertEqual(zjson.parse("0"), 0.0) self.assertEqual(zjson.parse("-0"), 0.0) self.assertEqual(zjson.parse("1.1"), 1.1) self.assertEqual(zjson.parse("1.10"), 1.1) self.assertEqual(zjson.parse("1E1"), 10.0) self.assertEqual(zjson.parse("1E-1"), 0.1) self.assertEqual(zjson.parse("1E0"), 1.0) self.assertEqual(zjson.parse("1E-0"), 1.0) self.assertEqual(zjson.parse("1.1E0"), 1.1) self.assertEqual(zjson.parse("-1.1E1"), -11.0) with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse("00") with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse("0..0") with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse("0.E0") with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse("0.") with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse("-") with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse("+0") with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse("+1") with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse(".23") def test_parse_string(self): self.assertEqual(zjson.parse('"hello"'), "hello") self.assertEqual(zjson.parse('"1111"'), "1111") self.assertEqual(zjson.parse('"1111\\""'), "1111\"") self.assertEqual(zjson.parse('"1111\\n"'), "1111\n") self.assertEqual(zjson.parse('"1111\\r"'), "1111\r") self.assertEqual(zjson.parse('"1111 "'), "1111 ") self.assertEqual(zjson.parse('" 1111 "'), " 1111 ") self.assertEqual(zjson.parse('"\\\\"'), "\\") self.assertEqual(zjson.parse('"\\/"'), "/") self.assertEqual(zjson.parse('""'), "") self.assertEqual(zjson.parse('"\\u6c49"'), "汉") self.assertEqual(zjson.parse('"\\uD834\\uDD1E"'), "") with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse("\"") with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse("\"111") with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse("111\"") with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse('"\\uxxxx"') with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse('"\\uD801\\ux"') def test_parse_array(self): self.assertEqual(zjson.parse('["hello"]'), ["hello"]) self.assertEqual(zjson.parse('[1]'), [1]) self.assertEqual(zjson.parse('[null]'), [None]) self.assertEqual(zjson.parse('[1,2,3]'), [1,2,3]) self.assertEqual(zjson.parse('[1,2,"hello"]'), [1,2,"hello"]) self.assertEqual(zjson.parse('[true, false]'), [True, False]) self.assertEqual(zjson.parse('[[1,2], [3,4]]'), [[1,2], [3,4]]) self.assertEqual(zjson.parse('[{"hello": "world"}]'), [{"hello": "world"}]) with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse('[1') zjson.parse('[1, 2') zjson.parse('[1, 2}') def test_parse_object(self): self.assertEqual(zjson.parse(' {"hello": 1} '), {"hello": 1}) self.assertEqual(zjson.parse('\t{"hello": "world"}\n'), {"hello": "world"}) self.assertEqual(zjson.parse('{"hello": "world", "k2": "v2"}'), {"hello": "world", "k2": "v2"}) self.assertEqual(zjson.parse('{"hello": [1,2]}'), {"hello": [1,2]}) self.assertEqual(zjson.parse('{"hello": {"k1":"v1", "k2": "v2"}}'), {"hello": {"k1":"v1", "k2": "v2"}}) with self.assertRaises(zjson.UnexpectedCharacterError): zjson.parse("{1:2}") zjson.parse('{"1":2') zjson.parse('{"1":2')
整个测试用例大概 100 行左右,基本上能够覆盖各种值类型的解析场景。有了测试用例,接下来就可以放心大胆的编写功能代码了。
值类型解析
现在到了正式定义 parse_value 的时候了,考虑到 JSON 的几种值类型的开始标志全都不一样,只需要读取 text_obj 的当前字符就可以通过分支调用不用值类型的解析函数,所以 parse_value 相对来说也算简单。
def parse_value(text_obj): char = text_obj.current if char == "n": return parse_null(text_obj) elif char == "f": return parse_false(text_obj) elif char == "t": return parse_true(text_obj) elif char == '"': return parse_string(text_obj) elif char == "{": return parse_object(text_obj) elif char == "[": return parse_array(text_obj) else: return parse_number(text_obj)
剩下的全部工作就是定义不同类型的解析函数了。
I. parse_null
最简单的还是 parse_null,只需要再预读剩下的三个字符,判断是不是跟 “null” 相等即可,如果不相等意味着原始 text 不合法,需要抛出异常。
def parse_null(text_obj): if text_obj.read_slice(4) != "null": raise UnexpectedCharacterError(text_obj) return None
II. parse_true parse_false
同 parse_null 类似,下面是 parse_true 和 parse_false 的函数定义:
def parse_false(text_obj): if text_obj.read_slice(5) != "false": raise UnexpectedCharacterError(text_obj) return False def parse_true(text_obj): if text_obj.read_slice(4) != "true": raise UnexpectedCharacterError(text_obj) return True
III. parse_number
虽然我们可以使用 float 直接转换 text 中的 number,但是由于 number 的格式与 Python 存在一些差别,还是需要 parse_number 判断 number 是否合法。
DIGITS = set("0123456789") def parse_number(text_obj): head = text_obj.index # 记录开始时的下标 char = text_obj.current # 处理负号 if char == "-": char = text_obj.read() # 整数部分有两种形式:0 或者 1-9 后面跟若干数字 if char == "0": char = text_obj.read() elif char in DIGITS: while char in DIGITS: char = text_obj.read() else: # 如果整数部分不合法,则整个 number 不合法 raise UnexpectedCharacterError(text_obj) # 小数部分 if char == ".": char = text_obj.read() if char not in DIGITS: raise UnexpectedCharacterError(text_obj) while char in DIGITS: char = text_obj.read() # 指数部分 if char == "E" or char == "e": char = text_obj.read() if char == "+" or char == "-": char = text_obj.read() if char not in DIGITS: raise UnexpectedCharacterError(text_obj) while char in DIGITS: char = text_obj.read() tail = text_obj.index # 记录结束时的下标 # 使用内置的 float 将字符串转化为浮点数 return float(text_obj.slice(head, tail))
对于 number 的解析结果使用 float 映射主要还是为了方便,如果想区分浮点数和整数,也可以加入额外的判断,根据 number 的实际值决定使用哪个类型映射:整数使用 int,浮点数使用 float。
IV. parse_string
parse_string 也是比较复杂的解析函数了,对于转义字符特别是 Unicode 需要十分小心。
ESCAPES = { '"': '"', "\\": "\\", "/": "/", "b": "\b", "f": "\f", "n": "\n", "r": "\r", "t": "\t", } def parse_string(text_obj): if text_obj.current != '"': # 必须以双引号开始 raise UnexpectedCharacterError(text_obj) # 考虑到转义字符的存在,这里需要使用 list 来保存解析到的单个字符,最后使用 join 函数返回字符串 # 避免使用字符串的”+“操作,可以提高效率 cs = [] text_obj.read() while True: char = text_obj.current if char == "": # 判断 text 是否已经小号完毕 raise UnexpectedCharacterError(text_obj) if char == "\\": # 处理转义字符 char = text_obj.read() if char == "u": # 处理 Unicode text_obj.read() code_point = get_code_point(text_obj) if 0xD800 <= code_point <= 0xDBFF and text_obj.slice(text_obj.index, text_obj.index+2) == "\\u": # 处理超过 0xFFFF 的码点 text_obj.read_slice(2) low = get_code_point(text_obj) code_point = 0x10000 + (code_point - 0xD800) * 0x400 + (low - 0xDC00) cs.append(chr(code_point)) continue if char not in ESCAPES: raise UnexpectedCharacterError(text_obj) cs.append(ESCAPES[char]) text_obj.read() continue elif char == '"': # 结束标志 text_obj.read() return "".join(cs) else: # 普通字符,直接添加到 list cs.append(char) text_obj.read() def get_code_point(text_obj): # 解析 unicode 时使用 h4 = text_obj.read_slice(4) try: return int(h4, 16) except Exception as e: raise UnexpectedCharacterError(text_obj)
完成了基本类型的解析,剩下的就是集合类型了。由于集合类型具有闭包性质,所以集合类型的解析又调用了 parse_value 函数。
V. parse_array
Python 中的 list 类型跟 array 非常相似,映射起来也比较方便,parse_array 的主要的工作还是验证 array 格式是否合法。
def parse_array(text_obj): assert text_obj.current == "[" result = [] # 使用 list 来保存 array 解析结果 text_obj.read() text_obj.skip() char = text_obj.current while True: if char == "]": # 结束标志。主要是为了判断是否为空 array text_obj.read() break value = parse_value(text_obj) result.append(value) text_obj.skip() char = text_obj.current if char == ",": # 判断是否还有值 text_obj.read() text_obj.skip() continue elif char == "]": # 结束标志 text_obj.read() break else: raise UnexpectedCharacterError(text_obj) return result
VI. parse_object
最后完成的是 parse_object 的函数定义。
def parse_object(text_obj): assert text_obj.current == "{" result = {} # 使用 dict 保存 object 解析结果 text_obj.read() text_obj.skip() char = text_obj.current while True: if char == "}": # 结束标志。主要是为了判断是否为空 object text_obj.read() break # 解析 key key = parse_string(text_obj) text_obj.skip() # 解析冒号 if text_obj.current != ":": raise UnexpectedCharacterError(text_obj) text_obj.read() text_obj.skip() # 解析 value value = parse_value(text_obj) text_obj.skip() # 添加到 dict result[key] = value char = text_obj.current if char == ",": # 判断是否还有键值对 text_obj.read() text_obj.skip() continue elif char == "}": # 结束标志 text_obj.read() break else: raise UnexpectedCharacterError(text_obj) return result
RUN
至此,整个反序列化功能就全部完成了,整个程序没有引入第三方依赖,完全由 Python 的内置功能完成。最后,运行一下单元测试:
Ran 7 tests in 0.010s OK
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论