Can't print data from a Kafka consumer (log-based change data capture project)

Posted on 2025-01-11 18:24:43

I have a project that uses Kafka, ZooKeeper, and Debezium to capture data changes from a MySQL database. I'm quite new to Kafka and have just been following the Debezium tutorial, so I've run into some trouble trying to print message values with the kafka-python client. Here are my docker-compose file and connector config file:

My docker-compose file

version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:1.8
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:1.8
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
     - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
  mysql:
    image: quay.io/debezium/example-mysql:1.8
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
  connect:
    image: quay.io/debezium/connect:1.8
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses

Connector config file

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "key.converter":"org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false
    }
}
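For completeness: the connector config above still has to be POSTed to the Kafka Connect REST API (port 8083 in the compose file) before any `dbserver1.*` topics appear. The Debezium tutorial does this with `curl`; here is an equivalent stdlib-only sketch (the helper names are my own, not part of any API):

```python
import json
import urllib.request


def build_register_request(config: dict,
                           connect_url: str = "http://localhost:8083") -> urllib.request.Request:
    # Build (but do not send) the POST that registers a connector with
    # the Kafka Connect REST API exposed by the "connect" service.
    return urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register_connector(config: dict,
                       connect_url: str = "http://localhost:8083") -> dict:
    # Actually send the request; requires the compose stack to be running.
    with urllib.request.urlopen(build_register_request(config, connect_url)) as resp:
        return json.load(resp)
```

Calling `register_connector` with the JSON above (loaded into a dict) should return the created connector's description; a 409 response means a connector with that name is already registered.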

Python code

from kafka import KafkaConsumer  # PyPI package is kafka-python, but the importable module is "kafka"
import json

consumer = KafkaConsumer(
    "dbserver1.inventory.customers",
    auto_offset_reset="earliest",  # valid values are "earliest"/"latest"; "from-beginning" is rejected
    group_id="console-consumer-47348",
    bootstrap_servers=["kafka:9092"],  # "kafka" only resolves from inside the compose network
    api_version=(2, 0, 2),
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
    max_poll_interval_ms=5000,
    max_poll_records=1,
)
for message in consumer:
    print(message.value)
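Once messages do arrive, each deserialized value is a Debezium change-event envelope; with `schemas.enable` set to false, the `op`/`before`/`after` fields sit at the top level. A small sketch of unwrapping one event (the sample record below is illustrative, shaped like a Debezium MySQL insert event, not real output from this setup):

```python
def unwrap(event):
    # "op" is c/u/d/r (create/update/delete/snapshot read); "after" holds
    # the row state after the change, and is None for deletes.
    return event["op"], event.get("after")


# Illustrative change event for an insert into inventory.customers.
sample = {
    "before": None,
    "after": {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar"},
    "source": {"db": "inventory", "table": "customers"},
    "op": "c",
    "ts_ms": 1642000000000,
}

op, row = unwrap(sample)
print(op, row["id"])  # c 1004
```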

Thank you very much!

