Spring AMQP 中协议缓冲区反序列化/编组不正确
我试图找出为什么我的协议缓冲区有效负载在从 Java 应用程序中的 Spring AMQP 侦听器接收时被错误地反序列化。 协议缓冲区写入 Go 服务并发送到 Rabbit MQ 实例,然后由 Java Spring 应用程序拾取该实例并反序列化为对象实例。
由于某种原因,收到编组对象时,其字段 2
的值为 1
,而字段 1
为 null
在生成的对象中。 此外,对象上带有键 5
的重复 network_interfaces
字段为 null
。
有趣的是,如果我在从队列中获取消息时使用调试器,则缺失的字段是可见的,并且包含 unknownFields
下实例化的 protobuf 对象的正确数据,因此它看起来像是原始解析器的情况,而不是工作正常。 调试器显示字段键已关闭,即字段1
具有键2
等等。
到目前为止我已经:
- 验证了原型文件在两个服务中是相同的,Spring 应用程序通过将另一个存储库中的 proto 文件作为 git 子模块导入
- 来使用它尝试使用
Spring.messaging.amqp
MessageConverter 显式反序列化 proto基于消息头但没有成功 - 发送消息时尝试了不同的消息内容类型和编码头,这些似乎没有什么区别,因为我只使用消息正文。
- 已验证,如果 Go 应用程序侦听队列,它可以反序列化并解组正确发送的消息,因此这必须是 Spring 配置问题或类似问题。
有问题的 Proto3 消息定义:
message HostStateMessage {
message NetworkInterface {
string name = 1;
string mac_address = 2;
string ipv4 = 3;
string ipv6 = 4;
}
string hostname = 1;
string description = 2;
HostType host_type = 3;
repeated NetworkInterface network_interfaces = 4;
}
将协议缓冲区作为消息写入 RabbitMQ 的 Go 函数:
func PublishHostMessage(config *AMQPQueueConfiguration, conn *amqp.Connection, message *protos.HostStateMessage) error {
channel, err := conn.Channel()
if err != nil {
fmt.Errorf("Creating channel failed: %s\n", err)
return err
}
queue, err := channel.QueueDeclare(config.HostQueueName, true, false, false, false, nil)
if err != nil {
fmt.Errorf("Failed to declare queue: %s\n", err)
return err
}
serialMsg, err := proto.Marshal(message)
if err != nil {
fmt.Errorf("Failed to serialize proto message: %s\n", err)
return err
}
headers := amqp.Table{}
headers["messageType"] = "HostStateMessage"
fmt.Println(string(serialMsg))
err = channel.Publish("", queue.Name, false, false, amqp.Publishing{Headers: headers, ContentType: "application/x-protobuf", Body: serialMsg})
if err != nil {
fmt.Errorf("Failed to send message: %s\n", err)
return err
}
fmt.Printf("Sent host message at: %s\n", time.Now())
return nil
}
MessageReceiver.java 类:
@Service
public class MessageReceiver {
@Autowired RabbitConfiguration rabbitConfiguration;
@Autowired HostDao hostDao;
@RabbitListener(queues = "#{rabbitConfiguration.hostQueueName}")
public void consumeHostNotification(Message in) {
try {
HostStateMessage message = HostStateMessage.parseFrom(in.getBody());
hostDao.addOrUpdateHostStateFromMessage(message);
} catch (InvalidProtocolBufferException e) {
System.out.println(e.getStackTrace());
}
}
}
Spring AMQP @Configuration bean:
@Configuration
@EnableAutoConfiguration
public class RabbitConfiguration {
@Value("${rabbitmq.hostname}")
private String queueHost;
@Value("${rabbitmq.port}")
private int queuePort;
@Value("${rabbitmq.queues.host-queue-name}")
public String hostQueueName;
@Bean
public CachingConnectionFactory connectionFactory() {
return new CachingConnectionFactory(queueHost, queuePort);
}
@Bean
public Queue hostNotificationQueue() {
return new Queue(hostQueueName);
}
}
Go 正在使用 google.golang Go 模块中的 .org/protobuf v1.27.1
。
Java 使用 com.google.protobuf 3.19.4
作为 Maven 依赖项。与 protobuf-maven-plugin 0.6.1 一起使用 protoc 进行编译。
这是一个非常令人困惑的问题,如果能得到一些见解就太好了。
I am trying to figure out why my protocol buffer payload is being incorrectly de-serialized when received from a Spring AMQP listener in my Java app.
The protocol buffer is written in a Go service and sent to a Rabbit MQ instance which is then picked up by the Java Spring app and de-serialized into an object instance.
For some reason when received the marshalled object has field 2
with the value of field 1
while field 1
is null
in the resulting object.
Additionally the repeated network_interfaces
field with key 5
is null
on the object.
Interestingly if I use the debugger when picking up a message from the queue, the missing fields are visible and contain the correct data on the instantiated protobuf object under unknownFields
so it looks like a case of the proto parser not working correctly.
The debugger shows that the field keys are off, i.e. field 1
has the key 2
and so on.
So far I have:
- Verified the proto files are the same in both services, the Spring app is using the proto file from the other repo by importing it as a git submodule
- Attempted to use a
Spring.messaging.amqp
MessageConverter to deserialize the proto explicitly based on message headers with no success - Tried different message content type and encoding headers when sending the message, these don't appear to make a difference as I am only using the message body.
- Verified that if the Go app listens to the queue that it can deserialize and unmarshal a message it sent correctly, so this has to be a Spring config issue or similar.
The Proto3 message definition in question:
message HostStateMessage {
message NetworkInterface {
string name = 1;
string mac_address = 2;
string ipv4 = 3;
string ipv6 = 4;
}
string hostname = 1;
string description = 2;
HostType host_type = 3;
repeated NetworkInterface network_interfaces = 4;
}
The Go function which writes the protocol buffer to RabbitMQ as a message:
func PublishHostMessage(config *AMQPQueueConfiguration, conn *amqp.Connection, message *protos.HostStateMessage) error {
channel, err := conn.Channel()
if err != nil {
fmt.Errorf("Creating channel failed: %s\n", err)
return err
}
queue, err := channel.QueueDeclare(config.HostQueueName, true, false, false, false, nil)
if err != nil {
fmt.Errorf("Failed to declare queue: %s\n", err)
return err
}
serialMsg, err := proto.Marshal(message)
if err != nil {
fmt.Errorf("Failed to serialize proto message: %s\n", err)
return err
}
headers := amqp.Table{}
headers["messageType"] = "HostStateMessage"
fmt.Println(string(serialMsg))
err = channel.Publish("", queue.Name, false, false, amqp.Publishing{Headers: headers, ContentType: "application/x-protobuf", Body: serialMsg})
if err != nil {
fmt.Errorf("Failed to send message: %s\n", err)
return err
}
fmt.Printf("Sent host message at: %s\n", time.Now())
return nil
}
The MessageReceiver.java Class:
@Service
public class MessageReceiver {
@Autowired RabbitConfiguration rabbitConfiguration;
@Autowired HostDao hostDao;
@RabbitListener(queues = "#{rabbitConfiguration.hostQueueName}")
public void consumeHostNotification(Message in) {
try {
HostStateMessage message = HostStateMessage.parseFrom(in.getBody());
hostDao.addOrUpdateHostStateFromMessage(message);
} catch (InvalidProtocolBufferException e) {
System.out.println(e.getStackTrace());
}
}
}
The Spring AMQP @Configuration bean:
@Configuration
@EnableAutoConfiguration
public class RabbitConfiguration {
@Value("${rabbitmq.hostname}")
private String queueHost;
@Value("${rabbitmq.port}")
private int queuePort;
@Value("${rabbitmq.queues.host-queue-name}")
public String hostQueueName;
@Bean
public CachingConnectionFactory connectionFactory() {
return new CachingConnectionFactory(queueHost, queuePort);
}
@Bean
public Queue hostNotificationQueue() {
return new Queue(hostQueueName);
}
}
Go is using google.golang.org/protobuf v1.27.1
in Go modules.
Java is using com.google.protobuf 3.19.4
as a Maven dependency. Along with protobuf-maven-plugin 0.6.1
to do the compilation with protoc.
This is a really confusing issue, would be great to get some insight.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
在多次检查Java生成的protobuf类是否正确后,我意识到我没有验证Go生成的protobuf。
事实证明,我用来调用 protoc-gen-go 的脚本已经悄无声息地失败了,并且 Go 服务使用的是过时的消息版本,以及带有键
1
的附加字段。After checking the that the Java generated protobuf class was correct multiple I times, I realised that I had not verified the Go generated protobuf.
Turns out the script I was using to call protoc-gen-go had failed silently and the Go service was using an outdated version of the message with an additional field with the key
1
.