是独特的'不支持结构化流数据集的操作?

发布于 2025-01-22 00:52:17 字数 4430 浏览 2 评论 0 原文

从火花结构化的流媒体文档中,包含它

不支持流媒体数据集的不同操作。

但是,API中有一个 distract()方法,我也可以在流dateset之后调用 distract()

  public final class JavaStructuredNetworkWordDistinct {

  public static void main(String[] args) throws Exception {
    System.setProperty("hadoop.home.dir","C://hadoop" );
    SparkSession spark = SparkSession
            .builder()
            .appName("JavaStructuredNetworkWordDistinct")
            .config("spark.master", "local[*]")
            .getOrCreate();
    spark.sparkContext().setLogLevel("ERROR");
    spark.conf().set("spark.sql.shuffle.partitions",4);
    // Read all the csv files written atomically in a directory
    StructType userSchema = new StructType().add("event_time", "string").add("id", "string");
    Dataset<Tuple2<Timestamp, String>> dropStream = spark
            .readStream()
            .option("sep", ",")
            .schema(userSchema)      // Specify schema of the csv files
            .csv("D:\\deduplication")
            .selectExpr("to_timestamp(event_time,'yyyy-MM-dd HH:mm:ss') as event_time","id as id")
            .as(Encoders.tuple(Encoders.TIMESTAMP(), Encoders.STRING()));

    StreamingQuery outerQuery =  execDeduplicationDistinct(spark,dropStream);
    outerQuery.awaitTermination();
  }


  private static StreamingQuery execDeduplicationDistinct(SparkSession spark, Dataset<Tuple2<Timestamp, String>> dropStream) {

    Dataset<Tuple2<Timestamp, String>> dropDuplicatesStream = dropStream.distinct();
    // Start running the query that prints the running counts to the console
    StreamingQuery query = dropDuplicatesStream.writeStream()
            .outputMode("append")
            .format("console")
            .start();
    return query;
  }
}

文件夹 d:\\ reduplication 下方只有一个文件,内容

event_time,word
2022-04-10 11:44:00,word1
2022-04-10 11:45:00,word2
2022-04-10 11:45:00,word2
2022-04-10 11:45:00,word2

最终

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+-----+
|         event_time|   id|
+-------------------+-----+
|2022-04-10 11:44:00|word1|
|2022-04-10 11:45:00|word2|
|               null| word|
+-------------------+-----+

是这样表明的吗?当我理解不同的时,怎么了?

而且,我还运行插座流。 代码是

object StructuredNetworkWordCountDistinct {
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val spark: SparkSession = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .master("local[*]")
      .config("spark.sql.shuffle.partitions",4)
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()
    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))
    // Generate running word count
    val wordCounts = words.distinct()
    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
      .trigger(Trigger.ProcessingTime("1 second"))  // only change in query
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

并使用 nc -l -L -P 9999 启动NetCat。 首先,输入 v1 ,所有输出批处理结果是

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
|   v1|
+-----+

,其次,输入 v1 再次,新的输出批处理结果是


-------------------------------------------
Batch: 2
-------------------------------------------
+-----+
|value|
+-----+
+-----+

,Spark似乎是记住的首先是 v1 (批次)到不同的第二批次结果。

From the spark structured streaming docs, unsupported operations contain that

Distinct operations on streaming Datasets are not supported.

