手写一个 JSON 反序列化程序

发布于 2023-07-21 12:34:29 字数 13033 浏览 35 评论 0

本篇文章的目的是学习实践,所以我们选择相对简单的 Python 实现,原因在于 JSON 的值类型可以很方便的映射到 Python 的数据类型。下面是二者之间的映射关系:

———————————————————————
JSON             Python
———————————————————————
null             None
true             True
false            False
number           float
string           str
array            list
object           dict

实现整个反序列化的功能大概需要 200 多行的代码,项目虽小,但是需要考虑的场景还是挺多的,为了提高代码质量,增加了单元测试用例,这样就可以很方便的定位问题。

最终,加上单元测试,整个项目的代码行数在 400 行以内。

入口函数

首先定义反序列化的入口函数:它接受一个字符串,解析并映射成 Python 类型。

def parse(text):
  pass​

考虑到我们可能需要从前到后挨个读取 text 里面的字符,特殊情况下甚至需要预读后续的若干位字符,为了方便,定义一个类用来保存这些内部状态。

WHITE_SPACES = {"\n", "\t", "\r", " "}
class TextObj:
  def __init__(self, text):
    self.text = text
    self.index = 0  # 保存读取的位置信息
    self.line = 1 # 记录行数,在遇到错误方便定位
  def read(self):
    # 读取下一个字符
    self.index += 1
    # 判断是否已经读取完毕
    if self.index >= len(self.text):
      return ""
    char = self.text[self.index]
    if char == '\n':
      self.line += 1
    return char
  def skip(self):
    # 对于解析函数来说,调用 skip 可方便的跳过空白字符
    if self.current in WHITE_SPACES:
      char = self.read()
      while char and char in WHITE_SPACES:
        char = self.read()
  def read_slice(self, step):
    # 方便一次性读取多个字符
    start = self.index
    end = start + step
    self.index = end
    if end > len(self.text):
      raise UnexpectedCharacterError(self)
    return self.slice(start, end)
  def slice(self, start, end):
    # 跟 read_slice 区别在于,slice 不消耗下标
    return self.text[start:end]
  @property
  def current(self):
    # 使用描述符来动态获取当前字符
    if self.index >= len(self.text):
      return ""
    return self.text[self.index]​

另外,对于不合法的 text 输入,程序应该能够抛出异常,所以我们定义了 UnexpectedCharacterError :

class UnexpectedCharacterError(ValueError):
  def __init__(self, text_obj):
    super().__init__("unexpected character at index %s(line %s): %s" % (text_obj.index, text_obj.line, text_obj.text))​

有了 TextObj 和 UnexpectedCharacterError,假设我们还有一个函数 parse_value 可以正确解析 JSON 值类型,到这里基本上就可以给出完整的 parse 函数了:

def parse(text):
  text_obj = TextObj(text)
  text_obj.skip()  # 跳过开始的空白符
  result = parse_value(text_obj)
  text_obj.skip()  # 跳过结束的空白符
  if text_obj.current != "":
    raise UnexpectedCharacterError(text_obj)
  return result​

剩下的编码工作就是完成 parse_value 函数了,但是在定义 parse_value 之前,我们先来编写单元测试用例。

单元测试

测试模块使用 Python 内置的 unittest,测试用例需要覆盖各种 JSON 值类型的解析,另外还需要覆盖异常情况。

