结构化流查询失败,并显示“找不到事务日志中引用的文件。”
我正在从Delta表源流式传输,而我的查询不断失败,而无法找到交易日志中引用的文件。怪异的部分是,当我运行 fsck修复表table table table table dry …
Spark Spark-sql-kafka - java.lang.NoClassDefFoundError:org/apache/kafka/common/serialization/ByteArraySerializer
我正在尝试通过。 火花版:3.2.1 Scala版本:2.12.15 遵循其火花壳的指南,包括依赖项,我开始了我的外壳: spark-shell --packages org.apache.spark…
将 dbfs 文件作为 databricks 中的流数据帧获取
我有一个问题,我需要为每个CSV文件中的数据链球插入一个外部表,该文件将降落到ADLS Gen 2存储中。 当我从dbutils.fs.ls()输出中获取流式数据框时…
如何使用Spark结构化流(Python)生成一个小时长的镶木式文件?
我想每小时生成镶木地板文件,其中包含该小时内收到的所有信息,以便使用 Spark NLP 进行进一步处理。 我有来自 Kafka 的流数据,当我 writeStream 时…
更改Parquet文件列值而不更改Parquet文件的名称,以免更改_spark_metadata
因此,我有一个火花结构化的流媒体作业,该作业将在日期分区的文件夹中以Parquet格式存储在HDFS位置中的JSON消息数据范围,即/源/无线/active_portal/…
Pyspark Kafka ReadStream
我正在使用以下代码读取KAFKA主题的数据。 os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1…
Spark结构化流式处理pyspark,applyInPandas被立即调用并且不等待窗口过期
Spark 结构化流不允许窗口函数执行滞后、超前操作。所以我尝试使用 applyInpandas 函数。 我有一个 5 分钟的翻滚窗口,水印在附加模式下设置为 1 分钟…
Spark 结构化流 qubole Kinesis 连接器错误并显示“获取凭据时出现异常”
我使用以下代码从 Spark 结构化流代码写入 Kinesis。它因以下错误而出错。 AWS 凭证具有管理员访问权限。我可以使用 aws 控制台。这里可能有什么问题…
Spark Streaming 将数据写入 Kafka 主题
我正在尝试为每个 RDD 向 Kafka 主题写入一个数据帧。 我正在使用下面的代码: mesg.foreachRDD(rdd => { Dataframe.write.format("kafka") .option("…
在 Spark 结构化流中解释数组 JSON 中的数据帧
我的数据框中有一列下面的 json 字符串,如何分解/压平它以获得单级数据框? 目前的模式是 df |-json_data (StringType) 如何拥有以下模式的 df ? df…
在 Spark 结构化流中计算滑动窗口中的多个聚合
我有一个流源,它发送事件,其中每条记录由 3 个字段组成(CreationTime、FP、Detected) 这里,“FP”代表误报。 “FP”和“检测到”字段的值可以为 …
StructuredStreaming - 根据 Kafka 主题中的新事件处理数据
我有一个结构化的Streaming程序,它从Kafka主题A读取数据,并进行一些处理,最后将数据放入目标Kafka主题。 笔记 : 处理是在函数 -convertToDictForE…
使用 python 中的 Spark 结构化流从来自 kafka 的 json 创建数据框
我对 Spark 的结构化流还很陌生,正在开发一个需要在结构化流上实现的 poc。 输入源:卡夫卡 输入格式:json 语言: python3 库:spark 3.2 我正在尝试…
使用 Trigger.AvailableNow 进行有状态流操作
据我了解,在流模式下运行流作业时,状态会在窗口+水印期间保持,然后在无法再修改时写入。当我们使用 Trigger.AvailableNow 时,这会如何转化?它是…