Great Expectations v3 API on AWS Glue 3.0
I'm trying to run a validation step in my pipeline using Great Expectations on AWS Glue 3.0.
Here's my initial attempt to create the data context at runtime, based on their docs:
# Imports assumed from the surrounding job script (not shown in the question).
import logging

from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    DatasourceConfig,
)

logger = logging.getLogger(__name__)

# data_profile_s3_store_bucket and data_profile_expectation_suite_name are
# assumed to be defined elsewhere in the job (e.g. from Glue job arguments).


def create_context():
    logger.info("Create DataContext Config.")
    data_context_config = DataContextConfig(
        config_version=2,
        plugins_directory=None,
        config_variables_file_path=None,
        # concurrency={"enabled": "true"},
        datasources={
            "my_spark_datasource": DatasourceConfig(
                class_name="Datasource",
                execution_engine={
                    "class_name": "SparkDFExecutionEngine",
                    "module_name": "great_expectations.execution_engine",
                },
                data_connectors={
                    "my_spark_dataconnector": {
                        "module_name": "great_expectations.datasource.data_connector",
                        "class_name": "RuntimeDataConnector",
                        "batch_identifiers": [""],
                    }
                },
            )
        },
        stores={
            "expectations_S3_store": {
                "class_name": "ExpectationsStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "expectations/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
            "validations_S3_store": {
                "class_name": "ValidationsStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "validations/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
            "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
            "checkpoint_S3_store": {
                "class_name": "CheckpointStore",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "suppress_store_backend_id": "true",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "checkpoints/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
            },
        },
        expectations_store_name="expectations_S3_store",
        validations_store_name="validations_S3_store",
        evaluation_parameter_store_name="evaluation_parameter_store",
        checkpoint_store_name="checkpoint_S3_store",
        data_docs_sites={
            "s3_site": {
                "class_name": "SiteBuilder",
                "store_backend": {
                    "class_name": "TupleS3StoreBackend",
                    "bucket": data_profile_s3_store_bucket,
                    "prefix": "data_docs/",
                    "s3_put_options": {"ACL": "bucket-owner-full-control"},
                },
                "site_index_builder": {
                    "class_name": "DefaultSiteIndexBuilder",
                    "show_cta_footer": True,
                },
            }
        },
        anonymous_usage_statistics={"enabled": True},
    )
    # Pass the DataContextConfig as a project_config to BaseDataContext
    context = BaseDataContext(project_config=data_context_config)

    logger.info("Create Checkpoint Config.")
    checkpoint_config = {
        "name": "my_checkpoint",
        "config_version": 1,
        "class_name": "Checkpoint",
        "run_name_template": "ingest_date=%YYYY-%MM-%DD",
        "expectation_suite_name": data_profile_expectation_suite_name,
        "runtime_configuration": {
            "result_format": {
                "result_format": "COMPLETE",
                "include_unexpected_rows": True,
            }
        },
        "evaluation_parameters": {},
    }
    context.add_checkpoint(**checkpoint_config)
    # logger.info(f'GE Data Context Config: "{data_context_config}"')
    return context
Using this, I get an error saying I am attempting to run operations on a stopped Spark context.
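For reference, a checkpoint wired up this way would typically be driven with a RuntimeBatchRequest along these lines; the question does not show the calling code, so the asset name, identifiers, and the df variable here are illustrative only:

from great_expectations.core.batch import RuntimeBatchRequest

# Hypothetical driver code; `df` is a Spark DataFrame produced earlier
# in the Glue job, and "my_asset" is a placeholder name.
context = create_context()
batch_request = RuntimeBatchRequest(
    datasource_name="my_spark_datasource",
    data_connector_name="my_spark_dataconnector",
    data_asset_name="my_asset",
    runtime_parameters={"batch_data": df},
    batch_identifiers={"": ""},  # matches the single "" identifier configured above
)
results = context.run_checkpoint(
    checkpoint_name="my_checkpoint",
    validations=[{"batch_request": batch_request}],
)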
Is there a better way to use the Spark source in Glue 3.0? I want to stay on Glue 3.0 as much as possible, to avoid having to maintain two versions of Glue jobs.
1 Answer
You can fix this by setting force_reuse_spark_context to True; here is a quick example (YML):
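The original snippet is a minimal datasource section along these lines, assuming the v3 block-style config; the datasource, connector, and identifier names are placeholders:

# Sketch of a great_expectations-style YML datasource block; names are placeholders.
datasources:
  my_spark_datasource:
    class_name: Datasource
    module_name: great_expectations.datasource
    execution_engine:
      class_name: SparkDFExecutionEngine
      module_name: great_expectations.execution_engine
      force_reuse_spark_context: true  # reuse the SparkContext Glue already started
    data_connectors:
      my_spark_dataconnector:
        class_name: RuntimeDataConnector
        module_name: great_expectations.datasource.data_connector
        batch_identifiers:
          - run_id  # placeholder identifier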
Another thing I would like to add is that you can define the context in a YML file and upload it to S3. Then, you can parse this file in the Glue job with the function below:
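A sketch of such a helper (the bucket and key are caller-supplied, and building DataContextConfig directly from the parsed mapping is an assumption worth verifying against your Great Expectations version):

import yaml
import boto3

from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig


def load_context_from_s3(bucket, key):
    # Fetch the YML config that the deployment uploaded to S3.
    raw = (
        boto3.client("s3")
        .get_object(Bucket=bucket, Key=key)["Body"]
        .read()
        .decode("utf-8")
    )
    # Build an in-memory DataContext from the parsed mapping.
    config = DataContextConfig(**yaml.safe_load(raw))
    return BaseDataContext(project_config=config)

Calling load_context_from_s3(...) at the start of the Glue job then takes the place of building the config in code.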
Your CI/CD pipeline can easily replace the store backends in the YML file while deploying it to your environments (dev, hom, prod); a sketch of that substitution step follows below. If you are using the RuntimeDataConnector, you should have no problem using Glue 3.0. The same does not apply if you are using the InferredAssetS3DataConnector and your datasets are encrypted using KMS; in that case, I was only able to use Glue 2.0.
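As for the store-backend substitution mentioned above, a minimal sketch in Python, assuming the template contains a placeholder token; the token, bucket, and key names are all hypothetical:

from pathlib import Path

import boto3

# Render the per-environment config from a template and upload it.
template = Path("great_expectations.yml").read_text()
rendered = template.replace("__STORE_BUCKET__", "my-prod-ge-bucket")
boto3.client("s3").put_object(
    Bucket="my-prod-ge-bucket",
    Key="config/great_expectations.yml",
    Body=rendered.encode("utf-8"),
)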