Kerberos error when authenticating to Confluent Kafka
I've been trying to understand the integration of Apache Beam, Confluent Kafka, and Dataflow with Python 3.8 and Beam SDK 2.7. The desired result is a pipeline (to be run on Dataflow) that consumes from Confluent Kafka and just logs the messages on GCP. (I'm using JDK 17, by the way.)
This is the code I'm using:
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
import os
import json
import logging

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'credentialsOld.json'

with open('cluster.configuration.json') as cluster:
    data = json.load(cluster)
    cluster.close()

def logger(element):
    logging.INFO('Something was found')

config = {
    "bootstrap.servers": data["bootstrap.servers"],
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "session.timeout.ms": data["session.timeout.ms"],
    "group.id": "tto",
    "sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username=\"{data["sasl.username"]}\" password=\"{data["sasl.password"]}\";',
    "auto.offset.reset": "earliest"
}

def main():
    print('======================================================')
    beam_options = PipelineOptions(
        runner='DataflowRunner',
        project='project',
        experiments=['use_runner_v2'],
        streaming=True,
        save_main_session=True,
        job_name='kafka-stream-test')
    with beam.Pipeline(options=beam_options) as p:
        msgs = p | 'ReadKafka' >> ReadFromKafka(consumer_config=config, topics=['users'])
        msgs | beam.FlatMap(logger)

if __name__ == '__main__':
    main()
I have tested this pipeline with Dataflow and also with the direct runner, and on both runners I get this error: "Timeout while fetching topic metadata".
This error seems to be caused by the consumer being unable to authenticate to Confluent Kafka, since I get these warnings:
WARNING:root:severity: WARN
timestamp {
seconds: 1650473787
nanos: 331000000
}
message: "[Consumer clientId=consumer-tto-1, groupId=tto] Bootstrap broker "Here is suposed to be mi broker ip but I wont show it" (id: -1 rack: null) disconnected"
instruction_id: "bundle_1"
transform_id: "ReadFromKafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/ParDo(GenerateKafkaSourceDescriptor)/ParMultiDo(GenerateKafkaSourceDescriptor)"
log_location: "org.apache.kafka.clients.NetworkClient"
thread: "21"
After this warning I get another one:
message: "[Consumer clientId=consumer-tto-1, groupId=tto] Error connecting to node "Here is suposed to be mi broker ip but I wont show it" (id: -1 rack: null)"
trace: "java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed]\n\tat
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login
Two things are important here. First, I already built a consumer in Python without the ReadFromKafka Apache Beam IO, and it connects to the topic and consumes perfectly, so the credentials I'm using are the same and I have the same protocol and mechanism ("SASL_SSL", "PLAIN"). Related to this, I have no idea why a Kerberos error is popping up, since I'm not using Kerberos authentication. Second, the 'ReadFromKafka' transform is used through an expansion service, since this transform is only implemented in Java, but with Apache Beam I can use it from Python.
Comments (1)
OK, the mistake was really simple to fix: I had a typo in the property name 'sasl.mechanisms', so the property wasn't being recognized.
Instead of sasl.mechanisms, use sasl.mechanism.
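
Some background on why the typo surfaces as a Kerberos error: the Java Kafka client, which ReadFromKafka runs through the expansion service, defaults sasl.mechanism to GSSAPI (Kerberos) when that key is not set. An unrecognized key such as sasl.mechanisms is ignored, so the client falls back to Kerberos. The plural spelling is the one used by librdkafka-based clients such as confluent-kafka-python, which may be why the standalone Python consumer worked. Below is a minimal sketch of the corrected config, assuming the same cluster.configuration.json keys as in the question. The helper name build_consumer_config and the sample values are hypothetical, and casting every value to a string is an assumption about what the cross-language transform expects.

```python
def build_consumer_config(data):
    """Build a consumer config for the Java Kafka client behind ReadFromKafka.

    `data` is assumed to hold the same keys as the question's
    cluster.configuration.json (hypothetical helper, not part of Beam).
    """
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        'serviceName="Kafka" '
        f'username="{data["sasl.username"]}" '
        f'password="{data["sasl.password"]}";'
    )
    return {
        "bootstrap.servers": str(data["bootstrap.servers"]),
        "security.protocol": "SASL_SSL",
        # Singular key: this is the name the Java client recognizes.
        "sasl.mechanism": "PLAIN",
        # Assumption: values are passed as strings to the Java side.
        "session.timeout.ms": str(data["session.timeout.ms"]),
        "group.id": "tto",
        "sasl.jaas.config": jaas,
        "auto.offset.reset": "earliest",
    }


if __name__ == "__main__":
    # Placeholder values for illustration only.
    sample = {
        "bootstrap.servers": "broker.example:9092",
        "session.timeout.ms": 45000,
        "sasl.username": "my-api-key",
        "sasl.password": "my-api-secret",
    }
    config = build_consumer_config(sample)
    print(sorted(config))
```

The resulting dictionary would be passed as before, for example ReadFromKafka(consumer_config=config, topics=['users']).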