class ZjsonTest(unittest.TestCase):
  def test_parse_null(self):
    self.assertIsNone(zjson.parse("null"))
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse("null0")
  def test_parse_false(self):
    self.assertFalse(zjson.parse("false"))
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse("flase")
  def test_parse_true(self):
    self.assertTrue(zjson.parse("true"))
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse("ture")
  def test_parse_number(self):
    self.assertEqual(zjson.parse("-1"), -1.0)
    self.assertEqual(zjson.parse("1"), 1.0)
    self.assertEqual(zjson.parse("0"), 0.0)
    self.assertEqual(zjson.parse("-0"), 0.0)
    self.assertEqual(zjson.parse("1.1"), 1.1)
    self.assertEqual(zjson.parse("1.10"), 1.1)
    self.assertEqual(zjson.parse("1E1"), 10.0)
    self.assertEqual(zjson.parse("1E-1"), 0.1)
    self.assertEqual(zjson.parse("1E0"), 1.0)
    self.assertEqual(zjson.parse("1E-0"), 1.0)
    self.assertEqual(zjson.parse("1.1E0"), 1.1)
    self.assertEqual(zjson.parse("-1.1E1"), -11.0)
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse("00")
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse("0..0")
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse("0.E0")
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse("0.")
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse("-")
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse("+0")
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse("+1")
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse(".23")
  def test_parse_string(self):
    self.assertEqual(zjson.parse('"hello"'), "hello")
    self.assertEqual(zjson.parse('"1111"'), "1111")
    self.assertEqual(zjson.parse('"1111\\""'), "1111\"")
    self.assertEqual(zjson.parse('"1111\\n"'), "1111\n")
    self.assertEqual(zjson.parse('"1111\\r"'), "1111\r")
    self.assertEqual(zjson.parse('"1111   "'), "1111   ")
    self.assertEqual(zjson.parse('"  1111   "'), "  1111   ")
    self.assertEqual(zjson.parse('"\\\\"'), "\\")
    self.assertEqual(zjson.parse('"\\/"'), "/")
    self.assertEqual(zjson.parse('""'), "")
    self.assertEqual(zjson.parse('"\\u6c49"'), "汉")
    self.assertEqual(zjson.parse('"\\uD834\\uDD1E"'), "")
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse("\"")
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse("\"111")
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse("111\"")
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse('"\\uxxxx"')
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse('"\\uD801\\ux"')
  def test_parse_array(self):
    self.assertEqual(zjson.parse('["hello"]'), ["hello"])
    self.assertEqual(zjson.parse('[1]'), [1])
    self.assertEqual(zjson.parse('[null]'), [None])
    self.assertEqual(zjson.parse('[1,2,3]'), [1,2,3])
    self.assertEqual(zjson.parse('[1,2,"hello"]'), [1,2,"hello"])
    self.assertEqual(zjson.parse('[true, false]'), [True, False])
    self.assertEqual(zjson.parse('[[1,2], [3,4]]'), [[1,2], [3,4]])
    self.assertEqual(zjson.parse('[{"hello": "world"}]'), [{"hello": "world"}])
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse('[1')
      zjson.parse('[1, 2')
      zjson.parse('[1, 2}')
  def test_parse_object(self):
    self.assertEqual(zjson.parse(' {"hello": 1} '), {"hello": 1})
    self.assertEqual(zjson.parse('\t{"hello": "world"}\n'), {"hello": "world"})
    self.assertEqual(zjson.parse('{"hello": "world", "k2": "v2"}'), {"hello": "world", "k2": "v2"})
    self.assertEqual(zjson.parse('{"hello": [1,2]}'), {"hello": [1,2]})
    self.assertEqual(zjson.parse('{"hello": {"k1":"v1", "k2": "v2"}}'), {"hello": {"k1":"v1", "k2": "v2"}})
    with self.assertRaises(zjson.UnexpectedCharacterError):
      zjson.parse("{1:2}")
      zjson.parse('{"1":2')
      zjson.parse('{"1":2')

整个测试用例大概 100 行左右,基本上能够覆盖各种值类型的解析场景。有了测试用例,接下来就可以放心大胆的编写功能代码了。

值类型解析

现在到了正式定义 parse_value 的时候了,考虑到 JSON 的几种值类型的开始标志全都不一样,只需要读取 text_obj 的当前字符就可以通过分支调用不用值类型的解析函数,所以 parse_value 相对来说也算简单。

def parse_value(text_obj):
  char = text_obj.current
  if char == "n":
    return parse_null(text_obj)
  elif char == "f":
    return parse_false(text_obj)
  elif char == "t":
    return parse_true(text_obj)
  elif char == '"':
    return parse_string(text_obj)
  elif char == "{":
    return parse_object(text_obj)
  elif char == "[":
    return parse_array(text_obj)
  else:
    return parse_number(text_obj)

剩下的全部工作就是定义不同类型的解析函数了。

I. parse_null

最简单的还是 parse_null,只需要再预读剩下的三个字符,判断是不是跟 “null” 相等即可,如果不相等意味着原始 text 不合法,需要抛出异常。

def parse_null(text_obj):
  if text_obj.read_slice(4) != "null":
    raise UnexpectedCharacterError(text_obj)
  return None

II. parse_true parse_false

同 parse_null 类似,下面是 parse_true 和 parse_false 的函数定义:

def parse_false(text_obj):
  if text_obj.read_slice(5) != "false":
    raise UnexpectedCharacterError(text_obj)
  return False
def parse_true(text_obj):
  if text_obj.read_slice(4) != "true":
    raise UnexpectedCharacterError(text_obj)
  return True

III. parse_number

虽然我们可以使用 float 直接转换 text 中的 number,但是由于 number 的格式与 Python 存在一些差别,还是需要 parse_number 判断 number 是否合法。

DIGITS = set("0123456789")
def parse_number(text_obj):
  head = text_obj.index  # 记录开始时的下标
  char = text_obj.current
  # 处理负号
  if char == "-":
    char = text_obj.read()
  # 整数部分有两种形式:0 或者 1-9 后面跟若干数字
  if char == "0":
    char = text_obj.read()
  elif char in DIGITS:
    while char in DIGITS:
      char = text_obj.read()
  else:
    # 如果整数部分不合法,则整个 number 不合法
    raise UnexpectedCharacterError(text_obj)
  # 小数部分
  if char == ".":
    char = text_obj.read()
    if char not in DIGITS:
      raise UnexpectedCharacterError(text_obj)
    while char in DIGITS:
      char = text_obj.read()
  # 指数部分
  if char == "E" or char == "e":
    char = text_obj.read()
    if char == "+" or char == "-":
      char = text_obj.read()
    if char not in DIGITS:
      raise UnexpectedCharacterError(text_obj)
    while char in DIGITS:
      char = text_obj.read()
  tail = text_obj.index  # 记录结束时的下标
  # 使用内置的 float 将字符串转化为浮点数
  return float(text_obj.slice(head, tail))

