KSQL query to check completeness of values
I have a stream whose records carry a correlation_id and an event_type, as in the sample below.
Example:
aud-103 trigger
aud-104 trigger
aud-109 mitigation
aud-103 mitigation
If an event arrives with a correlation_id that has been seen before, its latest event_type is mitigation, and a trigger was recorded earlier, the status should be set to mitigated; otherwise it is unmitigated.
In other words, an event is considered mitigated only if it has both trigger and mitigation as event types.
I need to build a table that aggregates by the latest value of a column.
I managed to achieve this using the (very) dirty query below:
CREATE TABLE SIEM_PARSE_EVENT_TBL WITH (KAFKA_TOPIC='SIEM_PARSE_EVENT_TBL', PARTITIONS=1, REPLICAS=1) AS SELECT
    SIEM_PARSE_EVENT.CORRELATION_ID CORRELATION_ID,
    CASE
        WHEN
            (LATEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'mitigation' AND EARLIEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'trigger')
            OR
            (LATEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'trigger' AND EARLIEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'mitigation')
        THEN 'mitigated'
        ELSE 'unmitigated'
    END AS MITIGATED_STATUS,
    COUNT(*) TOTAL
FROM SIEM_PARSE_EVENT SIEM_PARSE_EVENT
GROUP BY SIEM_PARSE_EVENT.CORRELATION_ID
EMIT CHANGES;
Is there a cleaner way to do this?
UPDATE
I managed it using the query below:
SELECT CORRELATIONID, COLLECT_LIST(EVENTMITIGATIONTYPE)
FROM SIEM_PARSE_EVENT
GROUP BY CORRELATIONID
HAVING ARRAY_CONTAINS(COLLECT_LIST(EVENTMITIGATIONTYPE), 'trigger')
    AND ARRAY_CONTAINS(COLLECT_LIST(EVENTMITIGATIONTYPE), 'mitigation')
EMIT CHANGES;
I kinda like your solution; it is readable.
As some alternatives, you could probably use a series of functions like collect_list, array_intersect, and array_length to gather the event_types and check whether both mitigation and trigger are present. If you are managing your own ksqlDB, writing custom UDFs or a UDAF to help would be another option.
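The collect-and-intersect idea could look roughly like the sketch below. It is not tested against a live cluster. It assumes the stream SIEM_PARSE_EVENT exposes CORRELATION_ID and EVENT_TYPE as in the question's first query. The table name SIEM_MITIGATION_STATUS is made up for illustration. COLLECT_SET is swapped in for collect_list here, since only the distinct event types matter.

```sql
-- Sketch only: column names follow the question's first query;
-- the table name SIEM_MITIGATION_STATUS is hypothetical.
CREATE TABLE SIEM_MITIGATION_STATUS AS
  SELECT
    CORRELATION_ID,
    CASE
      -- The intersection keeps only the distinct values found in both arrays,
      -- so a length of 2 means both 'trigger' and 'mitigation' were seen.
      WHEN ARRAY_LENGTH(
             ARRAY_INTERSECT(
               COLLECT_SET(EVENT_TYPE),
               ARRAY['trigger', 'mitigation'])) = 2
      THEN 'mitigated'
      ELSE 'unmitigated'
    END AS MITIGATED_STATUS,
    COUNT(*) AS TOTAL
  FROM SIEM_PARSE_EVENT
  GROUP BY CORRELATION_ID
  EMIT CHANGES;
```

Unlike the HAVING version in the update, this keeps a row for every correlation_id and labels it, so unmitigated ids stay visible. It only checks that both event types are present, not that the mitigation arrived after the trigger.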