spark-structured-streaming

spark-structured-streaming

文章 0 浏览 1

Spark -Java-过滤器流询问查询

我有一个在数据范围中接收数据的Spark应用程序: Dataset df = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "localhost:9…

多孤肩上扛 2025-02-03 09:48:29 3 0

Delta Lake Oss桌由于版本作业而大量

我已经设置了一个Spark独立的群集,并使用Spark结构化流媒体将Kafka的数据写入多个Delta Lake表格 - 简单地存储在文件系统中。因此,每秒有多个写作。…

灯下孤影 2025-02-03 02:26:00 3 0

应用和编写多个转换时读取火花流源一次

我正在尝试使用Spark结构化流以达到以下流程: ┌──────────────────────┐ ┌──►│Transformation DF (1) ├──┐ │ └…

骷髅 2025-02-02 18:27:36 4 0

Azure模式注册表+火花结构化流+ kafka/eventhub兼容性

有没有一种方法可以将Azure模式注册表与Spark结构化流? (不是OpenSource模式注册表)。 理想情况下,我会使用一些高级库,例如Abris( https://gith…

毁梦 2025-02-02 11:22:04 3 0

如何在Pyspark中正确使用foreachBatch()方法?

我正在尝试下沉由Spark中的结构化流动API处理到PostgreSQL的结果。我尝试了以下方法(以某种方式简化了,但希望它清楚): class Processor: def __in…

小…楫夜泊 2025-02-01 13:00:46 4 0

存储在数据框中的结构化流数据

我有以下形式的火花数据框: from pyspark.sql.functions import * from pyspark.sql.types import * schema_sdf_consistent = StructType([ StructF…

银河中√捞星星 2025-02-01 09:52:32 5 0

GCP DataProc-提交工作不起作用时添加多个软件包(Kafka,MongoDB)

我正在尝试添加Kafka& MongoDB软件包在提交DataProc Pyspark作业时,但是失败了。 到目前为止,我一直在使用Kafka软件包,这很好, 但是,当我尝…

孤星 2025-02-01 03:32:11 5 0

火花结构化流 - 更新遵循groupbykey和mapgroupswithstate,给出重复的关键结果

我试图在数据链驱动(scala)中执行以下状态聚合: sig_df .as[InputRow] .groupByKey(_.uid) .mapGroupsWithState(GroupStateTimeout.NoTimeout)(upd…

埋情葬爱 2025-01-31 21:46:10 5 0

将Spark DataFrame写入Kafka作为逗号单独的JSON对象

我无法将dataframe作为逗号分隔的JSON对象发送较大的数据集。 较小数据集的工作代码 df.selectExpr("CAST(collect_list(to_json(struct(*))) AS STRIN…

一抹微笑 2025-01-31 02:43:24 5 0

拾取Pyspark ReadStream正在读取的JSON文件中的更改?

我有JSON文件,每个文件都描述了一个特定实体,包括其状态。我试图通过使用ReadStream和Writestream将它们拉入三角洲。这对于新文件非常有效。这些JSO…

七七 2025-01-30 15:19:04 5 0

火花错误“结果未产生”写信给雪花

我需要帮助检查消息是否实际上写给我的Kafka主题。 我的docker-compose.yml如下所示 version: '3' services: zookeeper: image: bitnami/zookeeper:la…

星軌x 2025-01-30 04:58:29 3 0

在kubernetes上运行的pyspark结构化流上应用机架意识,并从AWS MSK读取

我在以下设置中有一个pyspark结构化流式应用程序: Pyspark -3.0.1版,使用Spark Operator在AWS EKS上运行。 KAFKA-在AWS MSK上运行2.8.1和 replica.s…

终止放荡 2025-01-29 06:41:54 5 0

AnalySisexception:必须使用Writestream.start()执行带有流源的查询。在结构化流上施用Pyspark Puller时Kafka

AnalySisexception:必须使用Writestream.start()执行带有流源的查询。 Kafka 我试图将Pyspark ML纯螺旋阀放在Spark结构化流数据框架上,以将指定列…

十秒萌定你 2025-01-28 17:55:26 4 0

结构化流媒体不会使用input_file_name()获取单个文件名

我有一个结构化的流式作业,在以下目录下读取一堆json.gz文件,并将其写入三角洲表, headFolder |- 00 |-- file1.json.gz |- 01 |-- file2.json.gz .…

调妓 2025-01-26 16:18:01 3 0

有没有一种方法可以使用Spark流进行SignalR数据?

我使用SignalR提供了一个数据源,我从未使用过。 我找不到有关如何用火花流摄取它的任何文档,是否有一个定义的过程? 如果没有,我应该先采取中间步…

夏日落 2025-01-26 16:17:07 4 0
更多

推荐作者

alipaysp_snBf0MSZIv

文章 0 评论 0

梦断已成空

文章 0 评论 0

瞎闹

文章 0 评论 0

寄意

文章 0 评论 0

似梦非梦

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文