Enrich documents in Elasticsearch with only the nested elements that match by ID

Posted on 2025-01-10 20:59:32

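We're creating some packages, but the process is currently rather slow because of the sheer amount of data being sent between microservices. I have therefore pruned the information sent between those microservices and instead want to enrich the documents with the necessary information directly from within Elasticsearch. This gives documents of the following shape: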

      {
        "_index" : "packages-2022.02.28",
        "_type" : "_doc",
        "_id" : "SG_DH-8019-ao-74783-20220315-12",
        "_score" : 1.0,
        "_source" : {
          "id" : "SG_DH-8019-ao-74783-20220315-12",
          "updatedOn" : "2022-02-28T14:45:57.7511562+01:00",
          "code" : "SG",
          "createdDate" : "2022-02-28T15:17:48.2571391+01:00",
          "content" : {
            "contentId" : "74783",
            "units" : [
              {
                "id" : "HB_DBL.ST_RO_NFP",
                "globalId" : "74783_HB_DBL.ST_RO_NFP",
                "globalIntId" : -592692223,
                "forPackaging" : false
              },
              {
                "id" : "HB_DBL.ST_BB_NFP",
                "globalId" : "74783_HB_DBL.ST_BB_NFP",
                "globalIntId" : 446952442,
                "forPackaging" : false
              },
              {
                "id" : "HB_DBL.ST_AI_NFP",
                "globalId" : "74783_HB_DBL.ST_AI_NFP",
                "globalIntId" : -1174348304,
                "forPackaging" : false
              },
              {
                "id" : "HB_DBL.SU_RO_NFP",
                "globalId" : "74783_HB_DBL.SU_RO_NFP",
                "globalIntId" : -2111509049,
                "forPackaging" : false
              },
              {
                "id" : "HB_DBL.SU_BB_NFP",
                "globalId" : "74783_HB_DBL.SU_BB_NFP",
                "globalIntId" : 307969427,
                "forPackaging" : false
              },
              {
                "id" : "HB_DBL.SU_AI_NFP",
                "globalId" : "74783_HB_DBL.SU_AI_NFP",
                "globalIntId" : 1418623211,
                "forPackaging" : false
              },
              {
                "id" : "HB_DBL.PO-1_RO_NFP",
                "globalId" : "74783_HB_DBL.PO-1_RO_NFP",
                "globalIntId" : 1328251159,
                "forPackaging" : false
              },
              {
                "id" : "HB_DBL.PO-1_BB_NFP",
                "globalId" : "74783_HB_DBL.PO-1_BB_NFP",
                "globalIntId" : -1228155826,
                "forPackaging" : false
              },
              {
                "id" : "HB_DBL.PO-1_AI_NFP",
                "globalId" : "74783_HB_DBL.PO-1_AI_NFP",
                "globalIntId" : 749215308,
                "forPackaging" : false
              },
              {
                "id" : "HB_DBL.OF_RO_NFP",
                "globalId" : "74783_HB_DBL.OF_RO_NFP",
                "globalIntId" : 1981865239,
                "forPackaging" : false
              },
              {
                "id" : "HB_DBL.OF_BB_NFP",
                "globalId" : "74783_HB_DBL.OF_BB_NFP",
                "globalIntId" : 545563435,
                "forPackaging" : false
              },
              {
                "id" : "HB_DBL.OF_AI_NFP",
                "globalId" : "74783_HB_DBL.OF_AI_NFP",
                "globalIntId" : -481310774,
                "forPackaging" : false
              }
            ],
            "duration" : {
              "value" : 12,
              "durationType" : "Day"
            }
          },
          "generatedInfo" : {
            "productGroupName" : null,
            "subProductGroupName" : "Foo",
            "version" : 0
          }
        }
      }

with information from an enrich policy's index of the following shape (when queried):

      {
        "_index" : ".enrich-package-enrich-1646044129711",
        "_type" : "_doc",
        "_id" : "zt_gP38BZeMUiw0-LxLa",
        "_score" : 1.0,
        "_source" : {
          "contentId" : "365114",
          "name" : "PackageName",
          "board" : [
            "B1",
            "B2"
          ],
          "units" : [
            {
              "price" : [
                {
                  "margin" : 0,
                  "combination" : 10000,
                  "value" : 189030,
                  "currency" : "EUR"
                }
              ],
              "id" : "W2M_AX2_SC_NFP",
              "globalId" : "365114_W2M_AX2_SC_NFP",
              "globalIntId" : -988330164,
              "name" : "UnitName",
              "prop1": "Foo",
              "prop2": "Bar"
            }
          ]
        }
      }

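I originally got this working. However, when enriching, I only want to keep the units that have the same global ID as those in the document being saved. To this end, I have also tried enriching each unit with a simple Enrich processor and a ForEach processor referencing the enrich policy, matching on globalId, and have even attempted matching on its hash code, globalIntId (although even in the latter case I would often get an error that it 'is not an integer', even though it clearly is one). This separate enrich policy's index has a shape similar to the following: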

      {
        "_index" : ".enrich-package-unit-enrich-1646044158417",
        "_type" : "_doc",
        "_id" : "dN_gP38BZeMUiw0-t2Io",
        "_score" : 1.0,
        "_source" : {
          "units" : [
            {
              "price" : [
                {
                  "margin" : 0,
                  "combination" : 10000,
                  "value" : 189030,
                  "currency" : "EUR"
                }
              ],
              "globalId" : "365114_W2M_AX2_SC_NFP",
              "globalIntId" : -988330164,
              "name" : "UnitName",
              "prop1": "Foo",
              "prop2": "Bar",
              "id" : "W2M_AX2_SC_NFP"
            }
          ]
        }
      }

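I have also tried using a Painless script, but so far my experience hasn't exactly been painless (pun intended). Every time I tried to access any data (I've tried the various ways I came across), I got nothing but compilation errors. Also, given that I'm trying to make this process faster, I'm a bit worried about performance if I do get it to work. I've read that Painless is fast, yet I've also heard that it is actually fairly slow (compared to using processors, I think, not necessarily to other scripting languages).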

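Now I'm at a loss as to how to get this to work. I would prefer to do this without scripting if possible. However, if it is only possible with scripting, that's okay as long as the performance is acceptable. I'm using Elastic 7.12.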
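
As a point of reference for the scripted route, the filtering described above might be expressed as a single script processor placed after the first enrich processor. This is a minimal sketch rather than a tested solution. It assumes the enriched data lands in enrichTest (the target_field used in the pipeline in this post), that content.units is always present, and that both unit arrays carry a globalId; the tag value is made up for illustration. In an ingest pipeline, Painless reaches document fields directly through ctx (for example ctx.content.units).

```json
{
  "script": {
    "lang": "painless",
    "tag": "keep-matching-units",
    "source": "Set wanted = new HashSet(); for (def unit : ctx.content.units) { wanted.add(unit.globalId); } if (ctx.enrichTest != null && ctx.enrichTest.units != null) { ctx.enrichTest.units.removeIf(e -> !wanted.contains(e.globalId)); }"
  }
}
```

A fragment like this can be tried against a sample document with the simulate pipeline API (POST /_ingest/pipeline/_simulate) before it is added to a real pipeline.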

Update 1:
I'm creating the enrich policy from C# using Nest like so:

        var enrichPolicyRequest = new PutEnrichPolicyRequest(enrichPolicyName)
        {
            Match = new MyPackageBedEnrichPolicy(index)
        };

        var putEnrichPolicyResponse = await elasticClient.Enrich.PutPolicyAsync(enrichPolicyRequest);
        var executeEnrichPolicyResponse = await elasticClient.Enrich.ExecutePolicyAsync(enrichPolicyName);
...

    public class MyPackageBedEnrichPolicy : IEnrichPolicy
    {
        public MyPackageBedEnrichPolicy(string index)
        {
            Indices = index;
            MatchField = "contentId";
            EnrichFields = new[] { "name", "board", "units" };
        }

        public Indices Indices { get; set; }
        public Field MatchField { get; set; }
        public Fields EnrichFields { get; set; }
        public string Query { get; set; }
    }

The policy for the units index is defined very similarly, but with:

    public class MyPackageUnitEnrichPolicy : IEnrichPolicy
    {
        public MyPackageUnitEnrichPolicy(string index)
        {
            Indices = index;
            MatchField = "units.globalId";
            EnrichFields = new[] { "units" };
        }
        ...
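
For readers not using Nest, the first policy above should correspond roughly to the following request body for the put enrich policy API (PUT /_enrich/policy/<policy-name>). The index name is a placeholder, since the value passed as index is not shown in this post.

```json
{
  "match": {
    "indices": "source-index-placeholder",
    "match_field": "contentId",
    "enrich_fields": ["name", "board", "units"]
  }
}
```

The unit policy would presumably differ only in match_field ("units.globalId") and enrich_fields (["units"]).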

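For now, I have created the ingest processors in Kibana for easier prototyping, though I will have to take care of that using Nest later as well. I have defined them basically as follows:

Enrich processor

Unit enrich processor

This is the definition of the ingest pipeline in JSON: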

[
  {
    "enrich": {
      "field": "content.contentId",
      "policy_name": "enrichPolicyName",
      "target_field": "enrichTest"
    }
  },
  {
    "foreach": {
      "field": "content.units.globalId",
      "processor": {
        "enrich": {
          "field": "content.units.globalId",
          "policy_name": "unitEnrichPolicyName",
          "target_field": "enrichTest.units",
          "tag": "enrich-units-on-globalId-processor"
        }
      }
    }
  }
]
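
For comparison, the foreach processor iterates over an array field and exposes the current element to its inner processor as _ingest._value, so a per-unit enrich is usually pointed at content.units rather than at content.units.globalId. The following is a sketch under assumptions, not a verified fix. The target field name enriched is hypothetical, and the sketch presumes that the unit policy's source index holds one unit per document with globalId as the match field. If the source documents contain whole units arrays, a match would presumably return the entire array again.

```json
{
  "foreach": {
    "field": "content.units",
    "processor": {
      "enrich": {
        "field": "_ingest._value.globalId",
        "policy_name": "unitEnrichPolicyName",
        "target_field": "_ingest._value.enriched",
        "ignore_missing": true,
        "tag": "enrich-units-on-globalId-processor"
      }
    }
  }
}
```

With this shape, each unit in the incoming document receives its own enrichment, so units that are absent from the document are never added in the first place.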