However, there is a distinct() method in the API and I can also call distinct() after streaming DateSet.

  public final class JavaStructuredNetworkWordDistinct {

  public static void main(String[] args) throws Exception {
    System.setProperty("hadoop.home.dir","C://hadoop" );
    SparkSession spark = SparkSession
            .builder()
            .appName("JavaStructuredNetworkWordDistinct")
            .config("spark.master", "local[*]")
            .getOrCreate();
    spark.sparkContext().setLogLevel("ERROR");
    spark.conf().set("spark.sql.shuffle.partitions",4);
    // Read all the csv files written atomically in a directory
    StructType userSchema = new StructType().add("event_time", "string").add("id", "string");
    Dataset<Tuple2<Timestamp, String>> dropStream = spark
            .readStream()
            .option("sep", ",")
            .schema(userSchema)      // Specify schema of the csv files
            .csv("D:\\deduplication")
            .selectExpr("to_timestamp(event_time,'yyyy-MM-dd HH:mm:ss') as event_time","id as id")
            .as(Encoders.tuple(Encoders.TIMESTAMP(), Encoders.STRING()));

    StreamingQuery outerQuery =  execDeduplicationDistinct(spark,dropStream);
    outerQuery.awaitTermination();
  }


  private static StreamingQuery execDeduplicationDistinct(SparkSession spark, Dataset<Tuple2<Timestamp, String>> dropStream) {

    Dataset<Tuple2<Timestamp, String>> dropDuplicatesStream = dropStream.distinct();
    // Start running the query that prints the running counts to the console
    StreamingQuery query = dropDuplicatesStream.writeStream()
            .outputMode("append")
            .format("console")
            .start();
    return query;
  }
}

And there only one file under the folder D:\\deduplication , content are

event_time,word
2022-04-10 11:44:00,word1
2022-04-10 11:45:00,word2
2022-04-10 11:45:00,word2
2022-04-10 11:45:00,word2

finally , the result shows that

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+-----+
|         event_time|   id|
+-------------------+-----+
|2022-04-10 11:44:00|word1|
|2022-04-10 11:45:00|word2|
|               null| word|
+-------------------+-----+

so ? what is wrong when I understand distinct?

And, I also run socket streaming.
Code is

object StructuredNetworkWordCountDistinct {
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val spark: SparkSession = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .master("local[*]")
      .config("spark.sql.shuffle.partitions",4)
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()
    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))
    // Generate running word count
    val wordCounts = words.distinct()
    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
      .trigger(Trigger.ProcessingTime("1 second"))  // only change in query
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

and start netcat with nc -L -p 9999.
Firstly, input v1 and the all output batchs results are

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
|   v1|
+-----+

and secondly, input v1 again, and new output batch result is


-------------------------------------------
Batch: 2
-------------------------------------------
+-----+
|value|
+-----+
+-----+

And spark seems to remembered first v1 (batch) to distinct second batch result.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

驱逐舰岛风号 2025-01-29 00:52:17

“ ...某些数据框操作无法用Spark支持
结构化流媒体,例如不同的分类等
转换火花将需要将整个数据存储在内存中。
...”

糟糕的解释。

使用完成模式的这种类型的查询与操作不同:

val mappedDF = jsonEdits.groupBy($"user").agg(countDistinct($"userURL").as("cnt"))  

返回:

org.apache.spark.sql.AnalysisException: Distinct aggregations are not supported on streaming DataFrames/Datasets. Consider using approx_count_distinct() instead.

如果使用 Append 模式,则需要>水印

您的查询是一个简单的附加;那里有一个不同的没有
聚合。这是一个微不足道的小问题在微批处理
级别,它没有国家问题或AGG的考虑,并且只是处理
如您所见,当前的微批量。 Doc有点贫穷
这方面。

"...Some Dataframe operations cannot be supported with Spark
Structured Streaming, e.g. distinct, sorting, etc. as for those
transformations Spark would need to store the entire data in memory.
..."

Badly explained.

This type of query with complete mode does not work with distinct like operation:

val mappedDF = jsonEdits.groupBy(
quot;user").agg(countDistinct(
quot;userURL").as("cnt"))  

It returns:

org.apache.spark.sql.AnalysisException: Distinct aggregations are not supported on streaming DataFrames/Datasets. Consider using approx_count_distinct() instead.

If using append mode, it needs a watermark also.

Your query is a simple append; there a distinct Without
aggregation. This is a trivial matter for Spark at the micro batch
level, it has no state issue or agg's to consider, and just processes
the current micro batch, as you observed. Doc's are a little poor in
this regard.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文