Structured Streaming UDF - logic based on checking whether a column exists in the dataframe


I'm using Structured Streaming, and am trying to use a UDF to parse each row.
Here is the requirement:

  • we can get alerts for a particular KPI with type 'major' OR 'critical'
  • for a KPI, if we get an alert of type 'major' (e.g. <kpi>__major) and we also have a critical alert (<kpi>__critical), we need to ignore the __major alert and consider only the __critical alert

There are ~25 alerts, which are stored in the array AlarmKeys.alarms_all.
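
(AlarmKeys itself isn't shown here; as a rough sketch it might look something like the following - the entries are illustrative placeholders, not the real ~25 alarms.)

class AlarmKeys:
    # illustrative subset of the ~25 alarm column names (placeholders)
    alarms_all = [
        'cpu_utilization__major', 'cpu_utilization__critical',
        'memory_usage__major', 'memory_usage__critical',
    ]
    # human-readable description per alarm, used to build the insight string
    alarm_exp = {
        'cpu_utilization__major': 'High CPU utilization',
        'cpu_utilization__critical': 'High CPU utilization',
        'memory_usage__major': 'High memory usage',
        'memory_usage__critical': 'High memory usage',
    }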

Below is the (draft) code I'm using.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def convertStructToStr(APP_CAUSE, tenantName, window, *alarm_values):
    # the remaining args are the ~25 alarm columns (the <one>__major,
    # <two>__critical, ... placeholders); this assumes they arrive
    # positionally in the same order as AlarmKeys.alarms_all
    alarms = dict(zip(AlarmKeys.alarms_all, alarm_values))

    res = "{window: " + str(window) + ", type: 10m, applianceName: " + str(APP_CAUSE) + ","
    mystring = ""
    first = True
    for curr_alarm in AlarmKeys.alarms_all:
        alsplit = curr_alarm.split('__')
        if len(alsplit) == 2 and alsplit[1] == 'major':
            # only account for the major alarm if the matching critical
            # alarm did not fire
            critical_alarm = alsplit[0] + '__critical'
            if int(alarms.get(critical_alarm, 0)) > 0:
                continue
        if int(alarms.get(curr_alarm, 0)) > 0:
            if first:
                mystring = "{} {}({})".format(mystring, AlarmKeys.alarm_exp[curr_alarm], alarms[curr_alarm])
                first = False
            else:
                mystring = "{}, {}({})".format(mystring, AlarmKeys.alarm_exp[curr_alarm], alarms[curr_alarm])
    res += "insight: " + mystring + "}"
    return res


# structured streaming using the udf - this prints the data on the console;
# eventually, i'll put data into Kafka instead.
# note: a streaming dataframe needs writeStream/.start() (not write/.save()),
# and without an aggregation the output mode must be "append", not "complete"
df.select(convertStructToStr(*df.columns)) \
    .writeStream \
    .format("console") \
    .option("numRows", 100) \
    .option("checkpointLocation", "/Users/karanalang/PycharmProjects/Kafka/checkpoint") \
    .outputMode("append") \
    .start()

The question is - can this be done using a UDF?
Since I'm passing column values to the UDF, I have no way to check whether a particular KPI of type 'critical' is available in the dataframe.

What is the best way to solve this kind of problem?
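
(A rough sketch of one possible direction, under the assumption that the __major/__critical naming convention holds: df.columns is available on the driver before the query starts, so the existence check for a matching __critical column doesn't have to happen inside the UDF.)

# sketch: filter AlarmKeys.alarms_all down to the columns that exist in df,
# dropping a __major alarm whose __critical counterpart is also present;
# df.columns is evaluated on the driver, before the stream starts
present = set(df.columns)
effective_alarms = [
    a for a in AlarmKeys.alarms_all
    if a in present
    and not (a.endswith('__major') and a[:-len('__major')] + '__critical' in present)
]

Value-based suppression (ignore __major only when the __critical value is actually > 0) would still live inside the UDF, but the UDF would then only ever see columns that exist.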

Pls note: the original code uses (DStreams +) Pandas and a KafkaProducer. It loops over all the rows of the dataframe and creates a JSON string; KafkaProducer is then used to push the data into Kafka.
I'm trying to convert this to use Structured Streaming (Apache Spark v3.1.2), if possible.

tia!

Clarification (based on the comment from @OneCricketeer):

  • <> just represents a placeholder
  • An example of the KPI would be 'cpu_utilization__critical' and 'cpu_utilization__major'.
    If both have values, I want to report only the critical alert, i.e. 'cpu_utilization__critical',
    so in the map I would add the entry -> insights="High CPU utilization(cpu_utilization__critical)",
    where High CPU utilization -> the description of the alert, and cpu_utilization__critical -> the actual KPI
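
To make the rule concrete, a small hypothetical example (column names and values invented):

# hypothetical input row: both alarms fired for cpu_utilization,
# only the major alarm fired for memory_usage
row = {
    'cpu_utilization__major': 3,      # ignored: its critical counterpart fired
    'cpu_utilization__critical': 1,
    'memory_usage__major': 2,         # kept: no critical counterpart > 0
}
# expected insight fragment: "High CPU utilization(1), High memory usage(2)"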
