Unable to use Pulsar Shared subscription in Apache Flink
A Flink data pipeline reads from an Apache Pulsar partitioned topic. I had set the PulsarSource subscription to SubscriptionType.Exclusive. When I change it to SubscriptionType.Shared, it expects transaction policies to be enabled for the namespace. I then enabled the transaction manager in the broker and restarted it, but I still get this exception:
Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch(PulsarPartitionSplitReaderBase.java:140)
at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.fetch(PulsarUnorderedPartitionSplitReader.java:55)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
at org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction(PulsarTransactionUtils.java:56)
at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.newTransaction(PulsarUnorderedPartitionSplitReader.java:165)
at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.pollMessage(PulsarUnorderedPartitionSplitReader.java:91)
at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch(PulsarPartitionSplitReaderBase.java:115)
... 9 more
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction(PulsarTransactionUtils.java:51)
... 12 more
Caused by: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
at org.apache.pulsar.client.impl.TransactionMetaStoreHandler.getExceptionByServerError(TransactionMetaStoreHandler.java:419)
at org.apache.pulsar.client.impl.TransactionMetaStoreHandler.handleTransactionFailOp(TransactionMetaStoreHandler.java:352)
at org.apache.pulsar.client.impl.TransactionMetaStoreHandler.handleNewTxnResponse(TransactionMetaStoreHandler.java:210)
at org.apache.pulsar.client.impl.ClientCnx.handleNewTxnResponse(ClientCnx.java:945)
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:382)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
I do not see a way (method) to enable transactions inside Flink's PulsarSource.
Comments (1)
Standalone Pulsar reads standalone.conf, not broker.conf. I had mistakenly enabled the transaction coordinator in broker.conf.
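To confirm which file actually carries the setting, a small check like the one below may help. It is only a sketch: it assumes the relevant key is transactionCoordinatorEnabled (the Pulsar setting for the transaction coordinator) and that the file uses plain key=value lines with # comments. The class name PulsarConfCheck and the default path conf/standalone.conf are hypothetical and chosen for illustration.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper, not part of Pulsar or Flink.
public class PulsarConfCheck {

    // Returns true only if the conf text sets transactionCoordinatorEnabled=true.
    // A commented-out or missing key counts as disabled.
    public static boolean isTransactionCoordinatorEnabled(String confText) throws IOException {
        Properties props = new Properties();
        try (Reader reader = new StringReader(confText)) {
            props.load(reader); // parses key=value lines and skips # comments
        }
        String value = props.getProperty("transactionCoordinatorEnabled", "false");
        return Boolean.parseBoolean(value.trim());
    }

    public static void main(String[] args) throws IOException {
        // Assumed default path; pass the real file as the first argument.
        Path conf = Path.of(args.length > 0 ? args[0] : "conf/standalone.conf");
        String text = Files.readString(conf, StandardCharsets.UTF_8);
        System.out.println(conf + ": transactionCoordinatorEnabled="
                + isTransactionCoordinatorEnabled(text));
    }
}
```

Running it against both standalone.conf and broker.conf should show which of the two has the flag set. After changing the file that the running process reads, Pulsar needs a restart for the setting to take effect.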