Glue job status shows Succeeded, but the Redshift views are not refreshing

Posted 2025-01-21 22:18:05


I am using Glue to refresh Redshift views and tables. The tables are stored in S3; I am picking them up and writing them to Redshift.
It works when I provide a single table as input (without any loops or variable definitions), but when I try to loop through the S3 paths it does not refresh Redshift. The status in the Glue UI shows Succeeded.

Below is the code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import logging

logging.basicConfig(format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])


sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# dictionary mapping each normalized table's S3 prefix to its table name
# (keys must be unique: as posted, both entries shared one prefix, so the second
#  silently overwrote the first; the distinct second prefix below is assumed)
normalized_table_dictionary = {"s3://test/test_data1/": "test_table1", "s3://test/test_data2/": "test_table2"}

#@params database,view,user
view_dbname = "test_view"
table_dbname = "test_table"
database = "test_database"
a_user="a_user"
b_user="b_user"


for s3_path, table_name in normalized_table_dictionary.items():
    normalized_table = table_name
    redshift_view = "test_" + normalized_table

    inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": [f"{s3_path}"]}, format = "parquet")
    
    pre_query = f"truncate table {table_dbname}.{normalized_table} if exists;"

    logger.info(pre_query)
    
    
    post_query = f"begin;drop materialized view if exists {view_dbname}.{redshift_view};create materialized view {view_dbname}.{redshift_view} as select * from {table_dbname}.{normalized_table};grant select on {view_dbname}.{redshift_view} to {b_user};end;"
    
    logger.info(post_query)
    ## Write data to Redshift, truncating the target table first via preactions
    datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = inputGDF, catalog_connection = f"{a_user}", connection_options = {"preactions": pre_query, "dbtable": f"{table_dbname}.{normalized_table}", "database": f"{database}"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")
    logger.info("datasink1: %s", datasink1)

    ## Second write to the same table, rebuilding the materialized view via postactions
    datasink2 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = inputGDF, catalog_connection = f"{a_user}", connection_options = {"dbtable": f"{table_dbname}.{normalized_table}", "database": f"{database}", "postactions": post_query}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink2")

    logger.info("datasink2: %s", datasink2)
job.commit()
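
One detail worth noting about the loop: it writes the same DynamicFrame to the same Redshift table twice, once to run the preactions and once to run the postactions. If a single load per table is the intent, both actions can be attached to one write call. Below is a minimal sketch of that shape (not the original job; it reuses the variable names from the code above and assumes the same Redshift catalog connection):

    # Sketch only: one write per table, with truncate and view rebuild attached to the same call.
    datasink = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame = inputGDF,
        catalog_connection = a_user,
        connection_options = {
            "preactions": pre_query,      # truncate the target table before loading
            "postactions": post_query,    # rebuild the materialized view after loading
            "dbtable": f"{table_dbname}.{normalized_table}",
            "database": database,
        },
        redshift_tmp_dir = args["TempDir"],
        transformation_ctx = f"datasink_{normalized_table}",  # distinct context per table
    )
    logger.info("datasink: %s", datasink)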

Comments (1)

涫野音 2025-01-28 22:18:05

Well, I was facing the same problem. It turned out the problem was with Redshift itself.

Try logging into Redshift and running your query there again.
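
To act on that suggestion, one way to confirm from the Redshift side whether the Glue pre/post actions ever arrived is to search the statement history in SVL_STATEMENTTEXT. The sketch below is illustrative only: it assumes the redshift_connector Python driver and uses placeholder connection details.

import redshift_connector

# Placeholder endpoint and credentials; replace with the real cluster details.
conn = redshift_connector.connect(
    host="example-cluster.abc123.us-east-1.redshift.amazonaws.com",
    database="test_database",
    user="a_user",
    password="********",
)
cur = conn.cursor()
# SVL_STATEMENTTEXT keeps the text of statements run on the cluster, so the
# TRUNCATE and CREATE MATERIALIZED VIEW statements issued by the Glue job
# should show up here if they actually reached Redshift.
cur.execute("""
    select starttime, pid, sequence, text
    from svl_statementtext
    where text ilike '%materialized view%'
       or text ilike '%truncate table%'
    order by starttime desc, sequence
    limit 100;
""")
for row in cur.fetchall():
    print(row)
conn.close()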

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文