对于 number 的解析结果使用 float 映射主要还是为了方便,如果想区分浮点数和整数,也可以加入额外的判断,根据 number 的实际值决定使用哪个类型映射:整数使用 int,浮点数使用 float。

IV. parse_string

parse_string 也是比较复杂的解析函数了,对于转义字符特别是 Unicode 需要十分小心。

ESCAPES = {
  '"': '"',
  "\\": "\\",
  "/": "/",
  "b": "\b",
  "f": "\f",
  "n": "\n",
  "r": "\r",
  "t": "\t",
}
def parse_string(text_obj):
  if text_obj.current != '"':
    # 必须以双引号开始
    raise UnexpectedCharacterError(text_obj)
  # 考虑到转义字符的存在,这里需要使用 list 来保存解析到的单个字符,最后使用 join 函数返回字符串
  # 避免使用字符串的”+“操作,可以提高效率
  cs = []
  text_obj.read()
  while True:
    char = text_obj.current
    if char == "":
      # 判断 text 是否已经小号完毕
      raise UnexpectedCharacterError(text_obj)
    if char == "\\":
      # 处理转义字符
      char = text_obj.read()
      if char == "u":
        # 处理 Unicode
        text_obj.read()
        code_point = get_code_point(text_obj)
        if 0xD800 <= code_point <= 0xDBFF and text_obj.slice(text_obj.index, text_obj.index+2) == "\\u":
          # 处理超过 0xFFFF 的码点
          text_obj.read_slice(2)
          low = get_code_point(text_obj)
          code_point = 0x10000 + (code_point - 0xD800) * 0x400 + (low - 0xDC00)
        cs.append(chr(code_point))
        continue
      if char not in ESCAPES:
        raise UnexpectedCharacterError(text_obj)
      cs.append(ESCAPES[char])
      text_obj.read()
      continue
    elif char == '"':
      # 结束标志
      text_obj.read()
      return "".join(cs)
    else:
      # 普通字符,直接添加到 list
      cs.append(char)
      text_obj.read()
def get_code_point(text_obj):
  # 解析 unicode 时使用
  h4 = text_obj.read_slice(4)
  try:
    return int(h4, 16)
  except Exception as e:
    raise UnexpectedCharacterError(text_obj)

完成了基本类型的解析,剩下的就是集合类型了。由于集合类型具有闭包性质,所以集合类型的解析又调用了 parse_value 函数。

V. parse_array

Python 中的 list 类型跟 array 非常相似,映射起来也比较方便,parse_array 的主要的工作还是验证 array 格式是否合法。

def parse_array(text_obj):
  assert text_obj.current == "["
  result = []  # 使用 list 来保存 array 解析结果
  text_obj.read()
  text_obj.skip()
  char = text_obj.current
  while True:
    if char == "]":
      # 结束标志。主要是为了判断是否为空 array
      text_obj.read()
      break
    value = parse_value(text_obj)
    result.append(value)
    text_obj.skip()
    char = text_obj.current
    if char == ",":
      # 判断是否还有值
      text_obj.read()
      text_obj.skip()
      continue
    elif char == "]":
      # 结束标志
      text_obj.read()
      break
    else:
      raise UnexpectedCharacterError(text_obj)
  return result

VI. parse_object

最后完成的是 parse_object 的函数定义。

def parse_object(text_obj):
  assert text_obj.current == "{"
  result = {}  # 使用 dict 保存 object 解析结果
  text_obj.read()
  text_obj.skip()
  char = text_obj.current
  while True:
    if char == "}":
      # 结束标志。主要是为了判断是否为空 object
      text_obj.read()
      break
    # 解析 key
    key = parse_string(text_obj)
    text_obj.skip()
    # 解析冒号
    if text_obj.current != ":":
      raise UnexpectedCharacterError(text_obj)
    text_obj.read()
    text_obj.skip()
    # 解析 value
    value = parse_value(text_obj)
    text_obj.skip()
    # 添加到 dict
    result[key] = value
    char = text_obj.current
    if char == ",":
      # 判断是否还有键值对
      text_obj.read()
      text_obj.skip()
      continue
    elif char == "}":
      # 结束标志
      text_obj.read()
      break
    else:
      raise UnexpectedCharacterError(text_obj)
  return result

RUN

至此,整个反序列化功能就全部完成了,整个程序没有引入第三方依赖,完全由 Python 的内置功能完成。最后,运行一下单元测试:

Ran 7 tests in 0.010s
OK

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

雪化雨蝶

暂无简介

0 文章
0 评论
23 人气
更多

推荐作者

13886483628

文章 0 评论 0

流年已逝

文章 0 评论 0

℡寂寞咖啡

文章 0 评论 0

笑看君怀她人

文章 0 评论 0

wkeithbarry

文章 0 评论 0

素手挽清风

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文