无法在 NestJS 中反序列化 Kafka 事件
我在 main 文件和控制器文件中有如下配置。我使用的是一个外部 Kafka,
并在 main.ts 中定义了微服务:
// Attach a Kafka-transport microservice to the application, pointing at the
// broker on localhost:9092 and using the consumer group id 'consumer-1'.
// NOTE(review): no `parser` option is passed here — confirm whether raw
// Buffer payloads are expected to survive deserialization.
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'consumer-1',
},
},
});
// Start all microservices connected above before continuing.
await app.startAllMicroservices();
// Register a global ValidationPipe with `transform: true`.
// NOTE(review): this runs after startAllMicroservices(); confirm whether the
// pipe is intended to apply to the Kafka message handlers as well.
app.useGlobalPipes(
new ValidationPipe({
transform: true,
}),
);
在 controller.ts 中:
// Controller that owns a Kafka client and declares a handler for 'topic-tx'.
// NOTE(review): no class decorator and no closing brace are visible in this
// excerpt — the snippet appears to be truncated.
export class Controller implements OnModuleInit {
constructor(private readonly service: Service) {}
// Kafka client configured with the same broker (localhost:9092) and the same
// consumer group id ('consumer-1') as the microservice defined in main.ts.
@Client({
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'consumer-1',
},
},
})
client: ClientKafka;
// Lifecycle hook: registers 'topic-tx' via subscribeToResponseOf, connects the
// client, then logs the client's consumer assignments serialized as JSON.
async onModuleInit() {
this.client.subscribeToResponseOf('topic-tx');
await this.client.connect();
Logger.log(
'consumer assignments: ' +
JSON.stringify(this.client.getConsumerAssignments()),
);
}
// Handler bound to the 'topic-tx' pattern on the Kafka transport. The parameter
// is decorated with @Payload('value') and is only written to the console.
@MessagePattern('topic-tx', Transport.KAFKA)
async handleEntityCreated(@Payload('value') message: IResponseValue) {
console.log('Received event: ', message);
}
来自 Kafka 主题的事件在控制台上以缓冲区(Buffer)的形式打印出来,没有被正确地反序列化。
Received event: $da4fa4c3-9e91-43d9-acaa-a07d1fed2635"�
*0x5bcd9e419a11AB71f5eea1a2CFf9B0694C990Baf29*0xDFB50936C5d83b8367BDC01B17c386203AA60368*4611920x0:�0xd3fc98640000000000000000000000005bcd9e419a11ab71f5eea1a2cff9b0694c990baf00000000000000000000000000000000000000000000000000000000000000380000000000000000000000000000000000000000000000000000000000000060000000000000000000000000000000000000000000000000000000000000004034346532343639353263666337363430663535306162386233343363633564633865623033303937373534343030343935306134646432393134663864663561B�0xf901261d8082b42794dfb50936c5d83b8367bdc01b17c386203aa6036880b8c4d3fc98640000000000000000000000005bcd9e419a11ab71f5eea1a2cff9b0694c990baf00000000000000000000000000000000000000000000000000000000000000380000000000000000000000000000000000000000000000000000000000000060000000000000000000000000000000000000000000000000000000000000004034346532343639353263666337363430663535306162386233343363633564633865623033303937373534343030343935306134646432393134663864663561820a96a0d524dbe02b8120452b988b0409281e0cf4f7db1ad1dba3c3acfdb11d18c8cc5fa043d1cdea80c15f180854e4de70051cff66f59067ef6a6bbb90156a30285d100eJB0x02dff5ee07b16609c959660789dd743d648a5f44f4ad3651fefe76f7ee004134�legacy*�
B0x02dff5ee07b16609c959660789dd743d648a5f44f4ad3651fefe76f7ee004134B0x362711a341c6e4647f8978a9bb01df8097d9dd9177989389fc17627c66ee1615�%@R�0x�
*0xDFB50936C5d83b8367BDC01B17c386203AA60368B0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62B0x0000000000000000000000005bcd9e419a11ab71f5eea1a2cff9b0694c990bafB0x0000000000000000000000000000000000000000000000000000000000000000B0x0000000000000000000000005bcd9e419a11ab71f5eea1a2cff9b0694c990baf�0x00000000000000000000000000000000000000000000000000000000000000380000000000000000000000000000000000000000000000000000000000000001"7TransferSingle(address,address,address,uint256,uint256)*2
from*0x0000000000000000000000000000000000000000*0
to*0x5bcd9e419a11AB71f5eea1a2CFf9B0694C990Baf*
id56*
value1*6
operator*0x5bcd9e419a11AB71f5eea1a2CFf9B0694C990Baf0�%:B0x02dff5ee07b16609c959660789dd743d648a5f44f4ad3651fefe76f7ee004134JB0x362711a341c6e4647f8978a9bb01df8097d9dd9177989389fc17627c66ee1615Z�
*0xDFB50936C5d83b8367BDC01B17c386203AA60368B0x6bb7ff708619ba0610cba295a58592e0451dee2622938c8755667688daf3529bB0x0000000000000000000000000000000000000000000000000000000000000038�0x0000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000004034346532343639353263666337363430663535306162386233343363633564633865623033303937373534343030343935306134646432393134663864663561"URI(string,uint256)*I
value@44e246952cfc7640f550ab8b343cc5dc8eb030977544004950a4dd2914f8df5a*
id560�%:B0x02dff5ee07b16609c959660789dd743d648a5f44f4ad3651fefe76f7ee004134JB0x362711a341c6e4647f8978a9bb01df8097d9dd9177989389fc17627c66ee1615P`��h��r0x0�
MetalToken�latest:devB$0afcfd48-c20b-4db7-ab35-73f90e937c37
如何将来自 Kafka broker 的传入消息反序列化为相应的接口?
I have the following configuration in the main and controller files. I am using an external Kafka instance.
I have defined the microservice in main.ts:
// Attach a Kafka-transport microservice to the application, pointing at the
// broker on localhost:9092 and using the consumer group id 'consumer-1'.
// NOTE(review): no `parser` option is passed here — confirm whether raw
// Buffer payloads are expected to survive deserialization.
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'consumer-1',
},
},
});
// Start all microservices connected above before continuing.
await app.startAllMicroservices();
// Register a global ValidationPipe with `transform: true`.
// NOTE(review): this runs after startAllMicroservices(); confirm whether the
// pipe is intended to apply to the Kafka message handlers as well.
app.useGlobalPipes(
new ValidationPipe({
transform: true,
}),
);
In controller.ts:
// Controller that owns a Kafka client and declares a handler for 'topic-tx'.
// NOTE(review): no class decorator and no closing brace are visible in this
// excerpt — the snippet appears to be truncated.
export class Controller implements OnModuleInit {
constructor(private readonly service: Service) {}
// Kafka client configured with the same broker (localhost:9092) and the same
// consumer group id ('consumer-1') as the microservice defined in main.ts.
@Client({
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'consumer-1',
},
},
})
client: ClientKafka;
// Lifecycle hook: registers 'topic-tx' via subscribeToResponseOf, connects the
// client, then logs the client's consumer assignments serialized as JSON.
async onModuleInit() {
this.client.subscribeToResponseOf('topic-tx');
await this.client.connect();
Logger.log(
'consumer assignments: ' +
JSON.stringify(this.client.getConsumerAssignments()),
);
}
// Handler bound to the 'topic-tx' pattern on the Kafka transport. The parameter
// is decorated with @Payload('value') and is only written to the console.
@MessagePattern('topic-tx', Transport.KAFKA)
async handleEntityCreated(@Payload('value') message: IResponseValue) {
console.log('Received event: ', message);
}
The events from the Kafka topic are printed on the console as a buffer, and they are not being deserialized correctly.
Received event: $da4fa4c3-9e91-43d9-acaa-a07d1fed2635"�
*0x5bcd9e419a11AB71f5eea1a2CFf9B0694C990Baf29*0xDFB50936C5d83b8367BDC01B17c386203AA60368*4611920x0:�0xd3fc98640000000000000000000000005bcd9e419a11ab71f5eea1a2cff9b0694c990baf00000000000000000000000000000000000000000000000000000000000000380000000000000000000000000000000000000000000000000000000000000060000000000000000000000000000000000000000000000000000000000000004034346532343639353263666337363430663535306162386233343363633564633865623033303937373534343030343935306134646432393134663864663561B�0xf901261d8082b42794dfb50936c5d83b8367bdc01b17c386203aa6036880b8c4d3fc98640000000000000000000000005bcd9e419a11ab71f5eea1a2cff9b0694c990baf00000000000000000000000000000000000000000000000000000000000000380000000000000000000000000000000000000000000000000000000000000060000000000000000000000000000000000000000000000000000000000000004034346532343639353263666337363430663535306162386233343363633564633865623033303937373534343030343935306134646432393134663864663561820a96a0d524dbe02b8120452b988b0409281e0cf4f7db1ad1dba3c3acfdb11d18c8cc5fa043d1cdea80c15f180854e4de70051cff66f59067ef6a6bbb90156a30285d100eJB0x02dff5ee07b16609c959660789dd743d648a5f44f4ad3651fefe76f7ee004134�legacy*�
B0x02dff5ee07b16609c959660789dd743d648a5f44f4ad3651fefe76f7ee004134B0x362711a341c6e4647f8978a9bb01df8097d9dd9177989389fc17627c66ee1615�%@R�0x�
*0xDFB50936C5d83b8367BDC01B17c386203AA60368B0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62B0x0000000000000000000000005bcd9e419a11ab71f5eea1a2cff9b0694c990bafB0x0000000000000000000000000000000000000000000000000000000000000000B0x0000000000000000000000005bcd9e419a11ab71f5eea1a2cff9b0694c990baf�0x00000000000000000000000000000000000000000000000000000000000000380000000000000000000000000000000000000000000000000000000000000001"7TransferSingle(address,address,address,uint256,uint256)*2
from*0x0000000000000000000000000000000000000000*0
to*0x5bcd9e419a11AB71f5eea1a2CFf9B0694C990Baf*
id56*
value1*6
operator*0x5bcd9e419a11AB71f5eea1a2CFf9B0694C990Baf0�%:B0x02dff5ee07b16609c959660789dd743d648a5f44f4ad3651fefe76f7ee004134JB0x362711a341c6e4647f8978a9bb01df8097d9dd9177989389fc17627c66ee1615Z�
*0xDFB50936C5d83b8367BDC01B17c386203AA60368B0x6bb7ff708619ba0610cba295a58592e0451dee2622938c8755667688daf3529bB0x0000000000000000000000000000000000000000000000000000000000000038�0x0000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000004034346532343639353263666337363430663535306162386233343363633564633865623033303937373534343030343935306134646432393134663864663561"URI(string,uint256)*I
value@44e246952cfc7640f550ab8b343cc5dc8eb030977544004950a4dd2914f8df5a*
id560�%:B0x02dff5ee07b16609c959660789dd743d648a5f44f4ad3651fefe76f7ee004134JB0x362711a341c6e4647f8978a9bb01df8097d9dd9177989389fc17627c66ee1615P`��h��r0x0�
MetalToken�latest:devB$0afcfd48-c20b-4db7-ab35-73f90e937c37
How can I deserialize the incoming message from the Kafka broker into the appropriate interface?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
随着我对 NestJS Kafka 客户端代码的深入研究,我找到了这个问题的解决方案。另外,这个 issue 也帮助我解决了问题:https://github.com/nestjs/nest/issues/3726
确切的问题是:在 Kafka 微服务中,Buffer 值总是会被字符串化,这导致消息无法被解析为正确的格式。
解决方案是在 Kafka 配置中将解析器选项 keepBinary 设置为 true,以保留 Buffer 格式。
After exploring the NestJS Kafka client code further, I found the solution to my problem. This issue also helped me resolve it: https://github.com/nestjs/nest/issues/3726
The exact issue was: in the Kafka microservice, Buffer values are always stringified, which prevented the message from being parsed into the right format.
The solution is to set the parser option `keepBinary` to `true` in the Kafka config (i.e. `options: { parser: { keepBinary: true } }`), so that the Buffer format is preserved.
如果您的数据采用自定义格式,可以按如下方式设置自定义反序列化器:
之后,您可以按如下方式将该反序列化器注入到微服务中:
If your data has a custom format, you can set up a custom deserializer like the following:
After that, you can inject that deserializer into your microservice as follows: