Kerberos error authenticating to Confluent Kafka

Posted 2025-01-22 19:02:04


I've been trying to understand Apache Beam, Confluent Kafka, and Dataflow integration with Python 3.8 and Beam SDK 2.7. The desired result is to build a pipeline (which is going to run on Dataflow) that consumes from Confluent Kafka and just logs the messages on GCP. (I'm using JDK 17, by the way.)

This is the code I'm using:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
import os
import json
import logging

os.environ['GOOGLE_APPLICATION_CREDENTIALS']='credentialsOld.json'

# Load the Confluent cluster settings from a local JSON file;
# the with-block closes the file automatically, so no explicit close() is needed.
with open('cluster.configuration.json') as cluster:
    data=json.load(cluster)

def logger(element):
    # logging.INFO is just the integer level constant; logging.info() emits the record.
    logging.info('Something was found')
    return []  # FlatMap expects its callable to return an iterable

config={
        "bootstrap.servers":data["bootstrap.servers"],
        "security.protocol":"SASL_SSL",
        "sasl.mechanisms":"PLAIN",
        "session.timeout.ms":data["session.timeout.ms"],
        "group.id":"tto",
        "sasl.jaas.config":f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username=\"{data["sasl.username"]}\" password=\"{data["sasl.password"]}\";',
        "auto.offset.reset":"earliest"
    }

 
def main():
    
    print('======================================================')
    beam_options = PipelineOptions(
        runner='DataflowRunner',
        project='project',
        experiments=['use_runner_v2'],
        streaming=True,
        save_main_session=True,
        job_name='kafka-stream-test')
    with beam.Pipeline(options=beam_options) as p:
        msgs = p | 'ReadKafka' >> ReadFromKafka(consumer_config=config,topics=['users'])
        msgs | beam.FlatMap(logger)
        
if __name__ == '__main__':    
    main()

I have tested this pipeline with Dataflow and also with the direct runner, and on both runners I get this error: "Timeout while fetching topic metadata".

This error seems to be caused by the consumer being unable to authenticate to Confluent Kafka, since I get these warnings:

WARNING:root:severity: WARN
timestamp {
  seconds: 1650473787
  nanos: 331000000
}
message: "[Consumer clientId=consumer-tto-1, groupId=tto] Bootstrap broker "Here is suposed to be mi broker ip but I wont show it" (id: -1 rack: null) disconnected"
instruction_id: "bundle_1"
transform_id: "ReadFromKafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/ParDo(GenerateKafkaSourceDescriptor)/ParMultiDo(GenerateKafkaSourceDescriptor)"
log_location: "org.apache.kafka.clients.NetworkClient"
thread: "21"

After this warning I get this other one:

message: "[Consumer clientId=consumer-tto-1, groupId=tto] Error connecting to node "Here is suposed to be mi broker ip but I wont show it" (id: -1 rack: null)"
trace: "java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed]\n\tat

Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator

Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login

Two important things. First, I already built a consumer in Python without the ReadFromKafka Apache Beam IO, and it connects to and consumes from the topic perfectly, so the credentials I'm using are the same and the protocol is the same (SASL_SSL with PLAIN); related to this, I have no idea why a Kerberos error is popping up, since I'm not using Kerberos authentication. Second, the 'ReadFromKafka' transform runs through an expansion service, since this transform is only supported in Java, but with Apache Beam I can use it from Python. A minimal sketch of that working plain consumer is shown below.
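For reference, a minimal sketch of that plain consumer, assuming the confluent-kafka package and the same cluster.configuration.json and 'users' topic as above (details differ from my real consumer, but this is the shape of it):

import json
from confluent_kafka import Consumer

with open('cluster.configuration.json') as cluster:
    data = json.load(cluster)

consumer = Consumer({
    "bootstrap.servers": data["bootstrap.servers"],
    "security.protocol": "SASL_SSL",
    # librdkafka (the C client underneath confluent-kafka) spells this plural.
    "sasl.mechanisms": "PLAIN",
    "sasl.username": data["sasl.username"],
    "sasl.password": data["sasl.password"],
    "group.id": "tto",
    "auto.offset.reset": "earliest",
})

consumer.subscribe(['users'])
try:
    while True:
        msg = consumer.poll(1.0)  # wait up to 1 second for the next message
        if msg is None:
            continue
        if msg.error():
            print(msg.error())
            continue
        print(msg.value().decode('utf-8'))
finally:
    consumer.close()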


Comments (1)

不甘平庸 2025-01-29 19:02:04


OK, the mistake was really simple to fix: I had a typo in 'sasl.mechanisms', so the property wasn't getting recognized.

Instead of sasl.mechanisms, use sasl.mechanism.
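The Java Kafka client behind Beam's KafkaIO expects the singular property name (the plural spelling belongs to librdkafka-based clients such as confluent-kafka). Presumably the unrecognized plural key was ignored, so the client fell back to the default SASL mechanism, GSSAPI (Kerberos), which would explain the Kerberos errors. A sketch of the corrected consumer_config from the question, changing only that one key:

config={
        "bootstrap.servers":data["bootstrap.servers"],
        "security.protocol":"SASL_SSL",
        "sasl.mechanism":"PLAIN",  # singular: the Java client property name
        "session.timeout.ms":data["session.timeout.ms"],
        "group.id":"tto",
        "sasl.jaas.config":f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username=\"{data["sasl.username"]}\" password=\"{data["sasl.password"]}\";',
        "auto.offset.reset":"earliest"
    }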
