Unable to deserialize Kafka events in NestJS

Posted 2025-02-11 03:29:07


I have the following configuration in my main and controller files. I am using an external Kafka broker.

I have defined the microservice in main.ts:

  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: ['localhost:9092'],
      },
      consumer: {
        groupId: 'consumer-1',
      },
    },
  });
  await app.startAllMicroservices();
  app.useGlobalPipes(
    new ValidationPipe({
      transform: true,
    }),
  );

In controller.ts:

import { Logger, OnModuleInit } from '@nestjs/common';
import {
  Client,
  ClientKafka,
  MessagePattern,
  Payload,
  Transport,
} from '@nestjs/microservices';

export class Controller implements OnModuleInit {
  constructor(private readonly service: Service) {}

  @Client({
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: ['localhost:9092'],
      },
      consumer: {
        groupId: 'consumer-1',
      },
    },
  })
  client: ClientKafka;

  async onModuleInit() {
    this.client.subscribeToResponseOf('topic-tx');
    await this.client.connect();
    Logger.log(
      'consumer assignments: ' +
        JSON.stringify(this.client.getConsumerAssignments()),
    );
  }

  @MessagePattern('topic-tx', Transport.KAFKA)
  async handleEntityCreated(@Payload('value') message: IResponseValue) {
    console.log('Received event: ', message);
  }
}

The events from the Kafka topic are printed on the console as a buffer; they are not being deserialized appropriately.

Received event:  $da4fa4c3-9e91-43d9-acaa-a07d1fed2635"�
*0x5bcd9e419a11AB71f5eea1a2CFf9B0694C990Baf29*0xDFB50936C5d83b8367BDC01B17c386203AA60368*4611920x0:�0xd3fc98640000000000000000000000005bcd9e419a11ab71f5eea1a2cff9b0694c990baf00000000000000000000000000000000000000000000000000000000000000380000000000000000000000000000000000000000000000000000000000000060000000000000000000000000000000000000000000000000000000000000004034346532343639353263666337363430663535306162386233343363633564633865623033303937373534343030343935306134646432393134663864663561B�0xf901261d8082b42794dfb50936c5d83b8367bdc01b17c386203aa6036880b8c4d3fc98640000000000000000000000005bcd9e419a11ab71f5eea1a2cff9b0694c990baf00000000000000000000000000000000000000000000000000000000000000380000000000000000000000000000000000000000000000000000000000000060000000000000000000000000000000000000000000000000000000000000004034346532343639353263666337363430663535306162386233343363633564633865623033303937373534343030343935306134646432393134663864663561820a96a0d524dbe02b8120452b988b0409281e0cf4f7db1ad1dba3c3acfdb11d18c8cc5fa043d1cdea80c15f180854e4de70051cff66f59067ef6a6bbb90156a30285d100eJB0x02dff5ee07b16609c959660789dd743d648a5f44f4ad3651fefe76f7ee004134�legacy*�
B0x02dff5ee07b16609c959660789dd743d648a5f44f4ad3651fefe76f7ee004134B0x362711a341c6e4647f8978a9bb01df8097d9dd9177989389fc17627c66ee1615�%@R�0x00000000000000000000000000000000000000000000008000000000000000000000000000000000000020000000000000000040000000000000000000012000000000000000110000000020000000000000000000000000000000000000000000000000020000000000000000000800000000002000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008000000000000000020000004000000000000000000000000000000000000000000000200080000000800Z�
*0xDFB50936C5d83b8367BDC01B17c386203AA60368B0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62B0x0000000000000000000000005bcd9e419a11ab71f5eea1a2cff9b0694c990bafB0x0000000000000000000000000000000000000000000000000000000000000000B0x0000000000000000000000005bcd9e419a11ab71f5eea1a2cff9b0694c990baf�0x00000000000000000000000000000000000000000000000000000000000000380000000000000000000000000000000000000000000000000000000000000001"7TransferSingle(address,address,address,uint256,uint256)*2
from*0x0000000000000000000000000000000000000000*0
to*0x5bcd9e419a11AB71f5eea1a2CFf9B0694C990Baf*
id56*

value1*6
operator*0x5bcd9e419a11AB71f5eea1a2CFf9B0694C990Baf0�%:B0x02dff5ee07b16609c959660789dd743d648a5f44f4ad3651fefe76f7ee004134JB0x362711a341c6e4647f8978a9bb01df8097d9dd9177989389fc17627c66ee1615Z�
*0xDFB50936C5d83b8367BDC01B17c386203AA60368B0x6bb7ff708619ba0610cba295a58592e0451dee2622938c8755667688daf3529bB0x0000000000000000000000000000000000000000000000000000000000000038�0x0000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000004034346532343639353263666337363430663535306162386233343363633564633865623033303937373534343030343935306134646432393134663864663561"URI(string,uint256)*I
value@44e246952cfc7640f550ab8b343cc5dc8eb030977544004950a4dd2914f8df5a*
id560�%:B0x02dff5ee07b16609c959660789dd743d648a5f44f4ad3651fefe76f7ee004134JB0x362711a341c6e4647f8978a9bb01df8097d9dd9177989389fc17627c66ee1615P`��h��r0x0�
MetalToken�latest:devB$0afcfd48-c20b-4db7-ab35-73f90e937c37

How do I deserialize the incoming message from the Kafka broker into the appropriate interface?


Answers (2)

つ低調成傷 2025-02-18 03:29:07


The more I explored the NestJS Kafka client code, the closer I got to a solution. This issue also helped me resolve the problem: https://github.com/nestjs/nest/issues/3726

The exact issue was: in the Kafka microservice, Buffer values are always stringified, which prevented parsing into the right format.

The solution is to set the parser option keepBinary to true in the Kafka config, so as to preserve the buffer format.

{
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'consumer-1',
    },
    parser: { keepBinary: true },
  },
}
﹏半生如梦愿梦如真 2025-02-18 03:29:07


If your data has a custom format, you can set up a custom deserializer like the following:

import { Deserializer } from '@nestjs/microservices';

export class CustomDeserializer implements Deserializer {
  public deserialize(data: Buffer): unknown {
    const stringifiedData = data.toString();

    // Parse/transform the raw string into your target shape here,
    // for example (assuming JSON payloads):
    const deserializedData = JSON.parse(stringifiedData);

    return deserializedData;
  }
}

After that, you can register the deserializer in your microservice as follows:

  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.KAFKA,
    options: {
      deserializer: new CustomDeserializer(),
      client: {
        brokers: ['localhost:9092'],
      },
      consumer: {
        groupId: 'consumer-1',
      },
    },
  });
  await app.startAllMicroservices();