How to connect Kafka IO from Apache Beam to a cluster in Confluent Cloud

I've made a simple pipeline in Python to read from Kafka; the thing is, the Kafka cluster is on Confluent Cloud and I'm having some trouble connecting to it.

I'm getting the following log on the Dataflow job:

Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
    at org.apache.beam.sdk.io.kafka.KafkaIO$Read$GenerateKafkaSourceDescriptor.processElement(KafkaIO.java:1495)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set

So I think I'm missing something while passing the config, since the error mentions something related to it. I'm really new to all of this and I know nothing about Java, so I don't know how to proceed even after reading the JAAS documentation.

The code of the pipeline is the following:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
import os
import json
import logging

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'credentialsOld.json'

# Load the Confluent Cloud cluster settings from a local JSON file; the
# 'with' block closes the file on exit, so no explicit close() is needed.
with open('cluster.configuration.json') as cluster:
    data = json.load(cluster)

def logger(element):
    # logging.info(), not logging.INFO (INFO is a level constant, not a function)
    logging.info('Something was found')

def main():
    config = {
        "bootstrap.servers": data["bootstrap.servers"],
        "security.protocol": data["security.protocol"],
        "sasl.mechanisms": data["sasl.mechanisms"],
        "sasl.username": data["sasl.username"],
        "sasl.password": data["sasl.password"],
        "session.timeout.ms": data["session.timeout.ms"],
        "auto.offset.reset": "earliest"
    }
    print('======================================================')
    beam_options = PipelineOptions(
        runner='DataflowRunner',
        project='project',
        experiments=['use_runner_v2'],
        streaming=True,
        save_main_session=True,
        job_name='kafka-stream-test')
    with beam.Pipeline(options=beam_options) as p:
        msgs = p | 'ReadKafka' >> ReadFromKafka(
            consumer_config=config,
            topics=['users'],
            expansion_service="localhost:8088")
        # Map rather than FlatMap, since logger returns a single value,
        # not an iterable.
        msgs | beam.Map(logger)

if __name__ == '__main__':
    main()

I read something about passing a property, java.security.auth.login.config, in the config dictionary, but since that example is in Java and I'm using Python, I'm really lost as to what I have to pass, or even whether that's the property I have to pass.

By the way, I'm getting the API key and secret from the Confluent Cloud console (screenshot below), and that is what I'm passing to sasl.username and sasl.password.

[Screenshot: Confluent Cloud cluster API key and secret]
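
For reference, the cluster.configuration.json the script loads presumably looks something like the following, shown here as the Python dict that json.load() returns. Every value is a placeholder; the key set is inferred from the code above:

# Hypothetical shape of cluster.configuration.json -- placeholder values only.
data = {
    "bootstrap.servers": "pkc-xxxxx.us-central1.gcp.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<CLUSTER_API_KEY>",
    "sasl.password": "<CLUSTER_API_SECRET>",
    "session.timeout.ms": "45000"
}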


Comments (2)

半窗疏影 2025-01-27 01:42:34

I faced the same error the first time I tried Beam's expansion service. The key sasl.mechanisms that you are supplying is incorrect; try sasl.mechanism instead. You also do not need to supply the username and password separately, since your connection is authenticated via JAAS. Basically, a consumer_config like the one below worked for me:

config = {
    "bootstrap.servers": data["bootstrap.servers"],
    "security.protocol": data["security.protocol"],
    "sasl.mechanism": data["sasl.mechanisms"],
    "session.timeout.ms": data["session.timeout.ms"],
    "group.id": "tto",
    "sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username="{data["sasl.username"]}" password="{data["sasl.password"]}";',
    "auto.offset.reset": "earliest"
}
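
For completeness, here is a minimal sketch of how that corrected config plugs into the read transform. The topic 'users' and the expansion service address are carried over from the question, 'data' and 'beam_options' are the dict and PipelineOptions defined there, and make_jaas_config is purely an illustrative helper, not a library function:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

# Illustrative helper (not part of any library): builds the JAAS entry the
# Java Kafka consumer reads from sasl.jaas.config.
def make_jaas_config(username, password):
    return ('org.apache.kafka.common.security.plain.PlainLoginModule required '
            f'serviceName="Kafka" username="{username}" password="{password}";')

config = {
    "bootstrap.servers": data["bootstrap.servers"],
    "security.protocol": data["security.protocol"],
    "sasl.mechanism": data["sasl.mechanisms"],  # note the singular consumer key
    "session.timeout.ms": data["session.timeout.ms"],
    "group.id": "tto",
    "sasl.jaas.config": make_jaas_config(data["sasl.username"],
                                         data["sasl.password"]),
    "auto.offset.reset": "earliest"
}

with beam.Pipeline(options=beam_options) as p:
    p | 'ReadKafka' >> ReadFromKafka(
        consumer_config=config,
        topics=['users'],
        expansion_service="localhost:8088")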
看春风乍起 2025-01-27 01:42:34

I got a partial answer to this question: I fixed this problem but ran into another one:

config = {
    "bootstrap.servers": data["bootstrap.servers"],
    "security.protocol": data["security.protocol"],
    "sasl.mechanisms": data["sasl.mechanisms"],
    "sasl.username": data["sasl.username"],
    "sasl.password": data["sasl.password"],
    "session.timeout.ms": data["session.timeout.ms"],
    "group.id": "tto",
    "sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username="{data["sasl.username"]}" password="{data["sasl.password"]}";',
    "auto.offset.reset": "earliest"
}

I needed to provide the sasl.jaas.config property with the API key and secret of my cluster, and also the service name. However, now I'm facing a different error when running the pipeline on Dataflow:

Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata 

This error shows up after 4-5 minutes of trying to run the job on Dataflow. I actually have no idea how to fix it, but I think it's related to my broker on Confluent rejecting the connection; it may have to do with the execution region, since the cluster is in a different zone than the job's region.
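
One way to narrow this down, under the assumption that the confluent-kafka Python client is installed (pip install confluent-kafka), is to fetch topic metadata from outside Beam with the same credentials. If this sanity check succeeds from your machine but the Dataflow workers still time out, the issue is more likely worker networking (VPC, region, egress to Confluent Cloud) than the consumer config itself:

from confluent_kafka.admin import AdminClient

# 'data' is the same dict loaded from cluster.configuration.json as in the
# question; librdkafka accepts the plural "sasl.mechanisms" spelling.
check_conf = {
    "bootstrap.servers": data["bootstrap.servers"],
    "security.protocol": data["security.protocol"],
    "sasl.mechanisms": data["sasl.mechanisms"],
    "sasl.username": data["sasl.username"],
    "sasl.password": data["sasl.password"],
}

admin = AdminClient(check_conf)
# list_topics() forces the same metadata fetch that is timing out on Dataflow.
metadata = admin.list_topics(timeout=10)
print(sorted(metadata.topics))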

UPDATE:

I tested the code on Linux/Ubuntu and, I don't know why, but the expansion service gets downloaded automatically, so you won't get the unsupported-signal error. I'm still having some issues trying to authenticate to Confluent Kafka, though.
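
For what it's worth, that matches how Beam's cross-language Kafka transform behaves: if the expansion_service argument is omitted, the Python SDK downloads the Kafka IO expansion service jar and launches it itself, which requires a local Java runtime. A minimal sketch, with 'config' and the pipeline 'p' as defined earlier:

# With no expansion_service argument, Beam downloads and starts the default
# Java expansion service automatically (Java must be installed locally).
msgs = p | 'ReadKafka' >> ReadFromKafka(
    consumer_config=config,
    topics=['users'])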
