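Unable to print data from a Kafka consumer (log-based change data capture project)
I have a project that uses Kafka, ZooKeeper, and Debezium to capture data changes from a MySQL database. I am quite new to Kafka and have only followed the Debezium tutorial, so I ran into trouble when trying to print message values with the kafka-python client. Here are my docker-compose file and connector config file:
My docker-compose file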
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:1.8
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:1.8
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
  mysql:
    image: quay.io/debezium/example-mysql:1.8
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  connect:
    image: quay.io/debezium/connect:1.8
    ports:
      - 8083:8083
    links:
      - kafka
      - mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
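Since the broker advertises itself as `kafka:9092`, any client is redirected to that address after the initial bootstrap, so it has to be able to resolve and reach the hostname `kafka`. The following is only a minimal sketch for checking that from wherever the consumer runs; `parse_listener` and `broker_reachable` are hypothetical helper names for illustration, not part of Kafka or kafka-python.

```python
import socket
from typing import Tuple


def parse_listener(listener: str) -> Tuple[str, str, int]:
    """Split a listener such as 'PLAINTEXT://kafka:9092' into its parts."""
    protocol, _, address = listener.partition("://")
    host, _, port = address.rpartition(":")
    return protocol, host, int(port)


def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False


# Example (run it from the same place as the consumer script):
#   _, host, port = parse_listener("PLAINTEXT://kafka:9092")
#   print(broker_reachable(host, port))
```

If the check fails on the Docker host but succeeds inside a container on the compose network, the advertised listener is a likely place to look.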
Config file
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false
  }
}
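For context, a config like this is registered by POSTing it to the Kafka Connect REST API (the Debezium tutorial does this with curl). Below is a rough Python sketch of the same request. It assumes Connect is reachable at `http://localhost:8083`, which matches the port mapping above only when run on the Docker host; `build_register_request` is a hypothetical helper name.

```python
import json
import urllib.request


def build_register_request(
    config: dict, connect_url: str = "http://localhost:8083"
) -> urllib.request.Request:
    """Build (but do not send) the POST request that registers a connector."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=body,
        headers={
            "Accept": "application/json",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Example (sends the request; connector_config is the JSON document
# above loaded into a dict):
#   with urllib.request.urlopen(build_register_request(connector_config)) as resp:
#       print(resp.status)
```

Building the request separately from sending it makes it easy to inspect the URL and body before anything touches the Connect service.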
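Python code
from kafka import KafkaConsumer  # the kafka-python package is imported as "kafka"
import json
import os
import logging
import sys
from google.cloud import bigquery

consumer = KafkaConsumer(
    "dbserver1.inventory.customers",
    # kafka-python accepts "earliest" or "latest" here, not "from-beginning"
    auto_offset_reset="earliest",
    group_id='console-consumer-47348',
    # "kafka:9092" only works where the hostname "kafka" resolves,
    # e.g. inside the docker-compose network
    bootstrap_servers=["kafka:9092"],
    api_version=(2, 0, 2),
    # Debezium emits tombstones (null values) after deletes, so guard against None
    value_deserializer=lambda x: json.loads(x.decode('utf-8')) if x is not None else None,
    max_poll_interval_ms=5000,
    max_poll_records=1,
)

for message in consumer:
    msg = message.value
    print('{}'.format(msg))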
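Once messages arrive, each value should be a Debezium change event. With `schemas.enable` set to false as in the config above, the fields `before`, `after`, and `op` sit at the top level; with schemas enabled they are nested under `payload`. Here is a small sketch of pulling the useful parts out of one event. `decode_value` and `summarize_event` are hypothetical names, and the sample record in the usage comment is made up for illustration.

```python
import json
from typing import Optional

# Operation codes used in Debezium change events.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def decode_value(raw: Optional[bytes]) -> Optional[dict]:
    """Decode a message value, passing tombstones (None) through."""
    if raw is None:
        return None
    return json.loads(raw.decode("utf-8"))


def summarize_event(event: Optional[dict]) -> str:
    """Return a one-line summary of a Debezium change event."""
    if event is None:
        return "tombstone"
    # Fall back to the event itself when there is no schema/payload wrapper.
    payload = event.get("payload", event)
    op = OPERATIONS.get(payload.get("op"), "unknown")
    # Deletes carry the row in "before"; other operations use "after".
    row = payload.get("after") or payload.get("before")
    return f"{op}: {row}"


# Example with a made-up record:
#   raw = b'{"before": null, "after": {"id": 1001}, "op": "c"}'
#   print(summarize_event(decode_value(raw)))
```

`decode_value` could be passed as `value_deserializer` to `KafkaConsumer`, with `summarize_event(message.value)` called inside the loop.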
Thank you very much!