胶水状态显示状态成功,但红移视图不令人耳目一新
我正在使用胶水来刷新红移视图和桌子。桌子存储在S3中,我正在拿起这些桌子并用Redhshit写作。 当我提供单个表作为输入(没有任何循环或定义变量)时,它正在工作,但是当我尝试通过S3路径循环时,它不是令人耳目一新的红移。 GLUE UI中的状态显示成功。
以下是代码:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import logging
logging.basicConfig(format = format)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
#dictionary containing normalized table path and table name
normalized_table_dictionary={"s3://test/test_data1/":"test_table1","s3://test/test_data1/":"test_table2"}
#@params database,view,user
view_dbname = "test_view"
table_dbname = "test_table"
database = "test_database"
a_user="a_user"
b_user="b_user"
for s3_path,table_name in normalized_table_dictionary.items():
normalized_table = ''
redshift_view = ''
normalized_table = table_name
redshift_view = "test_"+normalized_table
inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": [f"{s3_path}"]}, format = "parquet")
pre_query = f"truncate table {table_dbname}.{normalized_table} if exists;"
logger.info(pre_query)
post_query = f"begin;drop materialized view if exists {view_dbname}.{redshift_view};create materialized view {view_dbname}.{redshift_view} as select * from {table_dbname}.{normalized_table};grant select on {view_dbname}.{redshift_view} to {b_user};end;"
logger.info(post_query)
## Write data to redshift
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = inputGDF, catalog_connection =f"{a_user}", connection_options = {"preactions":pre_query,"dbtable": f"{table_dbname}.{normalized_table}", "database": f"{database}"}, redshift_tmp_dir = args["TempDir"],transformation_ctx = "datasink1")
logger.info('datasink1',datasink1)
datasink2 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = inputGDF, catalog_connection = f"{a_user}", connection_options = {"dbtable": f"{table_dbname}.{normalized_table}", "database": f"{database}","postactions":post_query}, redshift_tmp_dir = args["TempDir"],transformation_ctx = "datasink2")
logger.info('datasink1',datasink2)
job.commit()
I am using glue to refresh redshift views and tables. Tables are stored in S3 and I am picking up those tables and writing in redhshit.
It is working when I provide a single table as input(without any loops or defining variables) but when I am trying to loop in through S3 paths it is not refreshing redshift. The status in glue UI shows succeeded.
Below is the code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import logging
logging.basicConfig(format = format)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
#dictionary containing normalized table path and table name
normalized_table_dictionary={"s3://test/test_data1/":"test_table1","s3://test/test_data1/":"test_table2"}
#@params database,view,user
view_dbname = "test_view"
table_dbname = "test_table"
database = "test_database"
a_user="a_user"
b_user="b_user"
for s3_path,table_name in normalized_table_dictionary.items():
normalized_table = ''
redshift_view = ''
normalized_table = table_name
redshift_view = "test_"+normalized_table
inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": [f"{s3_path}"]}, format = "parquet")
pre_query = f"truncate table {table_dbname}.{normalized_table} if exists;"
logger.info(pre_query)
post_query = f"begin;drop materialized view if exists {view_dbname}.{redshift_view};create materialized view {view_dbname}.{redshift_view} as select * from {table_dbname}.{normalized_table};grant select on {view_dbname}.{redshift_view} to {b_user};end;"
logger.info(post_query)
## Write data to redshift
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = inputGDF, catalog_connection =f"{a_user}", connection_options = {"preactions":pre_query,"dbtable": f"{table_dbname}.{normalized_table}", "database": f"{database}"}, redshift_tmp_dir = args["TempDir"],transformation_ctx = "datasink1")
logger.info('datasink1',datasink1)
datasink2 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = inputGDF, catalog_connection = f"{a_user}", connection_options = {"dbtable": f"{table_dbname}.{normalized_table}", "database": f"{database}","postactions":post_query}, redshift_tmp_dir = args["TempDir"],transformation_ctx = "datasink2")
logger.info('datasink1',datasink2)
job.commit()
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
好吧,我面临着同样的问题。事实证明,问题是红移。
尝试登录RS并再次执行查询。
well i was facing the same problem. turns out the problem was with redshift.
Try to login into RS and perform your query again.