pyspark-treaming protobuf消息给出错误:attributeError:' bytearray'对象没有属性'
我正在使用py spark流读来自kafka主题的Protobuf消息。
使用msg.thichoneof
给出错误 attributeError:'bytearray'对象没有属性'whonof'。
架构protofile是:
message InstrumentStatusUpdate {
....
..
}
...
..
message MarketDataEvent {
oneof event {
InstrumentStatusUpdate instrumentStatusUpdate = 1;
Trade trade = 2;
}
};
我正在使用的py-spark代码:
def parse_options_monitor_msg(msg:schema_pb2.MarketDataEvent):
eventStr = msg.WhichOneof("event")
if eventStr == "trade":
trade_list = []
trade_msg = msg.trade
symbol = trade_msg.symbol
ticker = symbol.split('_')[0]
trade_list = [symbol,ticker]
return str(trade_list)
parse_options_monitor = udf(lambda x: parse_options_monitor_msg(x), StringType())
df = spark.readStream \
.format("kafka") \
.options(**kafka_conf) \
.load()
data = df.selectExpr("offset", "value") \
.withColumn("event", parse_options_monitor(col("value")))
df2 = data.select(col("offset"),col("event"))
df2.writeStream \
.format("console") \
.outputMode("append") \
.option("truncate", False) \
.start() \
.awaitTermination()
如何摆脱此错误?
I am reading protobuf messages from kafka topic using py-spark streaming.
While using the msg.WhichOneOf
gives error AttributeError: 'bytearray' object has no attribute 'WhichOneof'.
The schema protofile is:
message InstrumentStatusUpdate {
....
..
}
...
..
message MarketDataEvent {
oneof event {
InstrumentStatusUpdate instrumentStatusUpdate = 1;
Trade trade = 2;
}
};
Py-spark code I'm using:
def parse_options_monitor_msg(msg:schema_pb2.MarketDataEvent):
eventStr = msg.WhichOneof("event")
if eventStr == "trade":
trade_list = []
trade_msg = msg.trade
symbol = trade_msg.symbol
ticker = symbol.split('_')[0]
trade_list = [symbol,ticker]
return str(trade_list)
parse_options_monitor = udf(lambda x: parse_options_monitor_msg(x), StringType())
df = spark.readStream \
.format("kafka") \
.options(**kafka_conf) \
.load()
data = df.selectExpr("offset", "value") \
.withColumn("event", parse_options_monitor(col("value")))
df2 = data.select(col("offset"),col("event"))
df2.writeStream \
.format("console") \
.outputMode("append") \
.option("truncate", False) \
.start() \
.awaitTermination()
How to get rid of this error ?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
我添加了
解析()
方法。I added the
ParseFromString()
method.