KafkaProducer - Getting an error connecting to Kafka (Failed to update metadata after 60.0 secs)

I am trying to read data from Oracle and send it to a Kafka topic. I was able to read from Oracle, put the result into a DataFrame, and set all the Kafka parameters as shown in the code below, but I am getting this error:
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

This link looks similar, but did not help me:
KafkaTimeoutError: Failed to update metadata after 60.0 secs

I use Amazon Managed Streaming for Apache Kafka (MSK) and have two brokers. Do I need to list both as my bootstrap servers, or just the main one?

The producer connects to Kafka and then disconnects, but no messages are sent.

Here is my code ...

    # Imports used by this snippet (the OracleHook import path may differ
    # depending on the Airflow version / provider package in use).
    from json import dumps

    import pandas as pd
    from airflow.providers.oracle.hooks.oracle import OracleHook
    from kafka import KafkaProducer

    try:
        conn = OracleHook(oracle_conn_id=oracle_conn_id).get_conn()
        query = "Select * from sales"
        df = pd.read_sql(query, conn)

        topic = 'my-topic'
        producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: dumps(x).encode('utf-8'),
            api_version=(0, 10, 1),
        )
        # Note: iterating a DataFrame directly yields its column names
        # (strings), not its rows.
        for raw in pd.read_sql(query, conn):
            producer.send(topic, raw.encode('utf-8'))

        print('Number of records')

        conn.close()

    except Exception as error:
        raise error
    return

... and here is the log:
{{conn.py:381}} INFO - <BrokerConnection node_id=bootstrap-0 host='my-bootstrap_servers': connecting to 'my-server']
{{conn.py:410}} INFO - <BrokerConnection node_id=bootstrap-0 host='my-bootstrap_servers': Connection complete.
{{conn.py:1096}} ERROR - <BrokerConnection node_id=bootstrap-0 host='my-bootstrap_servers': socket disconnected
{{conn.py:919}} INFO - <BrokerConnection node_id=bootstrap-0 host='my-bootstrap_servers': Closing connection. KafkaConnectionError: socket disconnected

{{taskinstance.py:1703}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 151, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 162, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kafka/producer/kafka.py", line 576, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kafka/producer/kafka.py", line 703, in _wait_on_metadata
    "Failed to update metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
{{taskinstance.py:1280}} INFO - Marking task as FAILED. dag_id=bkbne_ora_to_kafka, task_id=task_id, execution_date=20220624T204102, start_date=20220628T171225, end_date=20220628T171327
{{standard_task_runner.py:91}} ERROR - Failed to execute job 95 for task task_id
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 184, in _run_raw_task
    error_file=args.error_file,
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 151, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 162, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/send_to_kafka/src/send_to_kafka.py", line 63, in f_se
    raise e
  File "/usr/local/airflow/dags/send_to_kafka/src/send_to_kafka.py", line 55, in send_to_kafka
    producer.send(topic, row.encode('utf-8'))
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kafka/producer/kafka.py", line 576, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kafka/producer/kafka.py", line 703, in _wait_on_metadata
    "Failed to update metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

Could someone help me with this? I don't know what is happening here.

Comments (2)

┼── 2025-02-18 12:16:42

Ensure that you actually have connectivity to the upstream Kafka brokers (preferably every one of them) with something like ping/ncat/the Kafka console tools. The fact that you can't get metadata (and see socket disconnects) points to a network problem (bad config / firewall?).

Do I need to list both as my bootstrap servers, or just the main one?

Need? No.

However, the more servers you put into bootstrap, the more tolerant to broker failures your application is (at least with the Java client, which picks a random one to connect to first; the C/Python client should behave the same, AFAICT).
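A quick way to check both points from the worker that actually runs the Airflow task is a small script like the one below. It is only a sketch: the broker hostnames are placeholders, and it assumes the kafka-python client the question already uses. The first part is the Python equivalent of `nc -vz host port`; the second asks the cluster for metadata, which is exactly the call the producer times out on.

    import socket

    from kafka import KafkaConsumer

    # Placeholder MSK bootstrap brokers -- replace with the values shown under
    # "View client information" for your cluster (these names are made up).
    brokers = [
        ('b-1.mycluster.xxxxxx.kafka.us-east-1.amazonaws.com', 9092),
        ('b-2.mycluster.xxxxxx.kafka.us-east-1.amazonaws.com', 9092),
    ]

    # 1) Raw TCP reachability check, roughly what `nc -vz host port` reports.
    for host, port in brokers:
        try:
            with socket.create_connection((host, port), timeout=5):
                print(f'{host}:{port} is reachable')
        except OSError as exc:
            print(f'{host}:{port} is NOT reachable: {exc}')

    # 2) Fetch cluster metadata -- the same operation the producer gives up on
    #    after 60 seconds with "Failed to update metadata".
    consumer = KafkaConsumer(bootstrap_servers=[f'{h}:{p}' for h, p in brokers])
    print('Topics visible from here:', consumer.topics())
    consumer.close()

If either step fails from inside the Airflow worker (even if it works from your laptop), the problem is the network path (security groups, VPC routing, firewall) rather than the producer code.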

毁梦 2025-02-18 12:16:42

Your code isn't running on the actual brokers, so bootstrap_servers=['localhost:9092'] should be changed to the address(es) that MSK provides you. You may also need to add authentication settings, depending on which port you use and how you have configured the cluster.
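As a rough sketch of what that change might look like with kafka-python (the broker names below are made up, and which security settings you need depends on how client authentication is configured on the MSK cluster):

    from json import dumps

    from kafka import KafkaProducer

    # Hypothetical MSK bootstrap brokers -- copy the real values from the MSK
    # console. On MSK, 9092 is the plaintext listener, 9094 the TLS listener,
    # and 9096 the SASL/SCRAM listener (if enabled on the cluster).
    bootstrap = [
        'b-1.mycluster.xxxxxx.kafka.us-east-1.amazonaws.com:9094',
        'b-2.mycluster.xxxxxx.kafka.us-east-1.amazonaws.com:9094',
    ]

    producer = KafkaProducer(
        bootstrap_servers=bootstrap,
        security_protocol='SSL',            # TLS listener on 9094
        # For SASL/SCRAM on 9096, use instead:
        # security_protocol='SASL_SSL',
        # sasl_mechanism='SCRAM-SHA-512',
        # sasl_plain_username='...',
        # sasl_plain_password='...',
        value_serializer=lambda x: dumps(x).encode('utf-8'),
    )

Listing both brokers also gives you the fault tolerance mentioned in the other answer, and dropping the hard-coded api_version=(0, 10, 1) lets the client negotiate the broker version on its own.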

Regarding the logic of your code, I'd suggest using MSK Connect with JDBC Source or Debezium to read a database table into Kafka.
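For reference, a connector of that kind is configured with plain key/value properties. The sketch below uses the Confluent JDBC source connector's property names with placeholder connection details; treat it as an illustration to adapt and verify against the documentation for the connector version you deploy, whether you paste the properties into MSK Connect or submit them to a self-managed Connect cluster.

    # Connector configuration as key/value pairs (placeholder values throughout).
    # Property names follow the Confluent JDBC source connector.
    jdbc_source_config = {
        'connector.class': 'io.confluent.connect.jdbc.JdbcSourceConnector',
        'tasks.max': '1',
        'connection.url': 'jdbc:oracle:thin:@//my-oracle-host:1521/MYSERVICE',
        'connection.user': 'my_user',
        'connection.password': 'my_password',
        'query': 'SELECT * FROM sales',
        'mode': 'bulk',             # re-reads the whole table each poll; other
                                    # modes need an incrementing/timestamp column
        'topic.prefix': 'my-topic',
        'poll.interval.ms': '60000',
    }

Debezium's Oracle connector would use a different connector.class and its own set of properties, but the overall shape (a flat map of configuration keys) is the same.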
