pyspark-treaming protobuf消息给出错误:attributeError:' bytearray'对象没有属性'

发布于 2025-02-08 07:20:29 字数 1306 浏览 1 评论 0原文

我正在使用py spark流读来自kafka主题的Protobuf消息。
使用msg.thichoneof给出错误 attributeError:'bytearray'对象没有属性'whonof'
架构protofile是:

message InstrumentStatusUpdate {
    ....
    ..
}
...
..
message MarketDataEvent {
    oneof event {
        InstrumentStatusUpdate instrumentStatusUpdate = 1;
        Trade trade = 2;
    }
};

我正在使用的py-spark代码:

def parse_options_monitor_msg(msg:schema_pb2.MarketDataEvent):
    eventStr = msg.WhichOneof("event")
    if eventStr == "trade":
        trade_list = []
        trade_msg = msg.trade
        symbol = trade_msg.symbol
        ticker = symbol.split('_')[0]
        trade_list = [symbol,ticker]
        return str(trade_list)

parse_options_monitor = udf(lambda x: parse_options_monitor_msg(x), StringType())

df = spark.readStream \
            .format("kafka") \
            .options(**kafka_conf) \
            .load()
data = df.selectExpr("offset", "value") \
         .withColumn("event", parse_options_monitor(col("value")))

df2 = data.select(col("offset"),col("event"))

df2.writeStream \
            .format("console") \
            .outputMode("append") \
            .option("truncate", False) \
            .start() \
            .awaitTermination()

如何摆脱此错误?

I am reading protobuf messages from kafka topic using py-spark streaming.
While using the msg.WhichOneOf gives error AttributeError: 'bytearray' object has no attribute 'WhichOneof'.
The schema protofile is:

message InstrumentStatusUpdate {
    ....
    ..
}
...
..
message MarketDataEvent {
    oneof event {
        InstrumentStatusUpdate instrumentStatusUpdate = 1;
        Trade trade = 2;
    }
};

Py-spark code I'm using:

def parse_options_monitor_msg(msg:schema_pb2.MarketDataEvent):
    eventStr = msg.WhichOneof("event")
    if eventStr == "trade":
        trade_list = []
        trade_msg = msg.trade
        symbol = trade_msg.symbol
        ticker = symbol.split('_')[0]
        trade_list = [symbol,ticker]
        return str(trade_list)

parse_options_monitor = udf(lambda x: parse_options_monitor_msg(x), StringType())

df = spark.readStream \
            .format("kafka") \
            .options(**kafka_conf) \
            .load()
data = df.selectExpr("offset", "value") \
         .withColumn("event", parse_options_monitor(col("value")))

df2 = data.select(col("offset"),col("event"))

df2.writeStream \
            .format("console") \
            .outputMode("append") \
            .option("truncate", False) \
            .start() \
            .awaitTermination()

How to get rid of this error ?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

零度° 2025-02-15 07:20:29

我添加了解析()方法。

def parse_options_monitor_msg(msg_bytes:schema_pb2.MarketDataEvent):
    msg = schema_pb2.MarketDataEvent()
    msg.ParseFromString(msg_bytes)
    eventStr = msg.WhichOneof("event")
    if eventStr == "trade":
        trade_list = []
        trade_msg = msg.trade
        symbol = trade_msg.symbol
        ticker = symbol.split('_')[0]
        trade_list = [symbol,ticker]
        return str(trade_list)

I added the ParseFromString() method.

def parse_options_monitor_msg(msg_bytes:schema_pb2.MarketDataEvent):
    msg = schema_pb2.MarketDataEvent()
    msg.ParseFromString(msg_bytes)
    eventStr = msg.WhichOneof("event")
    if eventStr == "trade":
        trade_list = []
        trade_msg = msg.trade
        symbol = trade_msg.symbol
        ticker = symbol.split('_')[0]
        trade_list = [symbol,ticker]
        return str(trade_list)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文