PART Ⅰ : 容器云OPENSHIFT
- 安装
- 数据持久化
- 集群管理
- 数据持久化
- 管理
- 网络
- 安全审计
- 工具应用部署
PART Ⅱ:容器云 KUBERNETES
- 基础
- 原理
- 系统应用/网络CNI/TRaefik
- 安装
- 集群管理
- 用户认证ServiceAccount与授权策略RBAC
- K8S应用管理工具Helm
- 问题
- 辅助工具
- Doing:K8S 多集群管理与网络互联
- VM On K8S
PART Ⅲ:持续集成与持续部署
- CICD优化总结
- Jenkins
- Gitlab
- Drone
- Nexus
- 配置
- 使用OrientDB Console在DB层面修改配置
- [设置SMTP邮件服务](https://www.wenjiangs.com/doc/krrcu7ebin9hh
- 仓库管理
- 数据备份恢复
- API
- Jenkins相关插件
- 配置
- SonarQube静态代码扫描分析
- LDAP
- Apollo
- 项目管理工具
- Jira
- Redmine
- Harbor
- Vault
- Alfred
- Web IDE: VSCode
- DolphinScheduler
PART Ⅴ:日志/监控/告警
- Logging
- Kafka/Zookeeper
- Filebeat
- Metrics
- Tracing
- Sentry日志聚合告警平台
PART Ⅵ:基础
- Docker
- Shell脚本
- Mave
- git
- 正则表达式
- SSL/TLS
- Ceph
- 性能压力测试
- PXE+Kickstart
- netboot.xyz
- Tool
- Windows
- MacOS小技巧
- Linux
- Linux排错优化
- iptables详解
- MySQL
- Redis
- 负载均衡与代理
- 代理服务器
- Nginx
- GitBook
- Telegram机器人
- OpenVPN Server
- iDRAC
- vSphere
- Raspberry Pi树莓派
- 钉钉机器人
- Aliyun CLI
- 音、视频处理工具:fffmpeg
- 图片处理工具:Imagemagick
- PDF处理工具:Ghostscript
- Nvidia
- Virtualbox 虚拟机管理
- 阿里云产品使用总结
- RustDesk:可自建远程控制软件
- Poste:自建邮件服务器
- 使用 Jlink构建最小化依赖的 JRE 环境
- Aria2
- Asuswrt-Merlin
- Trap:Shell脚本信号跟踪
- 零散知识汇总
- BarkServer通知
- Synology
PART Ⅶ:数据存储、处理
PART VIII:CODE
- Python学习笔记
- 基础语法
- statik 将静态资源文件打包到二进制文件中
- HTML/CSS 学习笔记
- JavaScript学习笔记
PART X:HACKINTOSH
PART XI:安全
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
kafka连接调试脚本
为了测试kafka客户端连接k8s上Kafka Bootstrap返回的信息,有一个Python脚本可显示Broker地址,并产生测试数据验证生产消费是否正常
GItHub:https://github.com/rmoff/kafka-listeners/blob/master/python/python_kafka_test_client.py
Python代码
from confluent_kafka.admin import AdminClient
from confluent_kafka import Consumer
from confluent_kafka import Producer
from sys import argv
from datetime import datetime
topic='test_topic'
def Produce(source_data):
print('\n<Producing>')
p = Producer({'bootstrap.servers': bootstrap_server})
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('❌ Message delivery failed: {}'.format(err))
else:
print('✅ 📬 Message delivered: "{}" to {} [partition {}]'.format(msg.value().decode('utf-8'),msg.topic(), msg.partition()))
for data in source_data:
p.poll(0)
p.produce(topic, data.encode('utf-8'), callback=delivery_report)
r=p.flush(timeout=5)
if r>0:
print('❌ Message delivery failed ({} message(s) still remain, did we timeout sending perhaps?)\n'.format(r))
def Consume():
print('\n<Consuming>')
c = Consumer({
'bootstrap.servers': bootstrap_server,
'group.id': 'rmoff',
'auto.offset.reset': 'earliest'
})
c.subscribe([topic])
try:
msgs = c.consume(num_messages=1,timeout=30)
if len(msgs)==0:
print("❌ No message(s) consumed (maybe we timed out waiting?)\n")
else:
for msg in msgs:
print('✅ 💌 Message received: "{}" from topic {}\n'.format(msg.value().decode('utf-8'),msg.topic()))
except Exception as e:
print("❌ Consumer error: {}\n".format(e))
c.close()
try:
bs=argv[1]
print('\n🥾 bootstrap server: {}'.format(bs))
bootstrap_server=bs
except:
# no bs X-D
bootstrap_server='localhost:9092'
print('⚠️ No bootstrap server defined, defaulting to {}\n'.format(bootstrap_server))
a = AdminClient({'bootstrap.servers': bootstrap_server})
try:
md=a.list_topics(timeout=10)
print("""
✅ Connected to bootstrap server(%s) and it returned metadata for brokers as follows:
%s
---------------------
ℹ️ This step just confirms that the bootstrap connection was successful.
ℹ️ For the consumer to work your client will also need to be able to resolve the broker(s) returned
in the metadata above.
ℹ️ If the host(s) shown are not accessible from where your client is running you need to change
your advertised.listener configuration on the Kafka broker(s).
"""
% (bootstrap_server,md.brokers))
try:
Produce(['foo / ' + datetime.now().strftime('%Y-%m-%d %H:%M:%S')])
Consume()
except:
print("❌ (uncaught exception in produce/consume)")
except Exception as e:
print("""
❌ Failed to connect to bootstrap server.
👉 %s
ℹ️ Check that Kafka is running, and that the bootstrap server you've provided (%s) is reachable from your client
"""
% (e,bootstrap_server))
安装脚本依赖
python3 -m pip install confluent_kafka
测试命令
python3 python_kafka_test_client.py localhost:9092
测试输出,显示了kafka bootstrap返回给客户端的broker连接地址
🥾 bootstrap server: localhost:9092
✅ Connected to bootstrap server(localhost:9092) and it returned metadata for brokers as follows:
{0: BrokerMetadata(0, curiouser:9092)}
---------------------
ℹ️ This step just confirms that the bootstrap connection was successful.
ℹ️ For the consumer to work your client will also need to be able to resolve the broker(s) returned
in the metadata above.
ℹ️ If the host(s) shown are not accessible from where your client is running you need to change
your advertised.listener configuration on the Kafka broker(s).
<Producing>
✅ 📬 Message delivered: "foo / 2020-12-23 18:19:24" to test_topic [partition 0]
<Consuming>
✅ 💌 Message received: "foo / 2020-12-23 18:19:24" from topic test_topic
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论