When using Elasticsearch, how do I store event-driven data over time and handle different update cadences?

Posted on 2025-02-13 18:22:28

My documents in Elasticsearch look like the following:

  {
    "_id" : "Eba714EBOZm74KGk5PJr",
    "_source" : {
      "customer-id" : "1",
      "customer-balance" : 15,
      "eventDate" : 1640995200000 //01-jan-22
    }
  },
  {
    "_id" : "z7a814EBOZm74KGkHfLv",
    "_source" : {
      "customer-id" : "1",
      "customer-balance" : 20,
      "eventDate" : 1641081600000 //02-jan-22
    }
  },
  {
    "_id" : "dba814EBOZm74KGkNfPE",
    "_source" : {
      "customer-id" : "1",
      "customer-balance" : 25,
      "eventDate" : 1641168000000 //03-jan-22
    }
  },
  {
    "_id" : "5Le814EBOZm74KGkcQO_",
    "_source" : {
      "customer-id" : "2",
      "customer-balance" : 15,
      "eventDate" : 1640995200000 //01-jan-22
    }
  }

My data is event-driven: I update Elasticsearch whenever it changes (i.e. when I get new data). However, this data comes in at different cadences; some customers may be updated by the minute and others weekly. I want to bucket the data by arbitrary periods (e.g. daily) and, at search time, aggregate the total balance across customers for each bucket. To do this I can write a query like the following:

GET my_index/_search
{
  "aggs": {
    "balance_over_time": {
      "date_histogram": {
        "field": "eventDate",
        "fixed_interval": "1d"
      },
      "aggs": {
        "daily-balances": {
          "sum": {
            "field": "customer-balance"
          }
        }
      }
    }
  }
}

We get the following result:

"aggregations" : {
  "balance_over_time" : {
    "buckets" : [
      {
        "key" : 1640995200000,
        "doc_count" : 2,
        "daily-balances" : {
          "value" : 30.0
        }
      },
      {
        "key" : 1641081600000,
        "doc_count" : 1,
        "daily-balances" : {
          "value" : 20.0
        }
      },
      {
        "key" : 1641168000000,
        "doc_count" : 1,
        "daily-balances" : {
          "value" : 25.0
        }
      }
    ]
  }
}

You will note that customer "2" hasn't been updated on days 2 or 3. I would therefore like their balance to roll forward, so that I get daily balances of 30, 35, 40 instead of 30, 20, 25.
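
For illustration, the roll-forward described above can be sketched client-side in Python against the four sample documents. This is not an Elasticsearch feature or query; the function name `rolled_forward_totals` and the in-memory `docs` list are assumptions made only for this example, and bucket starts are computed by flooring the timestamp to the interval, which is assumed to mirror what `fixed_interval` does.

```python
DAY_MS = 24 * 60 * 60 * 1000

# The four sample documents from the question (source fields only).
docs = [
    {"customer-id": "1", "customer-balance": 15, "eventDate": 1640995200000},
    {"customer-id": "1", "customer-balance": 20, "eventDate": 1641081600000},
    {"customer-id": "1", "customer-balance": 25, "eventDate": 1641168000000},
    {"customer-id": "2", "customer-balance": 15, "eventDate": 1640995200000},
]


def rolled_forward_totals(docs, interval_ms, field="customer-balance"):
    """Hypothetical helper: sum the latest known value per customer per bucket."""
    if not docs:
        return []
    ordered = sorted(docs, key=lambda d: d["eventDate"])
    # Floor the first and last timestamps to the start of their buckets.
    first = ordered[0]["eventDate"] // interval_ms * interval_ms
    last = ordered[-1]["eventDate"] // interval_ms * interval_ms

    latest = {}  # customer-id -> most recent value seen so far
    totals = []  # list of (bucket_start_ms, total)
    i = 0
    for bucket_start in range(first, last + interval_ms, interval_ms):
        bucket_end = bucket_start + interval_ms
        # Apply every update that happened before the end of this bucket.
        while i < len(ordered) and ordered[i]["eventDate"] < bucket_end:
            latest[ordered[i]["customer-id"]] = ordered[i][field]
            i += 1
        # Customers without an update in this bucket keep their last value.
        totals.append((bucket_start, sum(latest.values())))
    return totals


totals = rolled_forward_totals(docs, DAY_MS)
print([total for _, total in totals])  # [30, 35, 40]
```

Because only the last update per customer survives inside a bucket, this sketch implicitly uses a "last update wins" rule; the open question is whether the same result can be produced inside Elasticsearch rather than in client code.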

How can I do this? A few other constraints:

  1. It isn't just one balance field; I have dozens of these fields.
  2. Ideally, I don't want my queries to be restricted to particular bucket intervals, e.g. I may want to use 5-minute intervals or 30-day intervals, and I would like to decide on the fly.
  3. I also don't want to include the same customer twice if they have two updates in a period. Ideally I could choose a first/last/average "update wins" approach.
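
The three constraints can be made concrete with a small client-side Python sketch: the interval and the list of fields are passed at call time, and each customer contributes at most one value per field and bucket. Everything here is an assumption for illustration (the `STRATEGIES` table, the function name `one_value_per_customer`, and flooring timestamps to the interval); it does not show how to do this inside Elasticsearch, and it does not fill buckets in which a customer has no update.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical pickers for "which update wins" inside one bucket.
STRATEGIES = {
    "first": lambda values: values[0],
    "last": lambda values: values[-1],
    "average": lambda values: mean(values),
}


def one_value_per_customer(docs, interval_ms, fields, strategy="last"):
    """Return {bucket_start: {customer_id: {field: value}}}.

    Each customer contributes at most one value per field and bucket,
    chosen by `strategy` from that customer's updates in the bucket.
    """
    pick = STRATEGIES[strategy]
    # (bucket_start, customer_id) -> updates in chronological order
    grouped = defaultdict(list)
    for doc in sorted(docs, key=lambda d: d["eventDate"]):
        bucket_start = doc["eventDate"] // interval_ms * interval_ms
        grouped[(bucket_start, doc["customer-id"])].append(doc)

    result = defaultdict(dict)
    for (bucket_start, customer_id), updates in grouped.items():
        result[bucket_start][customer_id] = {
            f: pick([u[f] for u in updates]) for f in fields
        }
    return dict(result)


# A subset of the sample documents: customer "1" has two updates
# that fall into the same 30-day bucket.
THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000
docs = [
    {"customer-id": "1", "customer-balance": 15, "eventDate": 1640995200000},
    {"customer-id": "1", "customer-balance": 25, "eventDate": 1641168000000},
    {"customer-id": "2", "customer-balance": 15, "eventDate": 1640995200000},
]
for strategy in ("first", "last", "average"):
    buckets = one_value_per_customer(
        docs, THIRTY_DAYS_MS, ["customer-balance"], strategy
    )
    print(strategy, buckets)
```

Passing more names in `fields` covers the "dozens of fields" case, and changing `interval_ms` switches between 5-minute and 30-day buckets without touching the stored data.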

Is any of this viable? Am I trying to use Elasticsearch to do something it's just not good at?

My Elasticsearch deployment is hosted by elastic.co, and I'm using Logstash to create the data, sourced from a Kafka topic, if any of that is relevant.
