Unable to write Lake Formation data into the lake layer

Posted 2025-01-21 19:29:32


I am building a POC with Lake Formation where I read a queue of train movement information and persist the individual events into a governed table using AWS Data Wrangler. This works fine.

Then I am trying to read this governed table with an AWS Glue ETL job and write the resulting data into another governed table. This succeeds and writes Parquet files into the S3 bucket/folder underlying that table, but when I try to query the data it isn't readable from Athena (an Athena query just returns no records).
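
For context, the event-persisting step looks roughly like the sketch below. This is illustrative only: events_df, the bronze bucket path, and the use of Data Wrangler's Lake Formation transaction helpers are assumptions rather than my exact code.

import awswrangler as aw
import pandas as pd

# Sketch: persist a batch of train movement events into the governed bronze table
# inside a Lake Formation transaction. The row below is a placeholder example of a
# decoded queue message, not real data.
events_df = pd.DataFrame([{
    "train_id": "172N37MX21",
    "loc_stanox": "87701",
    "canx_type": "AT ORIGIN",
    "canx_timestamp": 1611234567890,
    "segment_date": "2021-01-21",
}])

tx_id = aw.lakeformation.start_transaction(read_only=False)
aw.s3.to_parquet(
    df=events_df,
    path="s3://train-bronze/train_movements/",   # assumed bucket/prefix
    dataset=True,
    database="train_bronze",
    table="train_movements_governed",
    table_type="GOVERNED",
    transaction_id=tx_id,
)
aw.lakeformation.commit_transaction(transaction_id=tx_id)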

I created the journey table using this AWS Data Wrangler statement:

import awswrangler as aw   # "aw" is the awswrangler alias used throughout

aw.catalog.create_parquet_table(database = "train_silver", 
                            table = "journey", 
                            path = "s3://train-silver/journey/",
                            columns_types = {
                                'train_id': 'string',
                                'date': 'date',
                                'stanox': 'string',
                                'start_timestamp': 'timestamp',
                                'created': 'timestamp',
                                'canx_timestamp': 'bigint'
                            },
                            compression = "snappy",
                            partitions_types = {'segment_date': 'date'},
                            table_type = "GOVERNED")

Here's the code for the Glue job:

import sys
from datetime import datetime

from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import col, current_timestamp, udf

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

logger.info('About to start transaction')

tx_id = glueContext.start_transaction(False)

bronze_table = glueContext.create_dynamic_frame.from_catalog(database = "train_bronze", table_name = "train_movements_governed", 
    additional_options = { "transactionId": tx_id })
logger.info('About to save the bronze table to a view')
bronze_table.toDF().registerTempTable("train_movements")

max_journey_timestamp = 0

journey_df = spark.sql("""
    SELECT train_id, loc_stanox as stanox, CAST(canx_timestamp as bigint) AS canx_timestamp, segment_date
    FROM train_movements
    WHERE canx_type = 'AT ORIGIN'
    AND cast(canx_timestamp AS bigint) > {}""".format(max_journey_timestamp))

journey_df = journey_df.withColumn("created",current_timestamp())

# Helpers to turn the epoch-millisecond cancellation timestamp into date/datetime values
def date_from_timestamp(timestamp_int):
    return datetime.fromtimestamp(int(timestamp_int) / 1000.0).date()
date_UDF = udf(lambda z: date_from_timestamp(z))

def date_time_from_timestamp(timestamp_int):
    return datetime.fromtimestamp(int(timestamp_int) / 1000.0)
date_time_UDF = udf(lambda z: date_time_from_timestamp(z))

journey_df = journey_df.withColumn("date", date_UDF(col("canx_timestamp")))
journey_df = journey_df.withColumn("start_timestamp", date_time_UDF(col("canx_timestamp")))
journey_df.printSchema()

try:
    save_journey_frame = DynamicFrame.fromDF(journey_df, glueContext, "journey_df")
    logger.info('Saving ' + str(save_journey_frame.count()) + ' new journeys')
    journeySink = glueContext.write_dynamic_frame.from_catalog(frame = save_journey_frame, database = "train_silver", table_name = "journey", 
        additional_options = { "callDeleteObjectsOnCancel": True, "transactionId": tx_id })
    logger.info('Committing transaction')
    glueContext.commit_transaction(tx_id)
    logger.info('Transaction committed')
except Exception:
    glueContext.cancel_transaction(tx_id)
    raise
logger.info('Committing the job')
job.commit()

When the Glue job is run, there are parquet files in the table folder, but they aren't organized in the partition folders defined by my table definition:
[Screenshot: journey table folder containing Parquet files, but no partition sub-folders]

I also tried writing a Glue job that reads the Parquet files in that folder; they contain all the rows they should.
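
That check was essentially the following (a simplified sketch rather than the actual job, reusing a Spark session like the one created in the job above):

# Sketch: read the Parquet files directly from the table's S3 prefix,
# bypassing the governed-table catalog, to confirm the rows are there.
raw_df = spark.read.parquet("s3://train-silver/journey/")
print(raw_df.count())
raw_df.select("train_id", "segment_date").show(5)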

Here's a screenshot of me trying to query the data in Athena:
[Screenshot: SELECT query in Athena scanning no data and returning no rows]

What am I missing here? How do I get the data added to the governed table from a Spark Glue job so that I can query it from Athena?


1 Answer

你在看孤独的风景 · 2025-01-28 19:29:32


I think the problem is that the objects on the table are not being updated.

You can check that using this AWS CLI command:

aws lakeformation get-table-objects --database-name train_silver --table-name journey
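
If you want to run the same check from Python, a rough boto3 equivalent looks like this (a sketch; the field names follow the GetTableObjects API response):

import boto3

# List the S3 objects Lake Formation currently associates with the governed table.
# If the Glue job's writes were never registered, this comes back empty even though
# Parquet files exist under the table's S3 prefix.
lf = boto3.client("lakeformation")
response = lf.get_table_objects(DatabaseName="train_silver", TableName="journey")
for partition in response.get("Objects", []):
    print(partition.get("PartitionValues"),
          [obj["Uri"] for obj in partition.get("Objects", [])])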

From the Format Options for ETL Inputs and Outputs page in the AWS Glue documentation:

For writing Apache Parquet, AWS Glue ETL only supports writing to a
governed table by specifying an option for a custom Parquet writer
type optimized for Dynamic Frames. When writing to a governed table
with the parquet format, you should add the key useGlueParquetWriter
with a value of true in the table parameters.
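
One way to add that key to the existing table is sketched below, using AWS Data Wrangler's catalog helper (an illustration only; you could also edit the table parameters in the Glue console):

import awswrangler as aw

# Sketch: set useGlueParquetWriter=true on the existing journey table so Glue's
# governed-table Parquet writer is used for subsequent writes.
aw.catalog.upsert_table_parameters(
    parameters={"useGlueParquetWriter": "true"},
    database="train_silver",
    table="journey",
)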

You can alternatively set the classification parameter of your table to "glueparquet" when you create it (you can also update this):

aw.catalog.create_parquet_table(database = "train_silver", 
                        table = "journey", 
                        path = "s3://train-silver/journey/",
                        columns_types = {
                            'train_id': 'string',
                            'date': 'date',
                            'stanox': 'string',
                            'start_timestamp': 'timestamp',
                            'created': 'timestamp',
                            'canx_timestamp': 'bigint'
                        },
                        compression = "snappy",
                        parameters={
                            "classification": "glueparquet"
                        },
                        partitions_types = {'segment_date': 'date'},
                        table_type = "GOVERNED")
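
After recreating or updating the table and re-running the Glue job, get-table-objects should now list the committed objects, and queries against the table should return rows. As a purely illustrative check using Data Wrangler's Athena helper:

import awswrangler as aw

# Sketch: query the governed table through Athena; once the writes are registered
# against a committed transaction, this should return the journey rows.
df = aw.athena.read_sql_query("SELECT * FROM journey LIMIT 10", database="train_silver")
print(df)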