I have a structured streaming job that reads a bunch of json.gz files under the following directory and writes them to a Delta table:
headFolder
|- 00
|-- file1.json.gz
|- 01
|-- file2.json.gz
...
|- 23
|-- file24.json.gz
The structured streaming query I'm running is as follows:
from pyspark.sql.functions import input_file_name

query = (
    spark.readStream
    .format("cloudFiles")
    .options(**{"cloudFiles.format": "json", "cloudFiles.schemaEvolutionMode": "rescue"})
    .schema(schema_predefined)
    .load("./headFolder/")
    # record the source file for each row
    .withColumn("input_file_path", input_file_name())
    .writeStream
    .format("delta")
    .outputMode("append")
    .options(**{"checkpointLocation": checkpoint_path, "path": output_path})
    .trigger(once=True)
    .queryName("query_name")
    .start()
)
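Since the stream runs with a once trigger, I wait for the single batch to finish before checking the output. A minimal sketch, where query is the StreamingQuery returned by start() above:

# Block until the single triggered batch completes
query.awaitTermination()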
I omitted some details in the query above; please take all undeclared parameters as pre-defined. After running the job, all 24 files were processed and I can validate that the data is correct. However, the input_file_name() function didn't work as I expected.
When I check the input_file_path column, I expected 24 distinct values since the file names are all different. However, I see only around 5 distinct filenames, and the number varies with file size. After looking into the documentation, it seems the function returns the file name for the TASK rather than for each individual file, so since I'm reading from the top-level folder, Spark automatically groups the 24 hourly files into a few tasks and picks one name from the files each task reads.
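For reference, a rough sketch of the check described above, reading the Delta output back from output_path (the same path the stream writes to):

# Read the Delta table written by the stream and count distinct source files
written = spark.read.format("delta").load(output_path)
distinct_files = written.select("input_file_path").distinct()
print(distinct_files.count())       # around 5 here, 24 expected
distinct_files.show(truncate=False)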
My question is: is there still a way to accurately record the filename of each processed file under the current setup? For runtime reasons, I don't want to change the file layout or force one task per file.
Thank you!