Spark Structured Streaming - Kinesis as a data source
I am trying to consume Kinesis data stream records using PySpark Structured Streaming, running the code in an AWS Glue batch job. My goal is to use checkpointing and save both the checkpoints and the data to S3. I am able to consume the data, but each trigger returns only a few records, even though the Kinesis data stream has a lot of records. I am using TRIM_HORIZON (an alias for earliest) and a trigger-once writeStream, so the job executes once and shuts down the cluster. When I run the job again, it picks up the latest offset from the checkpoint and runs from there.
kinesis = spark.readStream.format('kinesis') \
    .option('streamName', kinesis_stream_name) \
    .option('endpointUrl', 'blaablaa') \
    .option('region', region) \
    .option('startingPosition', 'TRIM_HORIZON') \
    .option('maxOffsetsPerTrigger', 100000) \
    .load()

# do some transformation here (result assigned to stream_data)

TargetKinesisData = stream_data.writeStream \
    .format("parquet") \
    .outputMode('append') \
    .option("path", s3_target) \
    .option("checkpointLocation", checkpoint_location) \
    .trigger(once=True) \
    .start() \
    .awaitTermination()
Short Answer
The Spark checkpoint location stores information about the records the query has already processed successfully, so when the same records arrive again they are not reprocessed. If you change or delete the checkpoint location and re-run the job, it will consume all the data in your stream again.
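For illustration, here is a minimal sketch of that behaviour in the same PySpark setting as the question. It reuses `stream_data` and `s3_target` from the question's code; the bucket/prefix names and the boto3 cleanup helper are hypothetical, not part of the original post. Clearing the checkpoint prefix forces the next run to fall back to `startingPosition` (TRIM_HORIZON), while reusing the existing prefix makes the query resume from its stored progress.

```python
import boto3

# Hypothetical names -- substitute your own bucket and prefix.
CHECKPOINT_BUCKET = "my-bucket"
CHECKPOINT_PREFIX = "checkpoints/kinesis-to-parquet/"

def clear_checkpoint(bucket: str, prefix: str) -> None:
    """Delete the checkpoint objects so the next run starts from startingPosition."""
    s3 = boto3.resource("s3")
    s3.Bucket(bucket).objects.filter(Prefix=prefix).delete()

# Uncomment to re-read the whole stream from TRIM_HORIZON on the next run:
# clear_checkpoint(CHECKPOINT_BUCKET, CHECKPOINT_PREFIX)

# Reusing the same checkpointLocation instead makes the query resume where it left off.
query = stream_data.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("path", s3_target) \
    .option("checkpointLocation", f"s3://{CHECKPOINT_BUCKET}/{CHECKPOINT_PREFIX}") \
    .trigger(once=True) \
    .start()
query.awaitTermination()
```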
Details
When writing Spark Streaming jobs, there are two options that decide what data will be read when a streaming job is restarted.
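As a rough sketch of how those two options interact (using only the option names already present in the question's code): `startingPosition` is honoured only on the very first run against an empty checkpoint location, while `checkpointLocation` governs every restart afterwards.

```python
# First run with an empty checkpointLocation: the reader honours startingPosition,
# so TRIM_HORIZON begins at the oldest record still retained in the stream.
kinesis = spark.readStream.format("kinesis") \
    .option("streamName", kinesis_stream_name) \
    .option("region", region) \
    .option("startingPosition", "TRIM_HORIZON") \
    .load()

# Any later run that reuses the same checkpointLocation ignores startingPosition and
# resumes from the offsets recorded in the checkpoint -- which is why the job in the
# question "picks the latest offset from the checkpoint" on its second execution.
query = kinesis.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("path", s3_target) \
    .option("checkpointLocation", checkpoint_location) \
    .trigger(once=True) \
    .start()
query.awaitTermination()
```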