Flink Python DataStream API Kafka消费者-NoclassDeffoundError bytearrayDeserializer错误
我在pyflink的PY4J侧有一个错误。代码下面: env = StreamExecutionEnvironment.get_execution_environment() env.add_jars("file:/" + os.getcwd() +…
apache flink键入kafkasource
我按照所述在这里。现在,我尝试使用a jdbc接收器。 现在,Kafka的来源似乎没有类型。因此,在编写SQL的语句时,它看起来都像 nothing 。 我如何使用…
在Kafka Java中将所有消息传达给主题之后,如何发送元数据?
我通过使用Java Kafka客户库库将学生详细信息发布到KAFKA主题,并且一旦制作了所有记录,我想要的是,我想将消息计数带有其他一些元数据将其发送到同…
KAFKA模式注册表 - 模式主题的保留政策_schemas不正确
我正在尝试在Kubernetes部署模式注册表。但是,当我尝试使用部署文件创建一个POD时,它总是会在以下错误中重新启动: io.confluent.kafka.schemaregis…
telnet中的JMX端口答案,但在JConsole中没有
我正在尝试在Docker下运行的Kafka Connect中启用JMX。我在我的docker-compose.yml中有以下配置: version: '2' services: connect-oracle-jmx: image:…
通过NGINX自定义路由 - 从第三方来源读取
我是Nginx的新手,想知道它是否可以帮助我解决我们遇到的用例。 我有n个节点,它们是从具有相同组ID的Kafka主题中读取的,这意味着每个节点都有不相交…
更改Kubernetes配置中Kafka主题的Kafka_cfg_num_partitions值
目前,我的应用程序具有kafka的Bitnami容器。我想将主题中的分区数更改为三个。但是我不知道我应该在哪里做。当我去Minikube仪表板时,我看到有一个KA…
Kafka:“经纪人未能验证记录”在增加分区之后
我通过Terraform增加了现有的Kafka主题的分区。分区尺寸成功增加了,但是当我测试与该主题的连接时,我会得到一个“经纪人无法验证记录” 测试方法: …
在Kafka经纪本身或通过Spring Cloud流中配置主题
我必须在应用程序中定义主题。我正在使用Spring Cloud Stream在应用程序之间配置消息传递。 Spring Cloud Stream在后台使用Kafka活页夹。我想知道创建…
KAFKA:消耗睾丸/0时错误:Kafka服务器:请求超过请求中用户指定的时间限制
Helo, “ Kafka:消耗睾丸/0:KAFKA服务器时的错误:请求超过了请求中用户指定的时间限制。” 我在从Kafka Broker消费时间歇性地收到上述错误(即Kaf…
在阅读时,使用Spark写给Kafka的JSON字符串未正确转换给Kafka
我读了一个.csv文件以创建数据框架,我想将数据写入Kafka主题。代码是以下内容 df = spark.read.format("csv").option("header", "true").load(f'{fil…
在特定时间间隔(1分钟)中检索发布到KAFKA主题的消息总数
我想使用一个KAFKA命令,该命令在时间间隔内获取发布给Kafka主题的消息总数(理想情况下是1分钟)。这样的命令是否存在?最好以一种有效的方式获取计…