Protobuf: ParseFromString gives error "raise message_mod.DecodeError('Field number 0 is illegal.')" for a message with a single field
I have a Kafka topic containing protobuf messages in the following format:
message CreditTransaction {
    string date = 1;
    float amount = 2;
}
message DebitTransaction {
    string date = 1;
    float amount = 2;
}
...
// other message definitions
message TransactionEvent {
    oneof event {
        CreditTransaction credit = 1;
        DebitTransaction debit = 2;
        Trade trade = 3;
        ....
        // other fields
    }
};
Using PySpark streaming, when I try to parse it with the ParseFromString method, I get this error:
  File "./google.zip/google/protobuf/message.py", line 202, in ParseFromString
    return self.MergeFromString(serialized)
  File "./google.zip/google/protobuf/internal/python_message.py", line 1128, in MergeFromString
    if self._InternalParse(serialized, 0, length) != length:
  File "./google.zip/google/protobuf/internal/python_message.py", line 1178, in InternalParse
    raise message_mod.DecodeError('Field number 0 is illegal.')
google.protobuf.message.DecodeError: Field number 0 is illegal.
Is it because TransactionEvent has only a single field, and that field is a oneof?
I also tried adding a dummy int64 id field:
message TransactionEvent {
    int64 id = 1;
    oneof event {
        CreditTransaction credit = 2;
        DebitTransaction debit = 3;
        Trade trade = 4;
        ....
        // other fields
    }
};
but I still get the same error.
Here is the code I am using:
import schema_pb2  # module generated by protoc from the schema
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def parse_protobuf_from_bytes(msg_bytes):
    msg = schema_pb2.MarketDataEvent()
    msg.ParseFromString(msg_bytes)  # fails here with DecodeError: Field number 0 is illegal.
    eventStr = msg.WhichOneof("event")
    if eventStr == "credit":
        ...  # some code
    elif eventStr == "debit":
        ...  # some code
    return str(concatenatedFieldsValue)

parse_protobuf = udf(lambda x: parse_protobuf_from_bytes(x), StringType())

kafka_conf = {
    "kafka.bootstrap.servers": "kafka.broker.com:9092",
    "checkpointLocation": "/user/aiman/checkpoint/kafka_local/transactions",
    "subscribe": "TRANSACTIONS",
    "startingOffsets": "earliest",
    "enable.auto.commit": False,
    "value.deserializer": "ByteArrayDeserializer",
    "group.id": "my-group"
}

# spark is an existing SparkSession
df = spark.readStream \
    .format("kafka") \
    .options(**kafka_conf) \
    .load()

data = df.selectExpr("offset", "CAST(key AS STRING)", "value") \
    .withColumn("event", parse_protobuf(col("value")))
df2 = data.select(col("offset"), col("event"))
If I just print the bytes without parsing them, I get this:
-------------------------------------------
Batch: 0
-------------------------------------------

|offset |event |

|7777777|bytearray(b'\x00\x00\x00\x00\xc2\x02\x0e\x1a]\n\x11RIOT_230120P35.00\x10\x80\xa6\xae\x82\xd9\xed\xf1\xfe\x16\x18\xcd\xd9\xd9\x82\xd9\xed\xf1\xfe\x16 \xe2\xf7\xd9\x82\xd9\xed\xf1\xfe\x16(\x95\xa2\xed\xff\xd9\xed\xf1\xfe\x160\x8c\xaa\xed\xff\xd9\xed\xf1\xfe\x168\x80\xd1\xb6\xc1\x0b@\xc0\x8d\xa3\xba\x0bH\x19P\x04Z\x02Q_b\x02A_') |
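A Python bytes repr mixes escape sequences with printable ASCII characters, which hides the structure of the record. A minimal sketch for showing the leading bytes as hex instead, using only the standard bytes.hex() method (the separator argument needs Python 3.8+); hex_prefix is a hypothetical helper name and the 16-byte default window is an arbitrary choice:

```python
def hex_prefix(value, n=16):
    """Return the first n bytes of a record value as space-separated hex pairs."""
    # Works for bytes and bytearray (the question's values print as bytearray).
    return bytes(value[:n]).hex(" ")


# First bytes of the record printed above.
record = bytearray(b"\x00\x00\x00\x00\xc2\x02\x0e\x1a]\n\x11RIOT_230120P35.00")
print(hex_prefix(record, 11))  # 00 00 00 00 c2 02 0e 1a 5d 0a 11
```

While investigating, the same function could be wrapped with udf(..., StringType()) and applied to the value column in place of the parsing UDF.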

The raw data you are trying to decode is the bytearray printed in the question. A valid protobuf message never starts with 0x00: the first byte of a message begins a field tag, and a 0x00 tag means field number 0, which is illegal. The message appears to have some extra data at the beginning.
By comparing the bytes with the protobuf encoding specification, we can try to make sense of them.
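This kind of byte-by-byte reasoning can also be checked with a few lines of Python. The sketch below is a minimal, schema-less reader for the protobuf wire format (each tag is a varint, field number = tag >> 3, wire type = tag & 7); it is not the protobuf library's parser, and the names read_varint, top_level_fields, PAYLOAD and RAW are hypothetical. The byte values are copied from the record in the question.

```python
from typing import List, Tuple


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a base-128 varint at buf[pos]; return (value, position after it)."""
    result, shift = 0, 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of this varint
            return result, pos
        shift += 7
        if shift >= 64:
            raise ValueError("varint longer than 10 bytes")


def top_level_fields(buf: bytes) -> List[Tuple[int, int, int]]:
    """Walk the fields of one message without a schema.

    Returns (field_number, wire_type, value_size_in_bytes) for each field.
    Groups (wire types 3 and 4) are not supported in this sketch.
    """
    fields = []
    pos = 0
    while pos < len(buf):
        start = pos
        tag, pos = read_varint(buf, pos)
        field_number, wire_type = tag >> 3, tag & 0x07
        if field_number == 0:
            raise ValueError(f"field number 0 is illegal (offset {start})")
        if wire_type == 0:    # varint
            _, end = read_varint(buf, pos)
        elif wire_type == 1:  # fixed 64-bit
            end = pos + 8
        elif wire_type == 2:  # length-delimited: string, bytes, sub-message
            length, pos = read_varint(buf, pos)
            end = pos + length
        elif wire_type == 5:  # fixed 32-bit
            end = pos + 4
        else:
            raise ValueError(f"unsupported wire type {wire_type} (offset {start})")
        if end > len(buf):
            raise ValueError(f"field {field_number} runs past the end of the buffer")
        fields.append((field_number, wire_type, end - pos))
        pos = end
    return fields


# Record value with the first 7 bytes removed.
PAYLOAD = bytes.fromhex(
    "1a 5d 0a 11 52 49 4f 54 5f 32 33 30 31 32 30 50 33 35 2e 30 30"
    " 10 80 a6 ae 82 d9 ed f1 fe 16 18 cd d9 d9 82 d9 ed f1 fe 16"
    " 20 e2 f7 d9 82 d9 ed f1 fe 16 28 95 a2 ed ff d9 ed f1 fe 16"
    " 30 8c aa ed ff d9 ed f1 fe 16 38 80 d1 b6 c1 0b 40 c0 8d a3 ba 0b"
    " 48 19 50 04 5a 02 51 5f 62 02 41 5f"
)
RAW = bytes.fromhex("00 00 00 00 c2 02 0e") + PAYLOAD  # value as read from Kafka

print(top_level_fields(PAYLOAD))  # [(3, 2, 93)]
try:
    top_level_fields(RAW)
except ValueError as err:
    print(err)  # field number 0 is illegal (offset 0)
```

On the stripped payload the reader finds a single length-delimited field 3 whose 93 bytes consume the rest of the buffer, while the unstripped record fails on its very first byte, matching the DecodeError in the question. Calling top_level_fields(PAYLOAD[2:]) on the body of field 3 walks nested fields 1 to 12 in order, starting with the 17-byte string.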
Start from the string RIOT_230120P35.00: it is correctly prefixed by its length, 0x11 (17 characters). The byte before that is 0x0A, which is the tag for field 1 with wire type 2 (length-delimited, as used for strings), as in the CreditTransaction message. Reading the message backwards from there, everything looks reasonable up to the 0x1A byte: it is the tag for field 3 with wire type 2, and the byte after it, 0x5D (93), is exactly the number of bytes that follow. After stripping the first 7 bytes and converting to hex, the remaining message

1a 5d 0a 11 52 49 4f 54 5f 32 33 30 31 32 30 50 33 35 2e 30 30 10 80 a6 ae 82 d9 ed f1 fe 16 18 cd d9 d9 82 d9 ed f1 fe 16 20 e2 f7 d9 82 d9 ed f1 fe 16 28 95 a2 ed ff d9 ed f1 fe 16 30 8c aa ed ff d9 ed f1 fe 16 38 80 d1 b6 c1 0b 40 c0 8d a3 ba 0b 48 19 50 04 5a 02 51 5f 62 02 41 5f

is accepted by an online protobuf decoder. It seems the message has 7 extra bytes at the beginning for some reason. These bytes do not conform to the protobuf format, and their meaning cannot be determined without some information from the developer of the other endpoint of the communication.
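If the producer does always prepend the same 7 bytes, one way to apply this in the question's UDF is to slice them off before calling ParseFromString. A minimal sketch, assuming a fixed 7-byte prefix; HEADER_LEN, strip_header and parse_event are hypothetical names, not part of protobuf or PySpark:

```python
HEADER_LEN = 7  # assumption: every record on the topic starts with the same 7 non-protobuf bytes


def strip_header(msg_bytes, header_len=HEADER_LEN):
    """Return the record value without its leading non-protobuf bytes."""
    data = bytes(msg_bytes)  # accepts bytes or bytearray
    if len(data) < header_len:
        raise ValueError(f"record has {len(data)} bytes, expected at least {header_len}")
    return data[header_len:]


def parse_event(msg_bytes, message_cls):
    """Parse the protobuf payload that follows the header.

    message_cls is a protoc-generated message class, for example
    schema_pb2.MarketDataEvent from the question.
    """
    msg = message_cls()
    msg.ParseFromString(strip_header(msg_bytes))
    return msg
```

In the question's code, msg = parse_event(msg_bytes, schema_pb2.MarketDataEvent) would replace the first two lines of parse_protobuf_from_bytes. A fixed offset is only safe if the prefix length never varies, which is exactly what needs confirming with the producer of the topic.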