How to stream a single Kafka topic to multiple HDFS locations, filtered by key?

Posted 2025-02-05 07:43:14

I am not able to stream my data to multiple HDFS locations, filtered by key, so the code below is not working. Please help me find the correct way to write this.

    val ER_stream_V1 = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", configManager.getString("Kafka.Server"))
        .option("subscribe", "Topic1")
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", "false")
        .load()

    val ER_stream_V2 = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", configManager.getString("Kafka.Server"))
        .option("subscribe", "Topic1")
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", "false")
        .load()

    ER_stream_V1.toDF()
        .select(col("key"), col("value").cast("string"))
        .filter(col("key") === "Value1")
        .select(functions.from_json(col("value").cast("string"), Value1Schema.schemaExecution).as("value"))
        .select("value.*")
        .writeStream
        .format("orc")
        .option("metastoreUri", configManager.getString("spark.datasource.hive.warehouse.metastoreUri"))
        .option("checkpointLocation", "/tmp/teststreaming/execution/checkpoint2005")
        .option("path", "/tmp/test/value1")
        .trigger(Trigger.ProcessingTime("5 Seconds"))
        .partitionBy("jobid")
        .start()

    ER_stream_V2.toDF()
        .select(col("key"), col("value").cast("string"))
        .filter(col("key") === "Value2")
        .select(functions.from_json(col("value").cast("string"), Value2Schema.schemaJobParameters).as("value"))
        .select("value.*")
        .writeStream
        .format("orc")
        .option("metastoreUri", configManager.getString("spark.datasource.hive.warehouse.metastoreUri"))
        .option("checkpointLocation", "/tmp/teststreaming/jobparameters/checkpoint2006")
        .option("path", "/tmp/test/value2")
        .trigger(Trigger.ProcessingTime("5 Seconds"))
        .partitionBy("jobid")
        .start()

Comments (1)

青春有你 2025-02-12 07:43:14

You should not need two readers. Create one and filter it twice. You might also want to consider setting startingOffsets to earliest so that existing topic data is read.

For example:

val ER_stream = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", configManager.getString("Kafka.Server"))
    .option("subscribe", "Topic1")
    .option("startingOffsets", "latest")  // maybe change?
    .option("failOnDataLoss", "false")
    .load()
    .toDF()
    .select(col("key").cast("string").as("key"), col("value").cast("string"))

val value1Stream = ER_stream
    .filter(col("key") === "Value1")
    .select(functions.from_json(col("value"), Value1Schema.schemaExecution).as("value"))
    .select("value.*")

val value2Stream = ER_stream
    .filter(col("key") === "Value2")
    .select(functions.from_json(col("value"), Value2Schema.schemaJobParameters).as("value"))
    .select("value.*")

value1Stream.writeStream.format("orc")
    ...
    .start()

value2Stream.writeStream.format("orc")
    ...
    .start()
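
For completeness, here is a minimal sketch of what the two elided sinks could look like. The sink options (metastoreUri, checkpoint and output paths, trigger interval, and partition column) are carried over from the question's code, not from this answer, so adjust them to your environment. Because the application now starts two streaming queries, it should also block on the StreamingQueryManager (for example with awaitAnyTermination) so the driver does not exit before the queries run:

    // Sketch only: sink options copied from the question's code.
    value1Stream.writeStream
        .format("orc")
        .option("metastoreUri", configManager.getString("spark.datasource.hive.warehouse.metastoreUri"))
        .option("checkpointLocation", "/tmp/teststreaming/execution/checkpoint2005")  // each query needs its own checkpoint
        .option("path", "/tmp/test/value1")
        .trigger(Trigger.ProcessingTime("5 Seconds"))
        .partitionBy("jobid")
        .start()

    value2Stream.writeStream
        .format("orc")
        .option("metastoreUri", configManager.getString("spark.datasource.hive.warehouse.metastoreUri"))
        .option("checkpointLocation", "/tmp/teststreaming/jobparameters/checkpoint2006")
        .option("path", "/tmp/test/value2")
        .trigger(Trigger.ProcessingTime("5 Seconds"))
        .partitionBy("jobid")
        .start()

    // Block so the driver stays alive while both queries run.
    spark.streams.awaitAnyTermination()

Note also that this answer casts key to a string up front: in the Kafka source both key and value arrive as binary columns, which is why the key is cast before the filter and the value before from_json.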
