Accessing a bitnami/kafka cluster running in minikube from the local machine

Published 2025-01-09 17:16:31

I am trying to access a bitnami/kafka cluster deployed using helm. I need to be able to send messages to kafka from my local machine and then have my pods process the data.

I am using the following command to bring up the cluster:
helm install my-kafka bitnami/kafka

>>> kubectl get all 
NAME                       READY   STATUS    RESTARTS   AGE
pod/my-kafka-0             1/1     Running   0          32s
pod/my-kafka-zookeeper-0   1/1     Running   0          32s

NAME                                  TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
service/kubernetes                    ClusterIP   10.96.0.1        <none>        443/TCP                      5d18h
service/my-kafka                      ClusterIP   10.110.208.220   <none>        9092/TCP                     32s
service/my-kafka-headless             ClusterIP   None             <none>        9092/TCP,9093/TCP            32s
service/my-kafka-zookeeper            ClusterIP   10.105.53.40     <none>        2181/TCP,2888/TCP,3888/TCP   32s
service/my-kafka-zookeeper-headless   ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   32s

NAME                                  READY   AGE
statefulset.apps/my-kafka             1/1     32s
statefulset.apps/my-kafka-zookeeper   1/1     32s

I then expose the service with minikube service my-kafka:

|-----------|----------|-------------|--------------|
| NAMESPACE |   NAME   | TARGET PORT |     URL      |
|-----------|----------|-------------|--------------|
| default   | my-kafka |             | No node port |
|-----------|----------|-------------|--------------|
service default/my-kafka has no node port
Starting tunnel for service my-kafka.
|-----------|----------|-------------|------------------------|
| NAMESPACE |   NAME   | TARGET PORT |          URL           |
|-----------|----------|-------------|------------------------|
| default   | my-kafka |             | http://127.0.0.1:62194 |
|-----------|----------|-------------|------------------------|
Opening service default/my-kafka in default browser...
❗  Because you are using a Docker driver on darwin, the terminal needs to be open to run it.
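A quick way to confirm the tunnel itself is reachable before involving Kafka at all (a minimal sketch; 62194 is the tunnel port from the output above, and it changes on every `minikube service` run):

```python
import socket

def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# 62194 is the tunnel port from the `minikube service` output above;
# it changes on every invocation.
print(port_open("localhost", 62194))
```

If this prints True but the producer still times out, the problem is past the tunnel, not the tunnel itself.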

Finally, I try to produce data using Python:

from time import sleep

from kafka import KafkaClient, KafkaProducer

port = 62194

# create a client and print some information out
client = KafkaClient(bootstrap_servers=f"localhost:{port}")
print(f"Bootstrap Connected: {client.bootstrap_connected()}")
print(f"API Versions: {client.get_api_versions()}")

# try creating a producer
print("Creating Producer")
producer = KafkaProducer(bootstrap_servers=f"localhost:{port}")
print("Producer Created")

# send messages
for i in range(0,10):
    msg = f"message-{i}"
    print(f"Sending {msg}")
    producer.send("my-topic", msg.encode("utf-8"))
    sleep(1)

The output of the above script is:

Bootstrap Connected: True
API Versions: {0: (0, 9), 1: (0, 13), 2: (0, 7), 3: (0, 12), 4: (0, 5), 5: (0, 3), 6: (0, 7), 7: (0, 3), 8: (0, 8), 9: (0, 8), 10: (0, 4), 11: (0, 7), 12: (0, 4), 13: (0, 4), 14: (0, 5), 15: (0, 5), 16: (0, 4), 17: (0, 1), 18: (0, 3), 19: (0, 7), 20: (0, 6), 21: (0, 2), 22: (0, 4), 23: (0, 4), 24: (0, 3), 25: (0, 3), 26: (0, 3), 27: (0, 1), 28: (0, 3), 29: (0, 2), 30: (0, 2), 31: (0, 2), 32: (0, 4), 33: (0, 2), 34: (0, 2), 35: (0, 2), 36: (0, 2), 37: (0, 3), 38: (0, 2), 39: (0, 2), 40: (0, 2), 41: (0, 2), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 1), 49: (0, 1), 50: (0, 0), 51: (0, 0), 56: (0, 0), 57: (0, 0), 60: (0, 0), 61: (0, 0), 65: (0, 0), 66: (0, 0), 67: (0, 0)}
Creating Producer
Producer Created
Sending message-0
Traceback (most recent call last):
  File "/Users/flevine/code/ramen/kafka/producer.py", line 21, in <module>
    producer.send("my-topic", msg.encode("utf-8"))
  File "/Users/flevine/code/ramen/kafka/venv/lib/python3.9/site-packages/kafka/producer/kafka.py", line 576, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/Users/flevine/code/ramen/kafka/venv/lib/python3.9/site-packages/kafka/producer/kafka.py", line 702, in _wait_on_metadata
    raise Errors.KafkaTimeoutError(
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

From what I can tell, it's connecting to Kafka, but something is not configured right. I have tested my producer/consumer scripts against a plain Docker deployment, but I need to be on Kubernetes for the project I'm working on.
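One way to make the mismatch visible is to ask the cluster which broker addresses it actually advertises; those are what the producer must reach after the initial bootstrap succeeds. A sketch using kafka-python's admin client (the bootstrap address is an assumption, e.g. the tunnel port from above):

```python
def advertised_brokers(bootstrap: str):
    """Return the (host, port) pairs the cluster advertises for its brokers.

    These are the addresses the producer dials after bootstrap, so they
    must be reachable from this machine for send() to work.
    """
    # Imported lazily so the helper can be pasted anywhere.
    from kafka.admin import KafkaAdminClient  # kafka-python
    admin = KafkaAdminClient(bootstrap_servers=bootstrap)
    try:
        cluster = admin.describe_cluster()
        return [(b["host"], b["port"]) for b in cluster["brokers"]]
    finally:
        admin.close()

# e.g. advertised_brokers("localhost:62194")
```

If the returned hosts are cluster-internal names like my-kafka-0.my-kafka-headless..., the client cannot resolve or reach them from the host, which matches the bootstrap-succeeds-then-timeout symptom.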


Comments (1)

不离久伴 2025-01-16 17:16:31

I think the problem stems from Minikube's separate network: the Helm chart automatically whitelists only the Minikube network in advertised.listeners, so the broker addresses it hands back are not reachable from the host (which is why bootstrap succeeds but the metadata update times out). I solved a similar issue by reconfiguring the Helm values.yaml like this:

externalAccess:
  enabled: true
  autoDiscovery:
    enabled: false
    image:
      registry: docker.io
      repository: bitnami/kubectl
      tag: 1.23.4-debian-10-r17
      pullPolicy: IfNotPresent
      pullSecrets: []
    resources:
      limits: {}
      requests: {}
  service:
    type: NodePort
    port: 9094
    loadBalancerIPs: []
    loadBalancerSourceRanges: []
    nodePorts:
      - 30000
      - 30001
      - 30002
    useHostIPs: false
    annotations: {}
    domain: 127.0.0.1
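With those values, the chart advertises the brokers on the configured domain and NodePorts, so a local client can bootstrap against 127.0.0.1:30000 directly. A minimal sketch matching the values above (topic name and payload are placeholders; with a single broker only the first nodePort is actually used):

```python
def bootstrap_servers(domain: str = "127.0.0.1", ports=(30000, 30001, 30002)):
    """Build the bootstrap list from the externalAccess domain and nodePorts above."""
    return [f"{domain}:{p}" for p in ports]

def send_one(topic: str = "my-topic", payload: bytes = b"hello"):
    """Produce a single message via the externally advertised listeners."""
    # Imported lazily so the address helper stays dependency-free.
    from kafka import KafkaProducer  # kafka-python
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers())
    producer.send(topic, payload)
    producer.flush()
    producer.close()
```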