Spring AMQP 中协议缓冲区反序列化/编组不正确

发布于 2025-01-10 23:41:26 字数 3631 浏览 2 评论 0原文

我试图找出为什么我的协议缓冲区有效负载在从 Java 应用程序中的 Spring AMQP 侦听器接收时被错误地反序列化。 协议缓冲区写入 Go 服务并发送到 Rabbit MQ 实例,然后由 Java Spring 应用程序拾取该实例并反序列化为对象实例。

由于某种原因,收到编组对象时,其字段 2 的值为 1,而字段 1null在生成的对象中。 此外,对象上带有键 5 的重复 network_interfaces 字段为 null

有趣的是,如果我在从队列中获取消息时使用调试器,则缺失的字段是可见的,并且包含 unknownFields 下实例化的 protobuf 对象的正确数据,因此它看起来像是原始解析器的情况,而不是工作正常。 调试器显示字段键已关闭,即字段1具有键2等等。

到目前为止我已经:

  • 验证了原型文件在两个服务中是相同的,Spring 应用程序通过将另一个存储库中的 proto 文件作为 git 子模块导入
  • 来使用它尝试使用 Spring.messaging.amqp MessageConverter 显式反序列化 proto基于消息头但没有成功
  • 发送消息时尝试了不同的消息内容类型和编码头,这些似乎没有什么区别,因为我只使用消息正文。
  • 已验证,如果 Go 应用程序侦听队列,它可以反序列化并解组正确发送的消息,因此这必须是 Spring 配置问题或类似问题。

有问题的 Proto3 消息定义:

message HostStateMessage {

  message NetworkInterface {
    string name = 1;
    string mac_address = 2;
    string ipv4 = 3;
    string ipv6 = 4;
  }

  string hostname = 1;
  string description = 2;
  HostType host_type = 3;
  repeated NetworkInterface network_interfaces = 4;
}

将协议缓冲区作为消息写入 RabbitMQ 的 Go 函数:

func PublishHostMessage(config *AMQPQueueConfiguration, conn *amqp.Connection, message *protos.HostStateMessage) error {
    channel, err := conn.Channel()
    if err != nil {
        fmt.Errorf("Creating channel failed: %s\n", err)
        return err
    }

    queue, err := channel.QueueDeclare(config.HostQueueName, true, false, false, false, nil)
    if err != nil {
        fmt.Errorf("Failed to declare queue: %s\n", err)
        return err
    }

    serialMsg, err := proto.Marshal(message)
    if err != nil {
        fmt.Errorf("Failed to serialize proto message: %s\n", err)
        return err
    }
    headers := amqp.Table{}
    headers["messageType"] = "HostStateMessage"
    fmt.Println(string(serialMsg))
    err = channel.Publish("", queue.Name, false, false, amqp.Publishing{Headers: headers, ContentType: "application/x-protobuf", Body: serialMsg})
    if err != nil {
        fmt.Errorf("Failed to send message: %s\n", err)
        return err
    }

    fmt.Printf("Sent host message at: %s\n", time.Now())
    return nil
}

MessageReceiver.java 类:

@Service
public class MessageReceiver {

  @Autowired RabbitConfiguration rabbitConfiguration;
  @Autowired HostDao hostDao;

  @RabbitListener(queues = "#{rabbitConfiguration.hostQueueName}")
  public void consumeHostNotification(Message in) {
    try {
      HostStateMessage message = HostStateMessage.parseFrom(in.getBody());
      hostDao.addOrUpdateHostStateFromMessage(message);
    } catch (InvalidProtocolBufferException e) {
      System.out.println(e.getStackTrace());
    }
  }
}

Spring AMQP @Configuration bean:

@Configuration
@EnableAutoConfiguration
public class RabbitConfiguration {

  @Value("${rabbitmq.hostname}")
  private String queueHost;

  @Value("${rabbitmq.port}")
  private int queuePort;

  @Value("${rabbitmq.queues.host-queue-name}")
  public String hostQueueName;

  @Bean
  public CachingConnectionFactory connectionFactory() {
    return new CachingConnectionFactory(queueHost, queuePort);
  }

  @Bean
  public Queue hostNotificationQueue() {
    return new Queue(hostQueueName);
  }
}

Go 正在使用 google.golang Go 模块中的 .org/protobuf v1.27.1

Java 使用 com.google.protobuf 3.19.4 作为 Maven 依赖项。与 protobuf-maven-plugin 0.6.1 一起使用 protoc 进行编译。

这是一个非常令人困惑的问题,如果能得到一些见解就太好了。

