如何使用 pyspark filestream 读取过去一小时内上传的新文件?
我正在尝试读取目录中可用的最新文件(例如过去一小时内的新文件)并加载该数据。我正在尝试使用 pyspark 结构化流媒体。我尝试过 Spark Streaming 的…
Spark numoutputrows 指标显示 -1
我正在使用带有 kafka 输出接收器的结构化流。 我使用 SinkProgess.numoutputrows 指标记录写入 kafka 的记录: https://spark. apache.org/docs/late…
无法访问 RDD foreach 函数内的 scala 值/变量(空)
我有一个 Spark 结构化流作业,需要按照以下代码使用 forEachBatch 函数内的 rdd.forEach: val tableName = "ddb_table" df .writeStream .foreachBa…
Spark 结构化流中是否使用 hadoop 提交者?
我的团队正在使用 Spark 结构化流将消息从 kafka 接收到 HDFS。我们正处于迁移此组件以将消息接收到 AWS S3 的后期阶段,与此相关的是,我们遇到了一…
Spark 结构化流 - Kinesis 作为数据源
我正在尝试使用 psypark 结构化流来使用 kinesis 数据流记录。 我正在尝试在 awsglue 批处理作业中运行此代码。我的目标是使用检查点并将检查点和数据…
如何使用本地 JAR 文件运行 Spark 结构化流
我正在 EKS 上使用 EMR 的 Docker 映像之一 (emr-6.5.0:20211119),并研究如何使用 Spark 结构化编程 (pyspark) 在 Kafka 上工作。根据 集成指南,我…
有没有办法用 Kafka 消费中的最新消息替换旧消息(避免最终 df 中重复)
我正在使用来自某个主题的数据,正如我们所知,我们实时获取数据,其中我们看到重复的元素,如何实际上用最新消息替换旧消息。 我使用以下相同的代码…
提取结构化二进制数据(从 Kafka)并转换为 Integer
我正在尝试使用 Python 从 Kafka 将数据提取到 Spark 中。来自 Kafka 的数据采用 JSON 格式,如下所示: {"order_id": 56, "customer_id": 772, "taxf…
从 kafka 代理到 python 中的 Spark 流应用程序的 json 数据的正确架构是什么
执行此程序后,我在数据帧的所有字段中收到空值。如果架构是问题,请向我建议此 json 格式的正确架构,否则请为我提供从 kafka-topic 读取此 json 数…
结构化流媒体UDF - 基于检查数据框中是否存在列的逻辑
我正在使用 StructuredStreaming,并尝试使用 UDF 来解析每一行。 要求如下: 我们可以获取类型为“主要”或“关键”的特定 KPI 的警报 ,如果我们收…
Trigger.Once Spark 结构化流媒体与 KAFKA 可能吗?
使用 Trigger.Once 的 Spark 结构化流是否允许直接连接到 KAFKA 并使用 MERGE 语句?或者该数据必须来自增量表吗? 这个 https://docs.databricks.com…