Protobuf: ParseFromString gives error "raise message_mod.DecodeError('Field number 0 is illegal.')" for a message with a single field

Posted 2025-02-12 00:47:15

I have a Kafka topic whose messages are in the following protobuf format:

message CreditTransaction {
    string date = 1;
    float amount = 2;
}

message DebitTransaction {
    string date = 1;
    float amount = 2;
}
// ... other message definitions

message TransactionEvent {
    oneof event {
        CreditTransaction credit = 1;
        DebitTransaction debit = 2;
        Trade trade = 3;
        // ... other fields
    }
}

Using PySpark Structured Streaming, when I try to parse these messages with the ParseFromString method, I get this error:

  File "./google.zip/google/protobuf/message.py", line 202, in ParseFromString
    return self.MergeFromString(serialized)
  File "./google.zip/google/protobuf/internal/python_message.py", line 1128, in MergeFromString
    if self._InternalParse(serialized, 0, length) != length:
  File "./google.zip/google/protobuf/internal/python_message.py", line 1178, in InternalParse
    raise message_mod.DecodeError('Field number 0 is illegal.')
google.protobuf.message.DecodeError: Field number 0 is illegal.

Is it because the message TransactionEvent has only a single field, and that field is a oneof?
I also tried adding a dummy int64 id field:

message TransactionEvent {
    int64 id = 1;
    oneof event {
        CreditTransaction credit = 2;
        DebitTransaction debit = 3;
        Trade trade = 4;
        // ... other fields
    }
}

but I still get the same error.
Here is the code I am using:

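import schema_pb2  # module generated by protoc from the .proto above
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType


def parse_protobuf_from_bytes(msg_bytes):
    msg = schema_pb2.TransactionEvent()
    msg.ParseFromString(msg_bytes)  # raises DecodeError: Field number 0 is illegal.
    eventStr = msg.WhichOneof("event")
    concatenatedFieldsValue = ""  # placeholder; built up by the elided code below
    if eventStr == "credit":
        pass  # some code
    elif eventStr == "debit":
        pass  # some code
    return str(concatenatedFieldsValue)


parse_protobuf = udf(parse_protobuf_from_bytes, StringType())

# Note: checkpointLocation is normally set on writeStream, and Spark's Kafka
# source only forwards "kafka."-prefixed keys to the consumer, so the last three
# keys are ignored here; the `value` column always arrives as binary.
kafka_conf = {
    "kafka.bootstrap.servers": "kafka.broker.com:9092",
    "checkpointLocation": "/user/aiman/checkpoint/kafka_local/transactions",
    "subscribe": "TRANSACTIONS",
    "startingOffsets": "earliest",
    "enable.auto.commit": False,
    "value.deserializer": "ByteArrayDeserializer",
    "group.id": "my-group"
}

df = spark.readStream \
    .format("kafka") \
    .options(**kafka_conf) \
    .load()

data = df.selectExpr("offset", "CAST(key AS STRING)", "value") \
    .withColumn("event", parse_protobuf(col("value")))

df2 = data.select(col("offset"), col("event"))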

If I just print the bytes without parsing, I get this:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|offset |event                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|7777777|bytearray(b'\x00\x00\x00\x00\xc2\x02\x0e\x1a]\n\x11RIOT_230120P35.00\x10\x80\xa6\xae\x82\xd9\xed\xf1\xfe\x16\x18\xcd\xd9\xd9\x82\xd9\xed\xf1\xfe\x16 \xe2\xf7\xd9\x82\xd9\xed\xf1\xfe\x16(\x95\xa2\xed\xff\xd9\xed\xf1\xfe\x160\x8c\xaa\xed\xff\xd9\xed\xf1\xfe\x168\x80\xd1\xb6\xc1\x0b@\xc0\x8d\xa3\xba\x0bH\x19P\x04Z\x02Q_b\x02A_')                                                                                                                                                                                      |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


Answer by 动次打次papapa, 2025-02-19 00:47:15

The raw data you are trying to decode is:

b'\x00\x00\x00\x00\xc2\x02\x0e\x1a]\n\x11RIOT_230120P35.00\x10\x80\xa6\xae\x82\xd9\xed\xf1\xfe\x16\x18\xcd\xd9\xd9\x82\xd9\xed\xf1\xfe\x16 \xe2\xf7\xd9\x82\xd9\xed\xf1\xfe\x16(\x95\xa2\xed\xff\xd9\xed\xf1\xfe\x160\x8c\xaa\xed\xff\xd9\xed\xf1\xfe\x168\x80\xd1\xb6\xc1\x0b@\xc0\x8d\xa3\xba\x0bH\x19P\x04Z\x02Q_b\x02A_'

A valid protobuf message never starts with 0x00: that byte decodes as a tag with field number 0, which is reserved and therefore illegal. The message appears to have some extra data at the beginning.
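To see why a leading 0x00 is fatal regardless of how many fields TransactionEvent has, here is a minimal pure-Python sketch of how a parser reads the first tag, which is a varint equal to (field_number << 3) | wire_type. The helper names read_varint and first_tag are hypothetical, and the TransactionEvent bytes are hand-encoded assuming the schema from the question with only credit.date set. A message whose only populated field is a oneof member simply starts with that member's tag.

```python
from typing import Tuple


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a base-128 varint starting at buf[pos]; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of the varint
            return result, pos
        shift += 7


def first_tag(buf: bytes) -> Tuple[int, int]:
    """Return (field_number, wire_type) of the first field in an encoded message."""
    tag, _ = read_varint(buf, 0)
    return tag >> 3, tag & 0x07


# The Kafka value from the question starts with 0x00 -> field number 0 (illegal).
print(first_tag(b"\x00\x00\x00\x00\xc2\x02\x0e\x1a]"))  # (0, 0)

# Hand-encoded TransactionEvent with only the oneof member `credit` (field 1) set.
date = b"2023-01-20"
credit = bytes([0x0A, len(date)]) + date     # CreditTransaction.date: field 1, wire type 2
event = bytes([0x0A, len(credit)]) + credit  # TransactionEvent.credit: field 1, wire type 2
print(first_tag(event))  # (1, 2)
```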

Comparing the bytes against the protobuf encoding specification, we can try to make sense of them.

Start from the string RIOT_230120P35.00: it is correctly prefixed by its length, 0x11 (17 characters). The byte before that is 0x0A, the tag for field 1 with wire type 2 (length-delimited, which strings use), as in the date field of the CreditTransaction message. Reading the message backwards from there, everything looks reasonable up to the 0x1A byte, which is the tag for field 3 (length-delimited) followed by its length 0x5D (93 bytes).
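The backwards reading can be double-checked by decoding forwards from offset 7. This sketch uses a hypothetical read_varint helper and the raw value copied from the question; it confirms that field 3 spans exactly to the end of the buffer and that its first inner field is the 17-byte string. Whether field 3 really is the `trade` member depends on which message type the producer used, which is an assumption here.

```python
from typing import Tuple

# Raw Kafka value from the question (102 bytes).
RAW = (
    b"\x00\x00\x00\x00\xc2\x02\x0e\x1a]\n\x11RIOT_230120P35.00"
    b"\x10\x80\xa6\xae\x82\xd9\xed\xf1\xfe\x16"
    b"\x18\xcd\xd9\xd9\x82\xd9\xed\xf1\xfe\x16"
    b" \xe2\xf7\xd9\x82\xd9\xed\xf1\xfe\x16"
    b"(\x95\xa2\xed\xff\xd9\xed\xf1\xfe\x16"
    b"0\x8c\xaa\xed\xff\xd9\xed\xf1\xfe\x16"
    b"8\x80\xd1\xb6\xc1\x0b"
    b"@\xc0\x8d\xa3\xba\x0b"
    b"H\x19P\x04Z\x02Q_b\x02A_"
)


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a base-128 varint starting at buf[pos]; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


pos = 7  # skip the 7 unexplained bytes
tag, pos = read_varint(RAW, pos)     # 0x1A
length, pos = read_varint(RAW, pos)  # 0x5D
print(tag >> 3, tag & 7, length)     # 3 2 93
print(pos + length == len(RAW))      # True: field 3 ends exactly at the end

inner_tag, p = read_varint(RAW, pos)  # 0x0A
inner_len, p = read_varint(RAW, p)    # 0x11
print(inner_tag >> 3, inner_tag & 7, RAW[p:p + inner_len])  # 1 2 b'RIOT_230120P35.00'
```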

After stripping the first 7 bytes and converting to hex (1a 5d 0a 11 52 49 4f 54 5f 32 33 30 31 32 30 50 33 35 2e 30 30 10 80 a6 ae 82 d9 ed f1 fe 16 18 cd d9 d9 82 d9 ed f1 fe 16 20 e2 f7 d9 82 d9 ed f1 fe 16 28 95 a2 ed ff d9 ed f1 fe 16 30 8c aa ed ff d9 ed f1 fe 16 38 80 d1 b6 c1 0b 40 c0 8d a3 ba 0b 48 19 50 04 5a 02 51 5f 62 02 41 5f), an online protobuf decoder accepts the message.
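The same check can be approximated locally without an online decoder. Below is a minimal, schema-less wire-format scanner; scan_fields is a hypothetical helper, not part of the protobuf library, and it handles only wire types 0, 1, 2 and 5. It raises on field number 0 just like the error in the question, accepts the stripped bytes as a single field 3, and finds fields 1 to 12 inside that field.

```python
from typing import List, Tuple, Union

RAW = (
    b"\x00\x00\x00\x00\xc2\x02\x0e\x1a]\n\x11RIOT_230120P35.00"
    b"\x10\x80\xa6\xae\x82\xd9\xed\xf1\xfe\x16"
    b"\x18\xcd\xd9\xd9\x82\xd9\xed\xf1\xfe\x16"
    b" \xe2\xf7\xd9\x82\xd9\xed\xf1\xfe\x16"
    b"(\x95\xa2\xed\xff\xd9\xed\xf1\xfe\x16"
    b"0\x8c\xaa\xed\xff\xd9\xed\xf1\xfe\x16"
    b"8\x80\xd1\xb6\xc1\x0b"
    b"@\xc0\x8d\xa3\xba\x0b"
    b"H\x19P\x04Z\x02Q_b\x02A_"
)


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a base-128 varint starting at buf[pos]; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def scan_fields(buf: bytes) -> List[Tuple[int, int, Union[int, bytes]]]:
    """Split an encoded message into (field_number, wire_type, value) triples."""
    fields = []
    pos = 0
    while pos < len(buf):
        tag, pos = read_varint(buf, pos)
        field, wire_type = tag >> 3, tag & 0x07
        if field == 0:
            raise ValueError("Field number 0 is illegal.")
        if wire_type == 0:    # varint
            value, pos = read_varint(buf, pos)
        elif wire_type == 1:  # fixed 64-bit
            value, pos = buf[pos:pos + 8], pos + 8
        elif wire_type == 2:  # length-delimited
            length, pos = read_varint(buf, pos)
            value, pos = buf[pos:pos + length], pos + length
        elif wire_type == 5:  # fixed 32-bit
            value, pos = buf[pos:pos + 4], pos + 4
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        if pos > len(buf):
            raise ValueError("truncated message")
        fields.append((field, wire_type, value))
    return fields


outer = scan_fields(RAW[7:])
print([(f, wt) for f, wt, _ in outer])                   # [(3, 2)]
inner = scan_fields(outer[0][2])
print([f for f, _, _ in inner] == list(range(1, 13)))  # True
```

Calling scan_fields(RAW) on the unstripped value raises ValueError on the very first byte.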

So the message seems to have 7 extra bytes at the beginning. These bytes do not conform to the protobuf format, and their meaning cannot be determined without some information from the developer of the other endpoint of the communication.
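One hypothesis worth checking with the producer, offered as an assumption rather than a confirmed diagnosis: the 7 bytes match the Confluent Schema Registry framing for protobuf, which is a magic byte 0, a 4-byte big-endian schema ID, then a zigzag-varint count of message indexes followed by the indexes (a single 0 byte is shorthand for [0]). Under that reading, 00 | 00 00 00 c2 | 02 0e means schema ID 194 and message index [7]. The helper split_confluent_protobuf below is hypothetical.

```python
import struct
from typing import List, Tuple


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a base-128 varint starting at buf[pos]; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def zigzag_decode(n: int) -> int:
    """Map zigzag-encoded unsigned ints back to signed: 0->0, 1->-1, 2->1, ..."""
    return (n >> 1) ^ -(n & 1)


def split_confluent_protobuf(value: bytes) -> Tuple[int, List[int], bytes]:
    """Return (schema_id, message_indexes, protobuf_payload), assuming Confluent framing."""
    if len(value) < 6 or value[0] != 0:
        raise ValueError("value does not start with the Confluent magic byte")
    schema_id = struct.unpack(">I", value[1:5])[0]
    count, pos = read_varint(value, 5)
    indexes = []
    for _ in range(zigzag_decode(count)):
        idx, pos = read_varint(value, pos)
        indexes.append(zigzag_decode(idx))
    return schema_id, indexes or [0], value[pos:]


raw_prefix = b"\x00\x00\x00\x00\xc2\x02\x0e\x1a]"  # first 9 bytes of the question's value
schema_id, indexes, payload = split_confluent_protobuf(raw_prefix)
print(schema_id, indexes, payload)  # 194 [7] b'\x1a]'
```

If the producer confirms this framing, the UDF could pass the third element of split_confluent_protobuf(bytes(msg_bytes)) to ParseFromString instead of hard-coding a 7-byte offset, because the header length depends on the message-index array. Confluent's own client libraries also ship protobuf deserializers that understand this format.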
