KSQL query to check value integrity

Posted 2025-01-11 09:17:28

I have a stream with two fields, correlation_id and event_type.

Example
aud-103 trigger
aud-104 trigger
aud-109 mitigation
aud-103 mitigation

If an event with the same correlation_id is detected, its latest event_type is mitigation, and it already has an earlier trigger, set the status to mitigated; otherwise set it to unmitigated.
In other words, an event is considered mitigated only if it has both trigger and mitigation as event types.

I need to build a table that aggregates by the latest value of a column.

I managed to achieve this with the (very) dirty query below:

CREATE TABLE SIEM_PARSE_EVENT_TBL WITH (KAFKA_TOPIC='SIEM_PARSE_EVENT_TBL', PARTITIONS=1, REPLICAS=1) AS SELECT
  SIEM_PARSE_EVENT.CORRELATION_ID CORRELATION_ID,
  CASE
   WHEN 
   (LATEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'mitigation' AND EARLIEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'trigger') 
   OR
   (LATEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'trigger' AND EARLIEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'mitigation')
   THEN 'mitigated'
   ELSE 'unmitigated'
  END AS MITIGATED_STATUS,
  COUNT(*) TOTAL
FROM SIEM_PARSE_EVENT SIEM_PARSE_EVENT
GROUP BY SIEM_PARSE_EVENT.CORRELATION_ID
EMIT CHANGES;

Is there a cleaner way to do this?

UPDATE
I managed it with the query below:

SELECT  CORRELATIONID , collect_list( EVENTMITIGATIONTYPE ) from SIEM_PARSE_EVENT group by correlationid 
 HAVING ARRAY_CONTAINS( collect_list(EVENTMITIGATIONTYPE), 'trigger' )
  AND ARRAY_CONTAINS( collect_list(EVENTMITIGATIONTYPE), 'mitigation' )
EMIT CHANGES;
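
Building on the HAVING query above, one possible way to get back a table with a status column is to move the same ARRAY_CONTAINS checks into a CASE expression. This is only a sketch: it assumes the CORRELATION_ID and EVENT_TYPE column names from the first query, and the table name SIEM_MITIGATION_STATUS_TBL is made up for illustration. COLLECT_SET is swapped in for collect_list here because only the presence of each value matters, not how often it occurs.

```sql
-- Sketch only: the table name is hypothetical, and the column names are
-- assumed to match the first query (CORRELATION_ID, EVENT_TYPE).
CREATE TABLE SIEM_MITIGATION_STATUS_TBL AS
SELECT
  CORRELATION_ID,
  CASE
    -- 'mitigated' only when both event types have been seen for this key
    WHEN ARRAY_CONTAINS(COLLECT_SET(EVENT_TYPE), 'trigger')
     AND ARRAY_CONTAINS(COLLECT_SET(EVENT_TYPE), 'mitigation')
    THEN 'mitigated'
    ELSE 'unmitigated'
  END AS MITIGATED_STATUS,
  COUNT(*) AS TOTAL
FROM SIEM_PARSE_EVENT
GROUP BY CORRELATION_ID
EMIT CHANGES;
```

Unlike the EARLIEST_BY_OFFSET / LATEST_BY_OFFSET version, this does not depend on the trigger and mitigation being the first and last events for a correlation id. It also does not check that the trigger arrived before the mitigation, so it matches the "has both types" wording rather than the stricter ordering rule.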

Comments (1)

素罗衫 2025-01-18 09:17:28

I kinda like your solution; it is readable.

As some alternatives, you could probably use a series of functions like collect_list, array_intersect, and array_length to gather the event_types, and check if both mitigation and trigger are present...
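
The function chain suggested above might look roughly like the push query below. It is a sketch under assumptions, not a tested implementation: the column names CORRELATION_ID and EVENT_TYPE are taken from the question's first query, the alias EVENT_TYPES is made up, and COLLECT_SET is used in place of collect_list so that repeated events do not inflate the collected array.

```sql
-- Sketch only: column names assumed from the question; EVENT_TYPES is an
-- illustrative alias.
SELECT
  CORRELATION_ID,
  COLLECT_SET(EVENT_TYPE) AS EVENT_TYPES
FROM SIEM_PARSE_EVENT
GROUP BY CORRELATION_ID
-- Keep a correlation id only when both required types are among its events:
-- intersect the collected types with the required pair and expect 2 matches.
HAVING ARRAY_LENGTH(
         ARRAY_INTERSECT(COLLECT_SET(EVENT_TYPE), ARRAY['trigger', 'mitigation'])
       ) = 2
EMIT CHANGES;
```

Compared with two separate ARRAY_CONTAINS calls, this keeps the required event types in one array literal, which may be easier to extend if more types are added later.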

If you are managing your own ksqlDB, writing custom UDFs or a UDAF to help would be another option.
