返回介绍

A.4 第14章:转换数据库的 isis2json.py 脚本

发布于 2024-02-05 21:59:46 字数 8997 浏览 0 评论 0 收藏 0

示例 A-5 是 14.13 节讨论的 isis2json.py 脚本。这个脚本使用生成器函数,以惰性的方式把 CDS/ISIS 数据库转换成 JSON 格式,以便载入到 CouchDB 或 MongoDB。

注意,这是个 Python 2 脚本,针对 CPython 或 Jython,支持 Python 2.5~2.7,不能使用 Python 3 运行。在 CPython 中,只能读取 .iso 文件;在 Jython 中,使用 GitHub 中 fluentpython/isis2json 仓库里的 Bruma 库,还可以读取 .mst 文件。详情参见该仓库里的用法文档。

示例 A-5 isis2json.py:依赖和文档在 GitHub 中的 fluentpython/isis2json 仓库里

# 这个脚本支持Python和Jython(版本>=2.5且<3)

import sys
import argparse
from uuid import uuid4
import os

try:
  import json
except ImportError:
  if os.name == 'java':  # 在Jython中运行
    from com.xhaus.jyson import JysonCodec as json
  else:
    import simplejson as json

SKIP_INACTIVE = True
DEFAULT_QTY = 2**31
ISIS_MFN_KEY = 'mfn'
ISIS_ACTIVE_KEY = 'active'
SUBFIELD_DELIMITER = '^'
INPUT_ENCODING = 'cp1252'


def iter_iso_records(iso_file_name, isis_json_type):  ➊
  from iso2709 import IsoFile
  from subfield import expand

  iso = IsoFile(iso_file_name)
  for record in iso:
    fields = {}
      for field in record.directory:
      field_key = str(int(field.tag)) # 删除前导零
      field_occurrences = fields.setdefault(field_key, [])
      content = field.value.decode(INPUT_ENCODING, 'replace')
      if isis_json_type == 1:
        field_occurrences.append(content)
      elif isis_json_type == 2:
        field_occurrences.append(expand(content))
      elif isis_json_type == 3:
        field_occurrences.append(dict(expand(content)))
      else:
        raise NotImplementedError('ISIS-JSON type %s conversion '
          'not yet implemented for .iso input' % isis_json_type)

    yield fields
  iso.close()


def iter_mst_records(master_file_name, isis_json_type):  ➋
  try:
    from bruma.master import MasterFactory, Record
  except ImportError:
    print('IMPORT ERROR: Jython 2.5 and Bruma.jar '
        'are required to read .mst files')
    raise SystemExit
  mst = MasterFactory.getInstance(master_file_name).open()
  for record in mst:
    fields = {}
    if SKIP_INACTIVE:
      if record.getStatus() != Record.Status.ACTIVE:
        continue
    else:  # 仅当没有活动的记录时才保存状态
      fields[ISIS_ACTIVE_KEY] = (record.getStatus() ==
                     Record.Status.ACTIVE)
    fields[ISIS_MFN_KEY] = record.getMfn()
    for field in record.getFields():
      field_key = str(field.getId())
      field_occurrences = fields.setdefault(field_key, [])
      if isis_json_type == 3:
        content = {}
        for subfield in field.getSubfields():
          subfield_key = subfield.getId()
          if subfield_key == '*':
            content['_'] = subfield.getContent()
          else:
            subfield_occurrences = content.setdefault(subfield_key, [])
            subfield_occurrences.append(subfield.getContent())
        field_occurrences.append(content)
      elif isis_json_type == 1:
        content = []
        for subfield in field.getSubfields():
          subfield_key = subfield.getId()
          if subfield_key == '*':
            content.insert(0, subfield.getContent())
          else:
            content.append(SUBFIELD_DELIMITER + subfield_key +
                     subfield.getContent())
        field_occurrences.append(''.join(content))
      else:
        raise NotImplementedError('ISIS-JSON type %s conversion '
          'not yet implemented for .mst input' % isis_json_type)
    yield fields
  mst.close()


