Cosmos Changefeed Spark stream stops randomly
I have a Spark Structured Streaming job, shown below, that reads the Cosmos DB change feed; it runs on a Databricks cluster with DBR 8.2.
cosmos_config = {
"spark.cosmos.accountEndpoint": cosmos_endpoint,
"spark.cosmos.accountKey": cosmos_key,
"spark.cosmos.database": cosmos_database,
"spark.cosmos.container": collection,
"spark.cosmos.read.partitioning.strategy": "Default",
"spark.cosmos.read.inferSchema.enabled" : "false",
"spark.cosmos.changeFeed.startFrom" : "Now",
"spark.cosmos.changeFeed.mode" : "Incremental"
}
from pyspark.sql.functions import current_date

df_read = (spark.readStream
           .format("cosmos.oltp.changeFeed")
           .options(**cosmos_config)
           .schema(cosmos_schema)
           .load())

df_write = (df_read.withColumn("partition_date", current_date())
            .writeStream
            .partitionBy("partition_date")
            .format("delta")
            .option("path", master_path)
            .option("checkpointLocation", f"{master_path}_checkpointLocation")
            .queryName("cosmosStream")
            .trigger(processingTime="10 seconds")
            .start())
While the job ordinarily works well, the stream occasionally stops all of a sudden and the output below appears in a loop in the log4j logs. Restarting the job processes all of the data in the 'backlog'. Has anyone experienced something like this before? I'm not sure what could be causing it. Any ideas?
22/02/27 00:57:58 INFO HiveMetaStore: 1: get_database: default
22/02/27 00:57:58 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_database: default
22/02/27 00:57:58 INFO DriverCorral: Metastore health check ok
22/02/27 00:58:07 INFO HikariDataSource: metastore-monitor - Starting...
22/02/27 00:58:07 INFO HikariDataSource: metastore-monitor - Start completed.
22/02/27 00:58:07 INFO HikariDataSource: metastore-monitor - Shutdown initiated...
22/02/27 00:58:07 INFO HikariDataSource: metastore-monitor - Shutdown completed.
22/02/27 00:58:07 INFO MetastoreMonitor: Metastore healthcheck successful (connection duration = 88 milliseconds)
22/02/27 00:58:50 INFO RxDocumentClientImpl: Getting database account endpoint from https://<cosmosdb_endpoint>.documents.azure.com:443
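In case it helps with the diagnosis, here is a minimal sketch of how I can check from the driver whether the query has actually terminated or is merely stalled, assuming the df_write handle returned by .start() above is still in scope (isActive, status, lastProgress and exception are standard Structured Streaming APIs):

import time

# Poll the StreamingQuery handle: if the stream has stalled rather than failed,
# isActive stays True but lastProgress stops advancing.
while df_write.isActive:
    progress = df_write.lastProgress                  # most recent micro-batch, or None
    if progress is not None:
        print(progress["timestamp"], "rows/sec:", progress["processedRowsPerSecond"])
    print("status:", df_write.status)                 # e.g. "Waiting for data to arrive"
    time.sleep(60)

# If the loop exits, the query terminated; exception() is None for a clean stop.
if df_write.exception() is not None:
    print("query failed:", df_write.exception())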
1 Answer
Which version of the Cosmos Spark connector are you using? Between 4.3.0 and 4.6.2 there were several bug fixes made in the bulk ingestion code path.
See https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md for more details.
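If you are on an older build, upgrading the cluster library to the latest 4.x release for the Spark 3.1 / Scala 2.12 line (the one matching DBR 8.x) should pick up those fixes. As a rough guide, the Maven coordinate to install on the cluster would look like the following (4.6.2 shown as an example; check the changelog above for the current release):

com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.6.2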