I am trying to figure out why my protocol buffer payload is being incorrectly de-serialized when received from a Spring AMQP listener in my Java app.
The protocol buffer is written in a Go service and sent to a Rabbit MQ instance which is then picked up by the Java Spring app and de-serialized into an object instance.

For some reason when received the marshalled object has field 2 with the value of field 1 while field 1 is null in the resulting object.
Additionally the repeated network_interfaces field with key 5 is null on the object.

Interestingly if I use the debugger when picking up a message from the queue, the missing fields are visible and contain the correct data on the instantiated protobuf object under unknownFields so it looks like a case of the proto parser not working correctly.
The debugger shows that the field keys are off, i.e. field 1 has the key 2 and so on.

So far I have:

  • Verified the proto files are the same in both services, the Spring app is using the proto file from the other repo by importing it as a git submodule
  • Attempted to use a Spring.messaging.amqp MessageConverter to deserialize the proto explicitly based on message headers with no success
  • Tried different message content type and encoding headers when sending the message, these don't appear to make a difference as I am only using the message body.
  • Verified that if the Go app listens to the queue that it can deserialize and unmarshal a message it sent correctly, so this has to be a Spring config issue or similar.

The Proto3 message definition in question:

message HostStateMessage {

  message NetworkInterface {
    string name = 1;
    string mac_address = 2;
    string ipv4 = 3;
    string ipv6 = 4;
  }

  string hostname = 1;
  string description = 2;
  HostType host_type = 3;
  repeated NetworkInterface network_interfaces = 4;
}

The Go function which writes the protocol buffer to RabbitMQ as a message:

func PublishHostMessage(config *AMQPQueueConfiguration, conn *amqp.Connection, message *protos.HostStateMessage) error {
    channel, err := conn.Channel()
    if err != nil {
        fmt.Errorf("Creating channel failed: %s\n", err)
        return err
    }

    queue, err := channel.QueueDeclare(config.HostQueueName, true, false, false, false, nil)
    if err != nil {
        fmt.Errorf("Failed to declare queue: %s\n", err)
        return err
    }

    serialMsg, err := proto.Marshal(message)
    if err != nil {
        fmt.Errorf("Failed to serialize proto message: %s\n", err)
        return err
    }
    headers := amqp.Table{}
    headers["messageType"] = "HostStateMessage"
    fmt.Println(string(serialMsg))
    err = channel.Publish("", queue.Name, false, false, amqp.Publishing{Headers: headers, ContentType: "application/x-protobuf", Body: serialMsg})
    if err != nil {
        fmt.Errorf("Failed to send message: %s\n", err)
        return err
    }

    fmt.Printf("Sent host message at: %s\n", time.Now())
    return nil
}

The MessageReceiver.java Class:

@Service
public class MessageReceiver {

  @Autowired RabbitConfiguration rabbitConfiguration;
  @Autowired HostDao hostDao;

  @RabbitListener(queues = "#{rabbitConfiguration.hostQueueName}")
  public void consumeHostNotification(Message in) {
    try {
      HostStateMessage message = HostStateMessage.parseFrom(in.getBody());
      hostDao.addOrUpdateHostStateFromMessage(message);
    } catch (InvalidProtocolBufferException e) {
      System.out.println(e.getStackTrace());
    }
  }
}

The Spring AMQP @Configuration bean:

@Configuration
@EnableAutoConfiguration
public class RabbitConfiguration {

  @Value("${rabbitmq.hostname}")
  private String queueHost;

  @Value("${rabbitmq.port}")
  private int queuePort;

  @Value("${rabbitmq.queues.host-queue-name}")
  public String hostQueueName;

  @Bean
  public CachingConnectionFactory connectionFactory() {
    return new CachingConnectionFactory(queueHost, queuePort);
  }

  @Bean
  public Queue hostNotificationQueue() {
    return new Queue(hostQueueName);
  }
}

Go is using google.golang.org/protobuf v1.27.1 in Go modules.

Java is using com.google.protobuf 3.19.4 as a Maven dependency. Along with protobuf-maven-plugin 0.6.1 to do the compilation with protoc.

This is a really confusing issue, would be great to get some insight.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

天涯沦落人 2025-01-17 23:41:26

在多次检查Java生成的protobuf类是否正确后,我意识到我没有验证Go生成的protobuf。

事实证明,我用来调用 protoc-gen-go 的脚本已经悄无声息地失败了,并且 Go 服务使用的是过时的消息版本,以及带有键 1 的附加字段。

After checking the that the Java generated protobuf class was correct multiple I times, I realised that I had not verified the Go generated protobuf.

Turns out the script I was using to call protoc-gen-go had failed silently and the Go service was using an outdated version of the message with an additional field with the key 1.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文