Spark 结构化流 - Kinesis 作为数据源

发布于 2025-01-12 03:38:00 字数 810 浏览 4 评论 0原文

我正在尝试使用 psypark 结构化流来使用 kinesis 数据流记录。 我正在尝试在 awsglue 批处理作业中运行此代码。我的目标是使用检查点并将检查点和数据保存到 s3。我能够使用数据,但它只为每个触发器提供很少的记录,而运动数据流有很多记录。我正在使用 TRIM_HORIZON ,它是最早的别名,并触发一次 Spark.writestream ,以便它执行一次并关闭集群。当我再次运行该作业时,它会选择检查点的最新偏移量并运行。

kinesis = spark.readStream.format('kinesis') \
        .option('streamName', kinesis_stream_name) \
        .option('endpointUrl', 'blaablaa')\
        .option('region', region) \
        .option('startingPosition', 'TRIM_HORIZON')\
        .option('maxOffsetsPerTrigger',100000)\
        .load()

// 在这里做一些转换

TargetKinesisData = stream_data.writeStream.format("parquet").outputMode('append').option(
        "path", s3_target).option("checkpointLocation", checkpoint_location).trigger(once=True).start().awaitTermination()

I am trying to consume kinesis data stream records using psypark structured stream.
I am trying to run this code in aws glue batch job. My goal is to use checkpoint and save checkpoints and data to s3. I am able to consume the data but it is giving only few records for every trigger whereas kinesis data stream has lot of records. I am using TRIM_HORIZON which is alias to earliest and trigger spark.writestream once so that it executes once and shuts down the cluster. When i run the job again, it picks latest offset from checkpoint and runs.

kinesis = spark.readStream.format('kinesis') \
        .option('streamName', kinesis_stream_name) \
        .option('endpointUrl', 'blaablaa')\
        .option('region', region) \
        .option('startingPosition', 'TRIM_HORIZON')\
        .option('maxOffsetsPerTrigger',100000)\
        .load()

// do some transformation here

TargetKinesisData = stream_data.writeStream.format("parquet").outputMode('append').option(
        "path", s3_target).option("checkpointLocation", checkpoint_location).trigger(once=True).start().awaitTermination()

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

戴着白色围巾的女孩 2025-01-19 03:38:00

简短回答

Spark 检查点位置参数存储有关其成功处理的先前记录的信息。因此,当相同的记录再次出现时,它不会处理它们。如果您更改/删除检查点位置并重新运行作业,它将消耗流中的所有数据。

详细信息

在编写 Spark Streaming 作业时,有两个选项决定重新启动 Streaming 作业时将读取哪些数据。

  1. 起始位置参数:此参数将根据共享的参数(即最早、最晚或基于时间戳)从 AWS Kinesis 获取结果。请阅读以下文档以了解用法。
  2. Spark Stream Checkpoint:此参数存储有关成功处理哪些记录的信息,这有助于提供一致性并避免重新读取旧记录。请阅读此处了解 Spark 流故障宽容

Short Answer

The spark checkpoint location parameter stores information about previous records it processed successfully. So when same records come again it does not process them. If you change/delete the checkpoint location and re-run the job it will consume all the data in your stream.

Details

When writing spark streaming jobs there are two options which decide what data will be read when a streaming job is restarted.

  1. Starting Position Parameter : This parameter will fetch the results from AWS Kinesis based on the parameters shared i.e. earliest, latest or based on timestamp. read the following docs for usage.
  2. Spark Stream Checkpoint: This parameter stores information about which records were successfully processed and this helps to provide consistency and avoid re-reading the old records. Read here about spark streaming fault tolerance
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文