Delta Mergeschema不使用Memorystream与Spark检查点进行工作

发布于 2025-01-21 06:12:38 字数 3737 浏览 4 评论 0 原文

我正在使用 memoryStream spark测试Deltawriter类,用于创建流(而不是ReadStream),我想在S3上写入s3作为Delta文件,带有选项 “ MergesChema”:true 如下:

import org.apache.spark.sql.execution.streaming.MemoryStream
import scala.reflect.ClassTag
import scala.reflect.runtime.universe._
import io.delta.implicits._

case class EnrichedPerson(id: Int, name: String, age: Int, address: String)
case class BasePerson(id: Int, name: String, age: Int)

val data1: List[BasePerson] = List(BasePerson(1, "mark ", 30), BasePerson(2, "paul", 25))
implicit val encoder1: Encoder[BasePerson] = Encoders.product[BasePerson]
val memStream1 = MemoryStream[BasePerson]
memStream1.addData(data1)
val stream1 = memStream1.toDS().toDF()
val streamQuery1 = stream1
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .outputMode("append")
  .option("checkpointLocation", "my/checkpointpoint/location1")
  .delta("s3://bucket/raw/data/..-")
streamQuery1.processAllAvailable()

val data2: List[EnrichedPerson] = List(EnrichedPerson(11, "jhon", 31, "street 1"), EnrichedPerson(22, "luis", 32, "street 2"))
implicit val encoder2: Encoder[EnrichedPerson] = Encoders.product[EnrichedPerson]
val memStream2 = MemoryStream[EnrichedPerson]
val stream2 = memStream2.toDS().toDF()
memStream2.addData(data2)

val streamQuery2 = stream2
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .outputMode("append")
  .option("checkpointLocation", "my/checkpointpoint/location1")
  .delta("s3://bucket/raw/data/..-")

streamQuery2.processAllAvailable()

代码第一次使用某些输入模式(例如col(a),, col(b)< /em>,, col(c))。如果我尝试通过添加一个新列来更改模式(例如 col(a) col(b) col(c) col(d) ),即使我是新列C。使用 MergesChema 启用Delta作为水槽。我没有任何错误,但是 streamquery2 不写任何数据。

根据火花文档,我可以在2个执行之间更改模式,因为我的接收器(Delta)允许架构更改

有条件允许使用不同输出模式的投影变化:SDF.SelectExpr(“ A”)。writestream to sdf.SelectExpr(“ B”)。仅当输出件允许架构从“ A”中更改架构时,才允许Writestream。到“ B”。

通过上述查询第一个执行产生的S3输出是:

deltaTable/
|
|__checkpoint/
|   |__commits/
|   |   |__0
|   |__offsets/
|   |   |__0
|   |__metadata
|
|__delta/
   |__....

检查检查点文件夹的内容,我发现没有关于我的数据模式的元数据的文件。实际上,内容为:

cat Deltatable/checkpoint/metadata

{“ id”:“ B48487CA-5374-4B93-8E26-503184F2ED57”}

CAT Deltatable/checkpoint/consectpoint/consits/0

v1 {“ NextBatchWaterMarkms”:0}

CAT DeltaTable/checkpoint/offsets/0

v1 {"batchWatermarkMs":0,"batchTimestampMs":1649859656284,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark .sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark 。 0

为了解决此问题,如这些 link1 link2 ,足以删除检查点解决方案很强,因为删除了检查点后,如何从相同的偏移开始?

谁能向我解释,为什么删除检查点上一个查询工作,即使对检查点文件夹中没有关于模式的元数据?

提前致谢!

I am testing a DeltaWriter class using MemoryStream by spark for creating a stream (rather than readStream) and i want to write the result on s3 as delta file with option "mergeSchema": true as reported below:

import org.apache.spark.sql.execution.streaming.MemoryStream
import scala.reflect.ClassTag
import scala.reflect.runtime.universe._
import io.delta.implicits._

case class EnrichedPerson(id: Int, name: String, age: Int, address: String)
case class BasePerson(id: Int, name: String, age: Int)

val data1: List[BasePerson] = List(BasePerson(1, "mark ", 30), BasePerson(2, "paul", 25))
implicit val encoder1: Encoder[BasePerson] = Encoders.product[BasePerson]
val memStream1 = MemoryStream[BasePerson]
memStream1.addData(data1)
val stream1 = memStream1.toDS().toDF()
val streamQuery1 = stream1
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .outputMode("append")
  .option("checkpointLocation", "my/checkpointpoint/location1")
  .delta("s3://bucket/raw/data/..-")
streamQuery1.processAllAvailable()

val data2: List[EnrichedPerson] = List(EnrichedPerson(11, "jhon", 31, "street 1"), EnrichedPerson(22, "luis", 32, "street 2"))
implicit val encoder2: Encoder[EnrichedPerson] = Encoders.product[EnrichedPerson]
val memStream2 = MemoryStream[EnrichedPerson]
val stream2 = memStream2.toDS().toDF()
memStream2.addData(data2)

val streamQuery2 = stream2
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .outputMode("append")
  .option("checkpointLocation", "my/checkpointpoint/location1")
  .delta("s3://bucket/raw/data/..-")

streamQuery2.processAllAvailable()

The first time the codes works well (streamQuery1) with certain schema of input(e.g. col(A), col(B), , col(C)). If i try to change the schema by adding a new column (e.g. col(A), col(B), col(C), col(D)), the same code (streamQuery2) doesn't update the delta table with the new column C, even though i am using the delta as sink with mergeSchema enabled. I'm not got any error, however the streamQuery2 doesn't write any data.

According the spark docs Recovering from Failures with Checkpointing, i could change the schema between 2 execution, because my sink(delta) allow the schema changes

Changes in projections with different output schema are conditionally allowed: sdf.selectExpr("a").writeStream to sdf.selectExpr("b").writeStream is allowed only if the output sink allows the schema change from "a" to "b".

The output produced to s3 by the first execution from the above query is:

deltaTable/
|
|__checkpoint/
|   |__commits/
|   |   |__0
|   |__offsets/
|   |   |__0
|   |__metadata
|
|__delta/
   |__....

Inspecting the content of the checkpoint folder, i found out that there is no file with metadata regarding the schema of my data. Indeed, the content are:

cat deltaTable/checkpoint/metadata

{"id":"b48487ca-5374-4b93-8e26-503184f2ed57"}

cat deltaTable/checkpoint/commits/0

v1
{"nextBatchWatermarkMs":0}

cat deltaTable/checkpoint/offsets/0

v1
{"batchWatermarkMs":0,"batchTimestampMs":1649859656284,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
0

In order to solve this issue, as reported in these link1 link2, is sufficient to delete the checkpoint however the impact of this solution is strong because after deleted the checkpoint, how can i start from the same offset?

Can anyone explain to me why deleting the checkpoint the previous query work, even though there are no metadata about the schema into the checkpoint folder and how can i use both checkpoint and mergeSchema option in order to realize the schema evolution test?

Thanks in advance!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

凌乱心跳 2025-01-28 06:12:38

表的架构记录在增量日志中,而不是在检查点中。您需要在 _Delta_log 表格下检查JSON文件(例如/user/heve/hive/warehouse/table_name )。

Schema of the table is recorded in the delta log, not in the checkpoint. You need to check JSON files under the _delta_log director of your table (for example /user/hive/warehouse/table_name).

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文