canal-python 阿里巴巴开源 mysql 数据库 binlog 的增量订阅 & 消费组件 python 客户端

发布于 2021-07-16 23:33:39 字数 3581 浏览 1800 评论 0

Canal 是阿里巴巴 mysql 数据库 binlog 的增量订阅 & 消费组件 python 客户端。为 python 开发者提供一个更友好的使用 Canal 的方式。Canal 是 mysql 数据库 binlog 的增量订阅&消费组件。

基于日志增量订阅 & 消费支持的业务:

  1. 数据库镜像
  2. 数据库实时备份
  3. 多级索引 (卖家和买家各自分库索引)
  4. search build
  5. 业务cache刷新
  6. 价格变化等重要业务消息

关于 Canal 的更多信息请访问 https://github.com/alibaba/canal/wiki

二.应用场景

canal-python 作为 Canal 的客户端,其应用场景就是 Canal 的应用场景。关于应用场景在 Canal 介绍一节已有概述。举一些实际的使用例子:

  1. 代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。
  2. 根据数据库的变更实时更新搜索引擎,比如电商场景下商品信息发生变更,实时同步到商品搜索引擎 Elasticsearch、solr等
  3. 根据数据库的变更实时更新缓存,比如电商场景下商品价格、库存发生变更实时同步到redis
  4. 数据库异地备份、数据同步
  5. 根据数据库变更触发某种业务,比如电商场景下,创建订单超过xx时间未支付被自动取消,我们获取到这条订单数据的状态变更即可向用户推送消息。
  6. 将数据库变更整理成自己的数据格式发送到kafka等消息队列,供消息队列的消费者进行消费。

三.工作原理

canal-python 是 Canal 的 python 客户端,它与 Canal 是采用的 Socket 来进行通信的,传输协议是 TCP,交互协议采用的是 Google Protocol Buffer 3.0。

四.工作流程

  1. Canal 连接到 mysql 数据库,模拟 slave
  2. canal-python 与 Canal 建立连接
  3. 数据库发生变更写入到 binlog
  4. Canal 向数据库发送 dump 请求,获取 binlog 并解析
  5. canal-python 向 Canal 请求数据库变更
  6. Canal 发送解析后的数据给 canal-python
  7. canal-python 收到数据,消费成功,发送回执。(可选)
  8. Canal 记录消费位置。

五.快速启动

安装 Canal

Canal 的安装以及配置使用请查看 https://github.com/alibaba/canal/wiki/QuickStart

环境要求

python >= 3

构建 canal python 客户端

pip install canal-python

建立与 Canal 的连接

import time

from canal.client import Client
from canal.protocol import EntryProtocol_pb2
from canal.protocol import CanalProtocol_pb2

client = Client()
client.connect(host='127.0.0.1', port=11111)
client.check_valid(username=b'', password=b'')
client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*')

while True:
  message = client.get(100)
  entries = message['entries']
  for entry in entries:
    entry_type = entry.entryType
    if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
      continue
    row_change = EntryProtocol_pb2.RowChange()
    row_change.MergeFromString(entry.storeValue)
    event_type = row_change.eventType
    header = entry.header
    database = header.schemaName
    table = header.tableName
    event_type = header.eventType
    for row in row_change.rowDatas:
      format_data = dict()
      if event_type == EntryProtocol_pb2.EventType.DELETE:
        for column in row.beforeColumns:
          format_data = {
            column.name: column.value
          }
      elif event_type == EntryProtocol_pb2.EventType.INSERT:
        for column in row.afterColumns:
          format_data = {
            column.name: column.value
          }
      else:
        format_data['before'] = format_data['after'] = dict()
        for column in row.beforeColumns:
          format_data['before'][column.name] = column.value
        for column in row.afterColumns:
          format_data['after'][column.name] = column.value
      data = dict(
        db=database,
        table=table,
        event_type=event_type,
        data=format_data,
      )
      print(data)
  time.sleep(1)

client.disconnect()

更多详情请查看 Sample

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

JSmiles

生命进入颠沛而奔忙的本质状态,并将以不断告别和相遇的陈旧方式继续下去。

0 文章
0 评论
84960 人气
更多

推荐作者

lorenzathorton8

文章 0 评论 0

Zero

文章 0 评论 0

萧瑟寒风

文章 0 评论 0

mylayout

文章 0 评论 0

tkewei

文章 0 评论 0

17818769742

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文