JSON objects from DynamoDB to OpenSearch?

Posted 2025-02-07 04:57:28 · 5158 characters · 2 views · 0 comments

I have a DynamoDB stream whose records are indexed into OpenSearch by a Lambda function that uses Boto3. The items in the stream look like this JSON object:

{
  "d8346fda0c35418580c98209df378653": {
    "M": {
      "cloudPlatformAoi": {
        "L": [
          {
            "S": "Google Cloud Platform"
          }
        ]
      },
      "cloudPlatformStrength": {
        "L": [
          {
            "S": "AWS"
          }
        ]
      },
      "integratedDevelopmentEnvironmentAoi": {
        "L": [
          {
            "S": "TextMate"
          }
        ]
      },
      "webFrameworkStrength": {
        "L": [
          {
            "S": "Drupal"
          }
        ]
      },
      "lastEdited": {
        "S": "13-Jun-2022 (16:34:09.233933)"
      },
      "title": {
        "S": "This is my third post (edited)"
      },
      "body": {
        "S": "asdf"
      },
      "programmingLanguageStrength": {
        "L": [
          {
            "S": "Python"
          }
        ]
      },
      "programmingLanguageAoi": {
        "L": [
          {
            "S": "Elixir"
          }
        ]
      },
      "dbAoi": {
        "L": [
          {
            "S": "PostgreSQL"
          }
        ]
      },
      "databaseStrength": {
        "L": [
          {
            "S": "DynamoDB"
          }
        ]
      },
      "webFrameworkAoi": {
        "L": [
          {
            "S": "Symfony"
          }
        ]
      },
      "timeCreated": {
        "S": "09-Jun-2022 (13:30:29.967379)"
      },
      "integratedDevelopmentEnvironment": {
        "L": [
          {
            "S": "TextMate"
          }
        ]
      },
      "level": {
        "S": "one"
      }
    }
  },
  "1d5c49e0fc8c458ebc2e74835831a5c8": {
    "M": {
      "cloudPlatformAoi": {
        "L": [
          {
            "S": "Google Cloud Platform"
          }
        ]
      },
      "cloudPlatformStrength": {
        "L": [
          {
            "S": "Google Cloud Platform"
          }
        ]
      },
      "integratedDevelopmentEnvironmentAoi": {
        "L": [
          {
            "S": "Vim"
          }
        ]
      },
      "webFrameworkStrength": {
        "L": [
          {
            "S": "Flask"
          }
        ]
      },
      "lastEdited": {
        "S": "13-Jun-2022 (17:30:32.808160)"
      },
      "title": {
        "S": "My First Post (edited) 1"
      },
      "body": {
        "S": "test"
      },
      "programmingLanguageStrength": {
        "L": [
          {
            "S": "Python"
          }
        ]
      },
      "programmingLanguageAoi": {
        "L": [
          {
            "S": "Erlang"
          }
        ]
      },
      "dbAoi": {
        "L": [
          {
            "S": "Oracle"
          }
        ]
      },
      "databaseStrength": {
        "L": [
          {
            "S": "Couchbase"
          }
        ]
      },
      "webFrameworkAoi": {
        "L": [
          {
            "S": "Spring"
          }
        ]
      },
      "timeCreated": {
        "S": "13-Jun-2022 (16:28:23.582059)"
      },
      "integratedDevelopmentEnvironment": {
        "L": [
          {
            "S": "Vim"
          }
        ]
      },
      "awsomeBuilderStage": {
        "S": "2"
      }
    }
  },
  "bd9cc68521564858871a7482d77bb1a5": {
    "M": {
      "cloudPlatformAoi": {
        "L": [
          {
            "S": "Google Cloud Platform"
          }
        ]
      },
      "cloudPlatformStrength": {
        "L": [
          {
            "S": "Google Cloud Platform"
          }
        ]
      },
      "integratedDevelopmentEnvironmentAoi": {
        "L": [
          {
            "S": "Vim"
          }
        ]
      },
      "webFrameworkStrength": {
        "L": [
          {
            "S": "Flask"
          }
        ]
      },
      "lastEdited": {
        "S": "13-Jun-2022 (16:37:50.576490)"
      },
      "title": {
        "S": "My First Post (edited)"
      },
      "body": {
        "S": "test"
      },
      "programmingLanguageStrength": {
        "L": [
          {
            "S": "Python"
          }
        ]
      },
      "programmingLanguageAoi": {
        "L": [
          {
            "S": "Erlang"
          }
        ]
      },
      "dbAoi": {
        "L": [
          {
            "S": "Oracle"
          }
        ]
      },
      "databaseStrength": {
        "L": [
          {
            "S": "Couchbase"
          }
        ]
      },
      "webFrameworkAoi": {
        "L": [
          {
            "S": "Spring"
          }
        ]
      },
      "timeCreated": {
        "S": "13-Jun-2022 (16:28:23.582059)"
      },
      "integratedDevelopmentEnvironment": {
        "L": [
          {
            "S": "Vim"
          }
        ]
      },
      "awsomeBuilderStage": {
        "S": "3"
      }
    }
  }
}

When I index the object into OpenSearch, the document still includes the DynamoDB type descriptors ("M", "L", "S") wrapped around each nested value. For example, I get:

"cloudPlatformStrength": {
        "L": [
          {
            "S": "AWS"
          }
        ]
      }

Instead of:

"cloudPlatformStrength": [
    "AWS"
]

How can I fix the data being pushed to OpenSearch? Do I have to run an ETL step on each entry, or is there a better way?

Comments (1)

离线来电, 2025-02-14 04:57:28

This should do what you want:

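from boto3.dynamodb.types import TypeDeserializer

# Converts DynamoDB-typed values such as {"L": [{"S": "AWS"}]} to plain Python.
deserializer = TypeDeserializer()

def handler(event, context):
    for record in event['Records']:
        # REMOVE events (and KEYS_ONLY streams) carry no NewImage, so skip them.
        new_image = record['dynamodb'].get('NewImage')
        if new_image is None:
            continue
        data = {key: deserializer.deserialize(value)
                for key, value in new_image.items()}
        # Index `data` into OpenSearch here instead of the raw NewImage.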
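
To make the conversion concrete, here is a minimal pure-Python sketch of the same idea: it strips the type descriptors recursively and then serializes the result to JSON. The helpers `unmarshal` and `to_jsonable` are hypothetical names written for this illustration, not part of Boto3, and the `views` field is an invented example that is not in the question's data. The sketch does not handle the binary types ("B", "BS"); in a real Lambda, TypeDeserializer is the better choice. One thing worth knowing either way: TypeDeserializer returns numbers as Decimal and string/number sets as Python sets, neither of which `json.dumps` accepts by default, so a `default` hook along these lines may be needed before sending the document to OpenSearch.

```python
import json
from decimal import Decimal


def unmarshal(attr):
    """Convert one DynamoDB-typed value, e.g. {"S": "AWS"}, to plain Python."""
    (tag, value), = attr.items()  # each typed value has exactly one type key
    if tag == "S":
        return value
    if tag == "N":
        # DynamoDB sends numbers as strings; Decimal keeps them exact.
        return Decimal(value)
    if tag == "BOOL":
        return value
    if tag == "NULL":
        return None
    if tag == "L":
        return [unmarshal(item) for item in value]
    if tag == "M":
        return {key: unmarshal(item) for key, item in value.items()}
    if tag == "SS":
        return set(value)
    if tag == "NS":
        return {Decimal(number) for number in value}
    raise ValueError(f"unsupported DynamoDB type tag: {tag}")


def to_jsonable(obj):
    """json.dumps `default` hook for the types JSON cannot encode directly."""
    if isinstance(obj, Decimal):
        return int(obj) if obj == obj.to_integral_value() else float(obj)
    if isinstance(obj, set):
        return sorted(obj)
    raise TypeError(f"cannot serialize {type(obj).__name__}")


# A trimmed-down NewImage based on the item in the question.
new_image = {
    "title": {"S": "This is my third post (edited)"},
    "cloudPlatformStrength": {"L": [{"S": "AWS"}]},
    "views": {"N": "3"},  # hypothetical field, added to show number handling
}

document = {key: unmarshal(value) for key, value in new_image.items()}
body = json.dumps(document, default=to_jsonable, sort_keys=True)
print(body)
```

After this step `cloudPlatformStrength` is a plain list of strings, which is the shape the question asks for, and `body` is a JSON string that can be used as the document body of an index request.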