Azure Stream Analytics current-day aggregation

Posted 2025-01-28 23:11:52, 718 words, 2 views, 0 comments

I'm quite new to Azure Stream Analytics, but I need to push rolling totals since the start of the day to Power BI (a live dashboard) every time a new event arrives at the Azure Stream Analytics job. I've created the following query to calculate this:

SELECT
    Factory_Id,
    COUNT(0) as events_count, 
    MAX(event_create_time) as last_event_time,
    SUM(event_value) as event_value_total
INTO
        [powerbi]
FROM
    [eventhub] TIMESTAMP BY event_create_time
WHERE DAY(event_create_time) = DAY(System.Timestamp) and MONTH(event_create_time) = MONTH(System.Timestamp) and YEAR(event_create_time) = YEAR(System.Timestamp)
    GROUP BY Factory_Id, SlidingWindow(day,1)

But this didn't give me the desired result: I get the total for the last 24 hours (not only for the current day), and sometimes a record with a later last_event_time has a smaller events_count than a record with an earlier last_event_time. What am I doing wrong, and how can I achieve the desired outcome?
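The two totals being compared can be made concrete with a minimal Python sketch. It assumes that a 1-day sliding window behaves like a trailing 24-hour window ending at each event. The helper names (`trailing_24h_total`, `since_midnight_total`) and the sample values are hypothetical, and this is a local illustration, not Stream Analytics code. There is probably also a reason the WHERE filter has no effect: in a step without a window, System.Timestamp() should evaluate to the event's own TIMESTAMP BY value, so the filter likely compares event_create_time with itself.

```python
from datetime import datetime, timedelta


def trailing_24h_total(events, now):
    """Sum of values in (now - 24h, now], roughly what a 1-day sliding window covers."""
    start = now - timedelta(hours=24)
    return sum(v for t, v in events if start < t <= now)


def since_midnight_total(events, now):
    """Sum of values from 00:00 of now's calendar day up to and including now."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return sum(v for t, v in events if midnight <= t <= now)


# Hypothetical sample data (timestamps treated as UTC).
events = [
    (datetime.fromisoformat("2021-12-09T20:00:00"), 5),  # previous day, within 24h
    (datetime.fromisoformat("2021-12-10T10:00:00"), 2),  # current day
]
now = datetime.fromisoformat("2021-12-10T10:00:00")

print(trailing_24h_total(events, now))    # 7
print(since_midnight_total(events, now))  # 2
```

The previous evening's event is counted by the trailing window but not by the since-midnight total, which matches the symptom described above.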

Comments (1)

禾厶谷欠 2025-02-04 23:11:52

EDIT following comment: This computes the results for the last 24 hours, but what's needed is the running sum/count for the current day (from 00:00 until now). See the updated answer below.

I'm wondering if an analytics approach would work better than an aggregation here.

Instead of using a time window, you calculate and emit a record for each input event:

SELECT
    Factory_Id,
    COUNT(*) OVER (PARTITION BY Factory_Id LIMIT DURATION (hour, 24)) AS events_count,
    system.timestamp() as last_event_time,
    SUM(event_value) OVER (PARTITION BY Factory_Id LIMIT DURATION (hour, 24)) as event_value_total
INTO PowerBI
FROM [eventhub] TIMESTAMP BY event_create_time

The only hiccup is with events that land on the same timestamp:

{"Factory_Id" : 1, "event_create_time" : "2021-12-10T10:00:00", "event_value" : 0.1}
{"Factory_Id" : 1, "event_create_time" : "2021-12-10T10:01:00", "event_value" : 2}
{"Factory_Id" : 1, "event_create_time" : "2021-12-10T10:01:00", "event_value" : 10}
{"Factory_Id" : 1, "event_create_time" : "2021-12-10T10:02:00", "event_value" : 0.2}

You won't get a single record for that timestamp; each event produces its own row:

Factory_Id  events_count  last_event_time               event_value_total
1           1             2021-12-10T10:00:00.0000000Z  0.1
1           2             2021-12-10T10:01:00.0000000Z  2.1
1           3             2021-12-10T10:01:00.0000000Z  12.1
1           4             2021-12-10T10:02:00.0000000Z  12.3

We may want to add a step to the query to deal with it if it's an issue for your dashboard. Let me know!
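The per-event behaviour of the analytic query can be reproduced with a small Python simulation of COUNT(*) OVER / SUM(event_value) OVER (PARTITION BY Factory_Id LIMIT DURATION (hour, 24)) on the four sample events. This is not Stream Analytics code. It assumes that events are processed in timestamp order and that each event sees only events already processed, so two events sharing a timestamp produce two rows with different totals. It also assumes the window covers (t - 24h, t]; the exact boundary handling in ASA is an assumption here. The function name `analytic_running_totals` is hypothetical. The running sum after the fourth event is 0.1 + 2 + 10 + 0.2 = 12.3.

```python
from datetime import datetime, timedelta

# Sample events from the answer.
events = [
    {"Factory_Id": 1, "event_create_time": "2021-12-10T10:00:00", "event_value": 0.1},
    {"Factory_Id": 1, "event_create_time": "2021-12-10T10:01:00", "event_value": 2},
    {"Factory_Id": 1, "event_create_time": "2021-12-10T10:01:00", "event_value": 10},
    {"Factory_Id": 1, "event_create_time": "2021-12-10T10:02:00", "event_value": 0.2},
]


def analytic_running_totals(events, hours=24):
    """Emit one row per event: count and sum of the same factory's events
    processed so far within the trailing window (t - hours, t]."""
    rows = []
    seen = []  # (factory, timestamp, value) for events already processed
    # sorted() is stable, so events sharing a timestamp keep their input order.
    for e in sorted(events, key=lambda e: e["event_create_time"]):
        t = datetime.fromisoformat(e["event_create_time"])
        seen.append((e["Factory_Id"], t, e["event_value"]))
        start = t - timedelta(hours=hours)
        values = [v for f, ts, v in seen if f == e["Factory_Id"] and start < ts <= t]
        # Round to hide binary floating-point noise such as 12.299999999999999.
        rows.append((e["Factory_Id"], len(values), t.isoformat(), round(sum(values), 6)))
    return rows


for row in analytic_running_totals(events):
    print(row)
```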

EDIT following comment

This new version emits progressive results on a daily tumbling window. To do that, every time we get a new record, we collect the records from the last 24 hours. Then we remove the rows from the previous day and recalculate the aggregates. To collect properly, we first need to make sure we have only one record per timestamp.

-- First we make sure we get only 1 record per timestamp, to avoid duplication in the analytics function below
WITH Collapsed AS (
    SELECT
        Factory_Id,
        system.timestamp() as last_event_time,
        COUNT(*) AS C,
        SUM(event_value) AS S
    FROM [eventhub] TIMESTAMP BY event_create_time
    GROUP BY Factory_Id, system.timestamp()
),

-- Then we build an array at each timestamp, containing all records from the last 24h
Collected as (
    SELECT
        Factory_Id,
        system.timestamp() as last_event_time,
        COLLECT() OVER (PARTITION BY Factory_Id LIMIT DURATION (hour, 24)) AS all_events
    FROM Collapsed
)

-- Finally we expand the array, removing the rows on the previous day, and aggregate
SELECT
    C.Factory_Id,
    system.timestamp() as last_event_time,
    SUM(U.ArrayValue.C) AS events_count,
    SUM(U.ArrayValue.S) AS event_value_total
INTO [powerbi]
FROM Collected AS C
CROSS APPLY GETARRAYELEMENTS(C.all_events) AS U
WHERE DAY(U.ArrayValue.last_event_time) = DAY(system.Timestamp())
GROUP BY C.Factory_Id, C.last_event_time, system.timestamp()
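You can check the expected output of the three steps locally by mirroring them in plain Python before deploying. This is a hypothetical simulation, not ASA code. The name `daily_running_totals` is illustrative, the 24-hour window is modeled as (t - 24h, t], and timestamps are treated as UTC. The extra event on 2021-12-09 in the usage example is made-up test data that shows rows from the previous day being dropped.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def daily_running_totals(events, hours=24):
    """Hypothetical simulation of the Collapsed -> Collected -> final steps."""
    # Step 1 (Collapsed): one record per (Factory_Id, timestamp) with count C and sum S.
    collapsed = defaultdict(lambda: [0, 0.0])
    for e in events:
        key = (e["Factory_Id"], datetime.fromisoformat(e["event_create_time"]))
        collapsed[key][0] += 1
        collapsed[key][1] += e["event_value"]
    records = sorted(collapsed.items(), key=lambda kv: kv[0][1])

    out = []
    for (factory, t), _ in records:
        # Step 2 (Collected): this factory's collapsed records in (t - hours, t].
        start = t - timedelta(hours=hours)
        window = [(ts, c, s) for (f, ts), (c, s) in records
                  if f == factory and start < ts <= t]
        # Step 3: drop rows whose day of month differs from t's, then aggregate.
        today = [(c, s) for ts, c, s in window if ts.day == t.day]
        out.append((factory, t.isoformat(),
                    sum(c for c, _ in today),
                    round(sum(s for _, s in today), 6)))
    return out


events = [
    # Made-up previous-day event, inside the 24h window of the events below.
    {"Factory_Id": 1, "event_create_time": "2021-12-09T23:30:00", "event_value": 5},
    {"Factory_Id": 1, "event_create_time": "2021-12-10T10:00:00", "event_value": 0.1},
    {"Factory_Id": 1, "event_create_time": "2021-12-10T10:01:00", "event_value": 2},
    {"Factory_Id": 1, "event_create_time": "2021-12-10T10:01:00", "event_value": 10},
    {"Factory_Id": 1, "event_create_time": "2021-12-10T10:02:00", "event_value": 0.2},
]

for row in daily_running_totals(events):
    print(row)
```

The simulated output has one row per timestamp, so the two 10:01 events are folded together. The collected window never spans more than 24 hours. Because of that, comparing only the day of month (as `DAY()` does in the query) is enough to separate today's rows from yesterday's.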

Let me know how it goes.
