Structured Streaming UDF - logic based on checking whether a column exists in the DataFrame
I'm using Structured Streaming and am trying to use a UDF to parse each row.
Here is the requirement:
- We can get alerts for a particular KPI with type 'major' or 'critical'.
- For a given KPI, if we get an alert of type 'major' (e.g. _major) and we also have a critical alert (_critical), we need to ignore the _major alert and consider only the _critical alert.
There are ~25 alerts, which are stored in the array AlarmKeys.alarms_all.
Below is the (draft) code I'm using.
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def convertStructToStr(APP_CAUSE, tenantName, window, <one>, <two>__major, <three>__major, <four>__critical, five__major, <six>__critical):
    res = "{window: " + str(window) + "type: 10m, applianceName: " + str(APP_CAUSE) + ","
    mystring = ""
    first = True
    for curr_alarm in AlarmKeys.alarms_all:
        alsplit = curr_alarm.split('__')
        if len(alsplit) == 2:
            # Only account for critical row if both are there
            if alsplit[1] == 'major':
                critical_alarm = alsplit[0] + "__critical"
                # NOTE: col() returns a Column expression, not this row's value,
                # so int(col(...)) fails here; this is the part I'm asking about
                if int(col(critical_alarm)) > 0:
                    continue
        if int(col(curr_alarm)) > 0:
            if first:
                mystring = "{} {}({})".format(mystring, AlarmKeys.alarm_exp[curr_alarm], str(col(curr_alarm)))
                first = False
            else:
                mystring = "{}, {}({})".format(mystring, AlarmKeys.alarm_exp[curr_alarm], str(col(curr_alarm)))
    res += "insight: " + mystring + "}"
    return res
# Structured Streaming using the UDF; this prints data to the console.
# Eventually, I'll put the data into Kafka instead.
# A streaming DataFrame is written with writeStream/start(), not write/save().
df.select(convertStructToStr(*df.columns)) \
    .writeStream \
    .format("console") \
    .option("numRows", 100) \
    .option("checkpointLocation", "/Users/karanalang/PycharmProjects/Kafka/checkpoint") \
    .outputMode("complete") \
    .start()
The question is: can this be done using a UDF?
Since I'm passing column values to the UDF, I have no way to check whether a particular KPI of type 'critical' is available in the DataFrame.
What is the best way to solve this kind of problem?
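One possible way to approach this (a sketch under assumptions, not tested against the setup above): pass all alarm columns to the UDF as a single struct, e.g. `struct(*df.columns)`, so the function receives one `Row` and can look values up by name through `Row.asDict()`. The helper below is plain Python and only shows the suppression logic on such a dict. `alarms_to_report` and the sample keys are hypothetical names, and it assumes each alarm column holds a numeric count.

```python
from typing import Any, List, Mapping


def alarms_to_report(row: Mapping[str, Any], alarm_keys: List[str]) -> List[str]:
    """Return the alarm keys that fired, skipping a '__major' alarm
    when the '__critical' alarm of the same KPI fired as well."""

    def fired(key: str) -> bool:
        # A missing column or a null value counts as "not fired".
        value = row.get(key)
        return value is not None and int(value) > 0

    selected = []
    for key in alarm_keys:
        if not fired(key):
            continue
        # Split on the last "__": "cpu_utilization__major" -> ("cpu_utilization", "__", "major")
        kpi, sep, severity = key.rpartition("__")
        if sep and severity == "major" and fired(kpi + "__critical"):
            continue  # the critical alarm for this KPI takes precedence
        selected.append(key)
    return selected


# Hypothetical sample row, as Row.asDict() might return it inside a UDF
sample_row = {
    "cpu_utilization__major": 3,
    "cpu_utilization__critical": 1,
    "memory__major": 2,
}
sample_keys = ["cpu_utilization__major", "cpu_utilization__critical", "memory__major"]
print(alarms_to_report(sample_row, sample_keys))
```

Because the lookup goes through `row.get(...)`, a KPI whose critical column is absent from the DataFrame is treated as not fired rather than raising an error.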
Please note: the original code uses (DStreams +) Pandas and KafkaProducer. It loops over all the rows of the DataFrame and creates a JSON string, which KafkaProducer then pushes to Kafka.
I'm trying to convert this to use Structured Streaming (Apache Spark v3.1.2), if possible.
TIA!
Clarification (based on a comment from @OneCricketeer):
- <> just represents a placeholder.
- An example of the KPI would be 'cpu_utilization__critical' and 'cpu_utilization__major'.
  If both have values, I want to report only the critical alert, i.e. 'cpu_utilization__critical',
  so in the map I would add the entry insights="High CPU utilization(cpu_utilization__critical)",
  where "High CPU utilization" is the description of the alert and cpu_utilization__critical is the actual KPI.
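To make the clarification concrete, here is a minimal sketch of how the insight text and the final record could be assembled once the alarms to report are known. `format_insight` and `build_record` are hypothetical names, and the sample window and appliance values are made up. The field names (window, type, applianceName, insight) come from the draft code, and `json.dumps` stands in for the manual string concatenation so that the output is valid JSON.

```python
import json
from typing import List, Mapping


def format_insight(reported: List[str], descriptions: Mapping[str, str]) -> str:
    """Join 'description(kpi)' entries; fall back to the key itself
    when no description is registered for it."""
    return ", ".join(
        "{}({})".format(descriptions.get(key, key), key) for key in reported
    )


def build_record(window: str, appliance_name: str, insight: str) -> str:
    """Serialize one output record as a JSON string."""
    return json.dumps({
        "window": window,
        "type": "10m",
        "applianceName": appliance_name,
        "insight": insight,
    })


# Hypothetical description map, standing in for AlarmKeys.alarm_exp
descriptions = {"cpu_utilization__critical": "High CPU utilization"}
insight = format_insight(["cpu_utilization__critical"], descriptions)
print(insight)
print(build_record("sample-window", "sample-appliance", insight))
```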