AWS Glue - Create date partitions from a timestamp field



I have a data frame with a timestamp field, like so:

timestamp                  id   version
2022-01-01 01:02:00.000    1    2
2022-01-01 05:12:00.000    1    2

I've created a Glue job that uses ApplyMapping to save the data to a new S3 location. Currently I've added the id and version partitions by selecting those fields in the visual editor, and my data is saved with the structure id=1/version=2/. I would like to parse the timestamp and extract the date value so the filesystem structure becomes id=1/version=2/dt=2022-01-01/. However, in the visual editor I can only select the timestamp field as-is and can't perform any manipulation on it. I'm guessing I need to change the code, but I'm not sure how.

Code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={"paths": ["s3://my-data"], "recurse": True},
    transformation_ctx="S3bucket_node1",
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        ("timestamp", "timestamp", "timestamp", "timestamp"),
        ("id", "string", "id", "string"),
        ("version", "string", "version", "string"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=ApplyMapping_node2,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://target-data",
        "partitionKeys": ["id", "version"],
    },
    format_options={"compression": "gzip"},
    transformation_ctx="S3bucket_node3",
)

job.commit()


1 Answer

分開簡單 · 2025-02-09 20:03:09


Use the Map class from awsglue.transforms.

Add this function to your script:

def AddDate(rec):
    # The timestamp's string form is "YYYY-MM-DD HH:MM:SS[.fff]", so the
    # first ten characters are the date used for the dt partition.
    ts = str(rec["timestamp"])
    rec["dt"] = ts[:10]
    return rec
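This relies on the timestamp's string representation starting with the date. If some records might arrive with a missing or null timestamp, a slightly defensive variant (a sketch, not part of the original answer; the record is assumed to behave like a Python dict, as above, and the "unknown" fallback is an arbitrary placeholder) keeps the literal string "None" out of the partition path:

def AddDateSafe(rec):
    # Hypothetical variant: fall back to a placeholder partition value
    # when the timestamp field is missing or null.
    ts = rec.get("timestamp")
    rec["dt"] = str(ts)[:10] if ts else "unknown"
    return rec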

Insert the Map transform after the ApplyMapping step:

Mapped_dyF = Map.apply(frame=ApplyMapping_node2, f=AddDate)
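Map.apply runs AddDate against every record and returns a new DynamicFrame, so dt exists as a regular column before the write and can be referenced as a partition key.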

Update the write-to-S3 step; note the changes to frame and partitionKeys:

S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=Mapped_dyF,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://target-data",
        "partitionKeys": ["id", "version", "dt"],
    },
    format_options={"compression": "gzip"},
    transformation_ctx="S3bucket_node3",
)
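If you'd rather lean on Spark's built-in date functions than a per-record Python function, an equivalent route (a minimal sketch, not from the original answer) is to convert the DynamicFrame to a DataFrame, derive dt with date_format, and convert back before the write:

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, date_format

# Derive dt with Spark's date_format instead of the Map transform
df = ApplyMapping_node2.toDF().withColumn(
    "dt", date_format(col("timestamp"), "yyyy-MM-dd")
)
Mapped_dyF = DynamicFrame.fromDF(df, glueContext, "Mapped_dyF")

Either way, dt ends up as a string column, and Glue writes it out as Hive-style dt=YYYY-MM-DD folders under each id=.../version=.../ prefix.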