How can I use a PySpark file stream to read only new files uploaded in the last hour?
I am trying to read only the latest files in a directory (say, files that arrived in the last hour) and load that data. I am using PySpark Structured Streaming. I tried the maxFileAge option of the file source, but it still loads all the files in the directory, regardless of the time specified in the option.
spark.readStream \
    .option("maxFileAge", "1h") \
    .schema(cust_schema) \
    .csv(upload_path) \
    .withColumn("closing_date", get_date_udf_func(input_file_name())) \
    .writeStream.format('parquet') \
    .trigger(once=True) \
    .option('checkpointLocation', checkpoint_path) \
    .option('path', write_path) \
    .start()
Above is the code I tried, but it loads all available files regardless of their age. What am I doing wrong here?
Comments (1)
Sorry for the late reply.
.option("maxFileAge", "1h")
only takes effect after the checkpointLocation has been created. For example:
First run: there is no checkpoint yet, so everything is read.
Second run: the checkpointLocation already exists, so new files are filtered by maxFileAge, and the ones that pass are recorded in the checkpointLocation.
So you can first run the readStream against just a single file under the base path to create the checkpointLocation; the maxFileAge setting will then apply in the following batches.
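To make the first-run versus second-run behavior concrete, here is a small plain-Python toy model of the bookkeeping. It is only a sketch, not Spark's implementation: the class `SeenFiles`, its fields, and the file names are hypothetical. It assumes the behavior described in the Spark documentation for maxFileAge, namely that all files are considered valid for the first batch and that the age is measured against the timestamp of the newest file rather than the system clock.

```python
from dataclasses import dataclass, field


@dataclass
class SeenFiles:
    """Toy model of a file source's state (hypothetical, not Spark code)."""
    max_age_s: int
    seen: set = field(default_factory=set)
    latest_ts: int = 0   # newest modification time processed so far
    purge_ts: int = 0    # files older than this cutoff are ignored

    def next_batch(self, listing):
        """Return the paths in `listing` ({path: mtime_seconds}) to process now."""
        batch = sorted(
            path for path, ts in listing.items()
            if ts >= self.purge_ts and path not in self.seen
        )
        for path in batch:
            self.seen.add(path)
            self.latest_ts = max(self.latest_ts, listing[path])
        # The cutoff is relative to the newest file, not the wall clock.
        self.purge_ts = self.latest_ts - self.max_age_s
        return batch


HOUR = 3600
state = SeenFiles(max_age_s=HOUR)  # plays the role of the checkpoint

# First run: no prior state, so the cutoff is 0 and every file qualifies.
first = state.next_batch({"old.csv": 1 * HOUR, "new.csv": 10 * HOUR})

# Second run with the same state: a stale late arrival is skipped.
second = state.next_batch({
    "old.csv": 1 * HOUR, "new.csv": 10 * HOUR,
    "stale.csv": 2 * HOUR, "fresh.csv": 11 * HOUR,
})

print(first)
print(second)
```

In this model the `state` object stands in for the checkpoint: throwing it away (or pointing at a fresh checkpoint directory) puts you back in the first-run case, where the age filter has nothing to compare against.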