Cosmos Changefeed Spark stream stops randomly

Posted 2025-01-10 16:57:15

I have a Spark streaming job which reads Cosmos Changefeed data as below, running in a Databricks cluster with DBR 8.2.

cosmos_config = {
  "spark.cosmos.accountEndpoint": cosmos_endpoint,
  "spark.cosmos.accountKey": cosmos_key,
  "spark.cosmos.database": cosmos_database,
  "spark.cosmos.container": collection,
  "spark.cosmos.read.partitioning.strategy": "Default",
  "spark.cosmos.read.inferSchema.enabled" : "false",
  "spark.cosmos.changeFeed.startFrom" : "Now",
  "spark.cosmos.changeFeed.mode" : "Incremental"
}
from pyspark.sql.functions import current_date

df_read = (spark.readStream
                .format("cosmos.oltp.changeFeed")
                .options(**cosmos_config)
                .schema(cosmos_schema)
                .load())

df_write = (df_read.withColumn("partition_date", current_date())
              .writeStream
              .partitionBy("partition_date")
              .format("delta")
              .option("path", master_path)
              .option("checkpointLocation", f"{master_path}_checkpointLocation")
              .queryName("cosmosStream")
              .trigger(processingTime="10 seconds")
              .start()
            )

While the job ordinarily runs well, occasionally the stream stops all of a sudden and the lines below appear in a loop in the log4j output. Restarting the job processes all the data in the backlog. Has anyone experienced something like this before? I'm not sure what could be causing it. Any ideas?

22/02/27 00:57:58 INFO HiveMetaStore: 1: get_database: default
22/02/27 00:57:58 INFO audit: ugi=root  ip=unknown-ip-addr  cmd=get_database: default   
22/02/27 00:57:58 INFO DriverCorral: Metastore health check ok
22/02/27 00:58:07 INFO HikariDataSource: metastore-monitor - Starting...
22/02/27 00:58:07 INFO HikariDataSource: metastore-monitor - Start completed.
22/02/27 00:58:07 INFO HikariDataSource: metastore-monitor - Shutdown initiated...
22/02/27 00:58:07 INFO HikariDataSource: metastore-monitor - Shutdown completed.
22/02/27 00:58:07 INFO MetastoreMonitor: Metastore healthcheck successful (connection duration = 88 milliseconds)
22/02/27 00:58:50 INFO RxDocumentClientImpl: Getting database account endpoint from https://<cosmosdb_endpoint>.documents.azure.com:443
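For what it's worth, one low-tech way to see the stall in numbers is to measure the gaps between consecutive driver-log timestamps. A minimal sketch in plain Python, fed three of the log lines above: with a 10-second processing trigger, a 43-second gap between driver log lines is a hint that micro-batches have stopped arriving.

```python
from datetime import datetime

# Log lines copied from the driver log4j output above (truncated after the tag).
log_lines = [
    "22/02/27 00:57:58 INFO HiveMetaStore: 1: get_database: default",
    "22/02/27 00:58:07 INFO MetastoreMonitor: Metastore healthcheck successful",
    "22/02/27 00:58:50 INFO RxDocumentClientImpl: Getting database account endpoint",
]

def gaps(lines):
    """Seconds elapsed between consecutive log-line timestamps."""
    # The first 17 characters are the 'yy/MM/dd HH:mm:ss' timestamp.
    stamps = [datetime.strptime(line[:17], "%y/%m/%d %H:%M:%S") for line in lines]
    return [(b - a).total_seconds() for a, b in zip(stamps, stamps[1:])]

print(gaps(log_lines))  # → [9.0, 43.0]
```

The 43-second gap before the `RxDocumentClientImpl` line, far beyond the 10-second trigger interval, is consistent with the stream having silently stalled rather than merely running slowly.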

1 Answer

时常饿 2025-01-17 16:57:15

Which version of the Cosmos Spark connector are you using? Between 4.3.0 and 4.6.2 there were several bug fixes made in the bulk ingestion code path.

See https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md for more details.
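As a rough sketch of the check this answer implies (`predates_fixes` is a hypothetical helper, not part of the connector): take the version segment of the Maven coordinate your cluster library pins, e.g. the `4.6.2` in `com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.6.2`, and compare it against 4.6.2, the release that closes out the bulk-ingestion fixes mentioned above.

```python
# Hypothetical helper (not part of the connector API): True if the given
# connector version predates 4.6.2, i.e. may be missing some of the
# bulk-ingestion bug fixes made between 4.3.0 and 4.6.2.
def predates_fixes(version: str, fixed=(4, 6, 2)) -> bool:
    return tuple(int(p) for p in version.split(".")) < fixed

print(predates_fixes("4.3.0"))   # → True: upgrade recommended
print(predates_fixes("4.6.2"))   # → False
```

Note the tuple comparison, which correctly orders multi-digit segments (so 4.10.0 sorts after 4.6.2, unlike a plain string comparison).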
