无法在Apache Flink中使用PULSAR共享订阅

发布于 2025-02-02 12:28:31 字数 5688 浏览 3 评论 0原文

Flink Data Pipeline从Apache Pulsar分区主题中读取。我已经将pulsarsource订阅设置为sisscriptionType.exclusive。将其更改为sisscriptionType.shared,它希望命名空间启用事务策略。然后,我在经纪人中启用了交易管理器并启动了交易管理器。但是仍然得到这个例外,

Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch(PulsarPartitionSplitReaderBase.java:140)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.fetch(PulsarUnorderedPartitionSplitReader.java:55)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
    ... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
    at org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction(PulsarTransactionUtils.java:56)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.newTransaction(PulsarUnorderedPartitionSplitReader.java:165)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.pollMessage(PulsarUnorderedPartitionSplitReader.java:91)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch(PulsarPartitionSplitReaderBase.java:115)
    ... 9 more
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction(PulsarTransactionUtils.java:51)
    ... 12 more
Caused by: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
    at org.apache.pulsar.client.impl.TransactionMetaStoreHandler.getExceptionByServerError(TransactionMetaStoreHandler.java:419)
    at org.apache.pulsar.client.impl.TransactionMetaStoreHandler.handleTransactionFailOp(TransactionMetaStoreHandler.java:352)
    at org.apache.pulsar.client.impl.TransactionMetaStoreHandler.handleNewTxnResponse(TransactionMetaStoreHandler.java:210)
    at org.apache.pulsar.client.impl.ClientCnx.handleNewTxnResponse(ClientCnx.java:945)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:382)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ... 1 more

我看不到一种方法(方法)可以启用Flink的pulsarsource内部的交易。

A Flink data pipeline reads from Apache Pulsar partitioned topic. I have set the PulsarSource subscription to SubscriptionType.Exclusive. When this is changed to SubscriptionType.Shared it expects transaction policies to be enabled for the namespace. I then enabled transaction manager in the broker and started it. But still get this exception

Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch(PulsarPartitionSplitReaderBase.java:140)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.fetch(PulsarUnorderedPartitionSplitReader.java:55)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
    ... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
    at org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction(PulsarTransactionUtils.java:56)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.newTransaction(PulsarUnorderedPartitionSplitReader.java:165)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.pollMessage(PulsarUnorderedPartitionSplitReader.java:91)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch(PulsarPartitionSplitReaderBase.java:115)
    ... 9 more
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction(PulsarTransactionUtils.java:51)
    ... 12 more
Caused by: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
    at org.apache.pulsar.client.impl.TransactionMetaStoreHandler.getExceptionByServerError(TransactionMetaStoreHandler.java:419)
    at org.apache.pulsar.client.impl.TransactionMetaStoreHandler.handleTransactionFailOp(TransactionMetaStoreHandler.java:352)
    at org.apache.pulsar.client.impl.TransactionMetaStoreHandler.handleNewTxnResponse(TransactionMetaStoreHandler.java:210)
    at org.apache.pulsar.client.impl.ClientCnx.handleNewTxnResponse(ClientCnx.java:945)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:382)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ... 1 more

I do not see a way (method) to enable the transaction inside the PulsarSource of Flink.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

尾戒 2025-02-09 12:28:31

独立的pulsar使用standalone.conf,而不是broker.conf。我错误地启用了broker.conf的事务协调员。

The standalone Pulsar uses the standalone.conf and not the broker.conf. I wrongly have enabled the transaction coordinator in broker.conf.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文