如何在一项服务中使用多个ClientKafka?
我在一项服务中使用多个clientkafka
的问题有问题,这是我的实现:
@Controller()
export class ApiController implements OnModuleInit {
constructor(
@Inject("ACCOUNT_SERVICE") private readonly accountKafkaClient: ClientKafka,
@Inject("WORKSPACE_SERVICE") private readonly workspaceKafkaClient: ClientKafka
) { }
async onModuleInit() {
const requestPatterns = [
'topic'
];
requestPatterns.forEach((pattern) => {
this.accountKafkaClient.subscribeToResponseOf(`account.${pattern}`);
});
await this.accountKafkaClient.connect();
}
async onModuleDestroy() {
await this.accountKafkaClient.close();
}
@Get()
async sendMessage() {
const data = {
msg: "account.topic"
}
const kafkaResponse = this.accountKafkaClient.send<any>('account.topic', JSON.stringify(data));
const response = await firstValueFrom(kafkaResponse);
const kafkaResponse2 = this.workspaceKafkaClient.send<any>('workspace.topic', JSON.stringify(response )) //THIS IS NOT RUNNING, WORKSPACE_SERVICE NOT RECEIVE ANY MESSAGE
return await firstValueFrom(kafkaResponse2);
}
}
有人可以告诉我为什么workspacekafkaclient
没有将任何消息发送到workspace_service
微服务?我尝试在onModule中传递此客户端...
accountkafkaclient
之类的功能,但这对我没有帮助,
这也是我在模块中的设置:
@Module({
imports: [
ClientsModule.register([
{
name: 'ACCOUNT_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'account_service',
brokers: ['localhost:29092'],
},
consumer: {
groupId: 'account-consumer',
},
},
},
{
name: 'WORKSPACE_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'workspace_service',
brokers: ['localhost:29092'],
},
consumer: {
groupId: 'workspace-consumer',
},
},
},
]),
],
controllers: [ApiController],
providers: [
ApiService,
// KafkaProducerProvider,
],
})
export class ApiModule {}
感谢您的任何帮助!
I have a problem with using multiple ClientKafka
in one service, here is my implementation:
@Controller()
export class ApiController implements OnModuleInit {
constructor(
@Inject("ACCOUNT_SERVICE") private readonly accountKafkaClient: ClientKafka,
@Inject("WORKSPACE_SERVICE") private readonly workspaceKafkaClient: ClientKafka
) { }
async onModuleInit() {
const requestPatterns = [
'topic'
];
requestPatterns.forEach((pattern) => {
this.accountKafkaClient.subscribeToResponseOf(`account.${pattern}`);
});
await this.accountKafkaClient.connect();
}
async onModuleDestroy() {
await this.accountKafkaClient.close();
}
@Get()
async sendMessage() {
const data = {
msg: "account.topic"
}
const kafkaResponse = this.accountKafkaClient.send<any>('account.topic', JSON.stringify(data));
const response = await firstValueFrom(kafkaResponse);
const kafkaResponse2 = this.workspaceKafkaClient.send<any>('workspace.topic', JSON.stringify(response )) //THIS IS NOT RUNNING, WORKSPACE_SERVICE NOT RECEIVE ANY MESSAGE
return await firstValueFrom(kafkaResponse2);
}
}
can someone tell me why workspaceKafkaClient
is not sending any message to WORKSPACE_SERVICE
microservice? I try with passing this client in onModule...
functions like accountKafkaClient
but it didn't help me,
here is also my settings in module:
@Module({
imports: [
ClientsModule.register([
{
name: 'ACCOUNT_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'account_service',
brokers: ['localhost:29092'],
},
consumer: {
groupId: 'account-consumer',
},
},
},
{
name: 'WORKSPACE_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'workspace_service',
brokers: ['localhost:29092'],
},
consumer: {
groupId: 'workspace-consumer',
},
},
},
]),
],
controllers: [ApiController],
providers: [
ApiService,
// KafkaProducerProvider,
],
})
export class ApiModule {}
thanks for any help!
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
每个应用程序您只需要一个生产者客户端,但是Kafka生产商从未立即将数据发送给经纪人。
您需要将它们冲洗才能发生,这是
等待FirstValuefrom(...)
应该这样做的事情,但是您尚未显示该方法。否则,您似乎正在尝试从一个主题中获得回复,以发送到另一个主题,这是消费者应使用的方法,而不是在一个生产者请求中阻止。
You only need one producer client per application, but Kafka producers never immediately send data to brokers.
You need to flush them for that to happen, which is what
await firstValueFrom(...)
should do, but you've not shown that method.Otherwise, you seem to be trying to get the reponse from one topic to send to another, which is what a consumer should be used for, rather than blocking on one producer request.