def write_json(input_gen, file_name, output, qty, skip, id_tag,  ➌
         gen_uuid, mongo, mfn, isis_json_type, prefix,
         constant):
  start = skip
  end = start + qty
  if id_tag:
    id_tag = str(id_tag)
    ids = set()
  else:
    id_tag = ''
  for i, record in enumerate(input_gen):
    if i >= end:
      break
    if not mongo:
      if i == 0:
        output.write('[')
      elif i > start:
        output.write(',')
    if start <= i < end:
      if id_tag:
        occurrences = record.get(id_tag, None)
        if occurrences is None:
          msg = 'id tag #%s not found in record %s'
          if ISIS_MFN_KEY in record:
            msg = msg + (' (mfn=%s)' % record[ISIS_MFN_KEY])
          raise KeyError(msg % (id_tag, i))
        if len(occurrences) > 1:
          msg = 'multiple id tags #%s found in record %s'
          if ISIS_MFN_KEY in record:
             msg = msg + (' (mfn=%s)' % record[ISIS_MFN_KEY])
          raise TypeError(msg % (id_tag, i))
        else:  # 好吧,有且仅有一个id字段
          if isis_json_type == 1:
            id = occurrences[0]
          elif isis_json_type == 2:
            id = occurrences[0][0][1]
          elif isis_json_type == 3:
            id = occurrences[0]['_']
          if id in ids:
            msg = 'duplicate id %s in tag #%s, record   %s'
            if ISIS_MFN_KEY in record:
              msg = msg + (' (mfn=%s)' % record[ISIS_MFN_KEY])
            raise TypeError(msg % (id, id_tag, i))
          record['_id'] = id
          ids.add(id)
      elif gen_uuid:
        record['_id'] = unicode(uuid4())
      elif mfn:
        record['_id'] = record[ISIS_MFN_KEY]
      if prefix:
        # 迭代一个固定的标签序列
        for tag in tuple(record):
          if str(tag).isdigit():
            record[prefix+tag] = record[tag]
            del record[tag]  # 这就是迭代元组的原因
            # 获取标签,但不直接从记录字典中获取
      if constant:
        constant_key, constant_value = constant.split(':')
        record[constant_key] = constant_value
      output.write(json.dumps(record).encode('utf-8'))
      output.write('\n')
  if not mongo:
    output.write(']\n')


def main(): ➍
  # 创建解析器
  parser = argparse.ArgumentParser(
    description='Convert an ISIS .mst or .iso file to a JSON array')

  # 添加参数
  parser.add_argument(
    'file_name', metavar='INPUT.(mst|iso)',
    help='.mst or .iso file to read')
  parser.add_argument(
    '-o', '--out', type=argparse.FileType('w'), default=sys.stdout,
    metavar='OUTPUT.json',
    help='the file where the JSON output should be written'
       ' (default: write to stdout)')
  parser.add_argument(
    '-c', '--couch', action='store_true',
    help='output array within a "docs" item in a JSON document'
       ' for bulk insert to CouchDB via POST to db/_bulk_docs')
  parser.add_argument(
    '-m', '--mongo', action='store_true',
    help='output individual records as separate JSON dictionaries, one'
       ' per line for bulk insert to MongoDB via mongoimport utility')
  parser.add_argument(
    '-t', '--type', type=int, metavar='ISIS_JSON_TYPE', default=1,
    help='ISIS-JSON type, sets field structure: 1=string, 2=alist,'
       ' 3=dict (default=1)')
  parser.add_argument(
    '-q', '--qty', type=int, default=DEFAULT_QTY,
    help='maximum quantity of records to read (default=ALL)')
  parser.add_argument(
    '-s', '--skip', type=int, default=0,
    help='records to skip from start of .mst (default=0)')
  parser.add_argument(
    '-i', '--id', type=int, metavar='TAG_NUMBER', default=0,
    help='generate an "_id" from the given unique TAG field number'
       ' for each record')
  parser.add_argument(
    '-u', '--uuid', action='store_true',
    help='generate an "_id" with a random UUID for each record')
  parser.add_argument(
    '-p', '--prefix', type=str, metavar='PREFIX', default='',
    help='concatenate prefix to every numeric field tag'
       ' (ex. 99 becomes "v99")')
  parser.add_argument(
    '-n', '--mfn', action='store_true',
    help='generate an "_id" from the MFN of each record'
       ' (available only for .mst input)')
  parser.add_argument(
    '-k', '--constant', type=str, metavar='TAG:VALUE', default='',
    help='Include a constant tag:value in every record (ex. -k type:AS)')

  '''
  # TODO: 实现这个功能,导出大量记录供给CouchDB
  parser.add_argument(
    '-r', '--repeat', type=int, default=1,
    help='repeat operation, saving multiple JSON files'
       ' (default=1, use -r 0 to repeat until end of input)')
  '''
  # 解析命令行
  args = parser.parse_args()
  if args.file_name.lower().endswith('.mst'):
    input_gen_func = iter_mst_records  ➎
  else:
    if args.mfn:
      print('UNSUPORTED: -n/--mfn option only available for .mst input.')
      raise SystemExit
    input_gen_func = iter_iso_records  ➏
  input_gen = input_gen_func(args.file_name, args.type)  ➐
  if args.couch:
    args.out.write('{ "docs" : ')
  write_json(input_gen, args.file_name, args.out, args.qty,  ➑
         args.skip, args.id, args.uuid, args.mongo, args.mfn,
         args.type, args.prefix, args.constant)
  if args.couch:
    args.out.write('}\n')
  args.out.close()


if __name__ == '__main__':
  main()

❶ iter_iso_records 生成器函数读取 .iso 文件,产出记录。

❷ iter_mst_records 生成器函数读取 .mst 文件,产出记录。

❸ write_json 函数迭代 input_gen 生成器,输出 .json 文件。

❹ main 函数读取命令行参数,然后根据输入文件的扩展名选择……

❺ ……iter_mst_records 生成器函数……

❻ …… 或者 iter_iso_records 生成器函数。

❼ 使用选中的生成器函数构建生成器对象。

❽ 把生成器作为第一个参数传给 write_json 函数。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文