Spark.sql.Adaptive.Semabled Work Spark结构化流?
我使用Apache Spark结构化流。结构化流是建立在Spark SQL发动机上的可扩展且耐故障的流处理引擎。由于它建立在Spark SQL引擎上,这是否表示Spark.SQL.…
错误读取增量文件,使用kafka的状态火花结构化流式作业
我是通过Spark Window操作通过Kafka读取数据的,并结合了Kafka和Hive Data,可以制作单个DataFrame。我正在尝试通过SPARK窗口功能和Row_number找到带…
火花结构化流 - 检查点元数据无限期地生长
我使用Spark Struture流媒体3.1.2。我需要使用S3来存储检查点元数据(我知道,它不是检查点元数据的最佳存储)。压实间隔为10(默认),我设置 spark.…
火花结构化流驱动器中的内存泄漏
我正在使用Hadoop 3.2.0使用Spark 3.1.2来运行Spark结构化流( sss )聚合作业,在Spark K8上运行。 这些作业是使用提供的SSS读取文件源输入的SSS的文…
Prometheus的Pyspark UDF监视
我正在尝试使用计数器在UDF中监视某些逻辑。 即 counter = Counter(...).labels("value") @ufd def do_smthng(col): if col: counter.label("not_null…
我需要有多个事件中心消费组的多个水槽?
我正在从EventHub接收流数据,并且有4种来自EventHub的数据。 我正在数据集中群上的Event Hub读取数据,如: ehConf = {} ehConf['eventhubs.connecti…
火花结构化的流媒体批处理以将数据写入安装的斑点存储容器
我正在接收流数据,并想将我的数据从Spark Databricks群集写入Azure Blob存储容器。 为此,我已经安装了存储帐户,我将路径指定到我的流式水槽查询中…
火花结构化的流式写作触发设置设置为一次录制的数据要比应有的要少得多
我有一个每小时运行的程序,它会收到流数据,并以镶木制格式以批量格式写入数据卡片中,每次运行时,以稍后处理另一个功能处理。 因此,我将Writestre…
Spark流与静态数据链球三分表的可靠性如何可靠
在databricks 有一个很酷的功能使用Delta表加入流数据框。很酷的部分是,对于随后的联接结果,增量表中的更改仍会反映出来。它可以正常工作,但是我很…
如果未来事件延迟,如何确保外部零连接结果输出在火花流中输出
在Spark流式流式外部连接的场景中: val left = spark.readStream.format("delta").load("...") .withWatermark("enqueuedTime", "1 hour") val right…
是独特的'不支持结构化流数据集的操作?
从火花结构化的流媒体文档中,包含它 不支持流媒体数据集的不同操作。 但是,API中有一个 distract()方法,我也可以在流dateset之后调用 distract(…
Delta Mergeschema不使用Memorystream与Spark检查点进行工作
我正在使用 memoryStream spark测试Deltawriter类,用于创建流(而不是ReadStream),我想在S3上写入s3作为Delta文件,带有选项 “ MergesChema”:tr…