Great Expectations v3 API on AWS Glue 3.0


I'm trying to run a validation step in the pipeline using Great Expectations on AWS Glue 3.0.

Here's my initial attempt to create the data context at runtime, based on their docs:

# Imports needed for this snippet (Great Expectations v3 API):
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    DatasourceConfig,
)

# logger, data_profile_s3_store_bucket and data_profile_expectation_suite_name
# are defined elsewhere in the job.


def create_context():
    logger.info("Create DataContext Config.")
    data_context_config = DataContextConfig(
        config_version=2,
        plugins_directory=None,
        config_variables_file_path=None,
        # concurrency={"enabled": "true"},
        datasources={
            "my_spark_datasource": DatasourceConfig(
                class_name="Datasource",
                execution_engine={
                    "class_name": "SparkDFExecutionEngine",
                    "module_name": "great_expectations.execution_engine",
                },
                data_connectors={
                    "my_spark_dataconnector": {
                        "module_name": "great_expectations.datasource.data_connector",
                        "class_name": "RuntimeDataConnector",
                        "batch_identifiers": [""],
                    }
                },
            )
        },
        stores={
            "expectations_S3_store": {
                "class_name": "ExpectationsStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "expectations/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
            "validations_S3_store": {
                "class_name": "ValidationsStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "validations/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
            "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
            "checkpoint_S3_store": {
                "class_name": "CheckpointStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "suppress_store_backend_id": "true",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "checkpoints/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
        },
        expectations_store_name="expectations_S3_store",
        validations_store_name="validations_S3_store",
        evaluation_parameter_store_name="evaluation_parameter_store",
        checkpoint_store_name="checkpoint_S3_store",
        data_docs_sites={
            "s3_site": {
                "class_name": "SiteBuilder",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "data_docs/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
                "site_index_builder": {
                    "class_name": "DefaultSiteIndexBuilder",
                    "show_cta_footer": True,
                },
            }
        },
        anonymous_usage_statistics={"enabled": True},
    )

    # Pass the DataContextConfig as a project_config to BaseDataContext
    context = BaseDataContext(project_config=data_context_config)

    logger.info("Create Checkpoint Config.")
    checkpoint_config = {
        "name": "my_checkpoint",
        "config_version": 1,
        "class_name": "Checkpoint",
        "run_name_template": "ingest_date=%YYYY-%MM-%DD",
        "expectation_suite_name": data_profile_expectation_suite_name,
        "runtime_configuration": {
            "result_format": {
                "result_format": "COMPLETE",
                "include_unexpected_rows": True,
            }
        },
        "evaluation_parameters": {},
    }
    context.add_checkpoint(**checkpoint_config)
    # logger.info(f'GE Data Context Config: "{data_context_config}"')

    return context


Using this, I get an error saying I'm attempting to run operations on a stopped Spark context.

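For context, a minimal sketch of the standard Glue job setup a context like this would plug into (this boilerplate is an assumption and is not part of the original code):

from awsglue.context import GlueContext
from pyspark.context import SparkContext

# Glue supplies its own SparkContext/SparkSession for the job.
glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# Great Expectations' SparkDFExecutionEngine can create a SparkSession of its
# own; when that clashes with the session Glue manages, later operations end up
# running against a stopped SparkContext.
context = create_context()
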
Is there a better way to use the Spark datasource in Glue 3.0?
I want to stay on Glue 3.0 as much as possible, to avoid having to maintain two versions of Glue jobs.


Comments (1)

盗心人 2025-02-06 03:06:22

You can fix this by setting force_reuse_spark_context to true; here is a quick example (YAML):

config_version: 3.0
datasources:
  my_spark_datasource:
    class_name: Datasource
    module_name: great_expectations.datasource
    data_connectors:
      my_spark_dataconnector:
        class_name: RuntimeDataConnector
        module_name: great_expectations.datasource.data_connector
        batch_identifiers: {}
    execution_engine:
      class_name: SparkDFExecutionEngine
      force_reuse_spark_context: true

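Applied to the in-code DataContextConfig from the question, the same fix is a single extra key on the execution engine. A minimal sketch, with the datasource and connector names copied from the question's config:

from great_expectations.data_context.types.base import DatasourceConfig

# force_reuse_spark_context tells the SparkDFExecutionEngine to reuse the
# SparkSession that Glue already created instead of spinning up its own.
my_spark_datasource = DatasourceConfig(
    class_name="Datasource",
    execution_engine={
        "class_name": "SparkDFExecutionEngine",
        "module_name": "great_expectations.execution_engine",
        "force_reuse_spark_context": True,
    },
    data_connectors={
        "my_spark_dataconnector": {
            "module_name": "great_expectations.datasource.data_connector",
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": [""],
        }
    },
)
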
Another thing I would like to add is that you can define the context in a YAML file and upload it to S3. Then you can parse this file in the Glue job with the function below:

import os

import boto3
import yaml
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig


def parse_data_context_from_S3(bucket: str, prefix: str = ""):
    object_key = os.path.join(prefix, "great_expectations.yml")

    print(f"Parsing s3://{bucket}/{object_key}")
    s3 = boto3.session.Session().client("s3")
    s3_object = s3.get_object(Bucket=bucket, Key=object_key)["Body"]

    datacontext_config = yaml.safe_load(s3_object.read())

    project_config = DataContextConfig(**datacontext_config)

    context = BaseDataContext(project_config=project_config)
    return context

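A minimal usage sketch of the function above; the bucket, prefix, asset name, suite name, and the sample expectation are placeholders for illustration only:

from pyspark.sql import SparkSession
from great_expectations.core.batch import RuntimeBatchRequest

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, None)], ["id", "value"])  # stand-in for real job data

# Build the context from the great_expectations.yml stored in S3.
context = parse_data_context_from_S3(bucket="my-ge-config-bucket", prefix="great_expectations/")

# Hand the in-memory DataFrame to the RuntimeDataConnector defined in the YAML.
batch_request = RuntimeBatchRequest(
    datasource_name="my_spark_datasource",
    data_connector_name="my_spark_dataconnector",
    data_asset_name="my_asset",              # placeholder asset name
    runtime_parameters={"batch_data": df},
    batch_identifiers={},                    # mirrors batch_identifiers: {} in the YAML
)

suite_name = "my_suite"                      # placeholder suite name
context.create_expectation_suite(suite_name, overwrite_existing=True)
validator = context.get_validator(batch_request=batch_request, expectation_suite_name=suite_name)
validator.expect_column_values_to_not_be_null("id")
print(validator.validate().success)
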
Your CI/CD pipeline can easily replace the store backends in the YAML file while deploying it to your environments (dev, hom, prod).

If you are using the RuntimeDataConnector, you should have no problem using Glue 3.0. The same does not apply if you are using the InferredAssetS3DataConnector and your datasets are encrypted using KMS. In this case, I was only able to use Glue 2.0.

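For reference, a minimal sketch of the InferredAssetS3DataConnector variant mentioned above, written in the same DataContextConfig style as the question; the bucket, prefix, and regex are placeholders:

from great_expectations.data_context.types.base import DatasourceConfig

# Unlike the RuntimeDataConnector, this connector discovers assets directly
# from S3 object keys, so the job itself reads the data from S3.
my_s3_datasource = DatasourceConfig(
    class_name="Datasource",
    execution_engine={
        "class_name": "SparkDFExecutionEngine",
        "module_name": "great_expectations.execution_engine",
        "force_reuse_spark_context": True,
    },
    data_connectors={
        "my_inferred_connector": {
            "class_name": "InferredAssetS3DataConnector",
            "module_name": "great_expectations.datasource.data_connector",
            "bucket": "my-data-bucket",      # placeholder bucket
            "prefix": "data/",
            "default_regex": {
                "pattern": r"data/(.*)\.parquet",
                "group_names": ["data_asset_name"],
            },
        }
    },
)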