将 dbfs 文件作为 databricks 中的流数据帧获取

发布于 2025-01-18 07:05:53 字数 195 浏览 3 评论 0原文

我有一个问题,我需要为每个CSV文件中的数据链球插入一个外部表,该文件将降落到ADLS Gen 2存储中。

当我从dbutils.fs.ls()输出中获取流式数据框时,我考虑了一个解决方案,然后调用一个在foreachBatch()内创建表的函数。

我已经准备好了功能,但是我无法找到一种将目录信息流到流数据框中的方法。有人知道如何实现这一目标吗?

I have a problem where I need to create an external table in Databricks for each CSV file that lands into an ADLS gen 2 storage.

I thought about a solution when I would get a streaming dataframe from dbutils.fs.ls() output and then call a function that creates a table inside the forEachBatch().

I have the function ready, but I can't figure out a way to stream directory information into a streaming Dataframe. Do anyone have an idea on how this could be achieved?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

自找没趣 2025-01-25 07:05:53

请检查以下代码块。

package com.sparkbyexamples.spark.streaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SparkStreamingFromDirectory {

  def main(args: Array[String]): Unit = {

    val spark:SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExamples")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val schema = StructType(
      List(
        StructField("Zipcode", IntegerType, true),
        
      )
    )

    val df = spark.readStream
      .schema(schema)
      .json("Your directory")

    df.printSchema()

    val groupDF = df.select("Zipcode")
        .groupBy("Zipcode").count()
    groupDF.printSchema()

    groupDF.writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()
  }
}

Kindly check with the below code block.

package com.sparkbyexamples.spark.streaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SparkStreamingFromDirectory {

  def main(args: Array[String]): Unit = {

    val spark:SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExamples")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val schema = StructType(
      List(
        StructField("Zipcode", IntegerType, true),
        
      )
    )

    val df = spark.readStream
      .schema(schema)
      .json("Your directory")

    df.printSchema()

    val groupDF = df.select("Zipcode")
        .groupBy("Zipcode").count()
    groupDF.printSchema()

    groupDF.writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()
  }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文