Is distinct() an unsupported operation on Structured Streaming Datasets?
According to the Spark Structured Streaming documentation, distinct operations on streaming Datasets are not supported.
However, the API does expose a distinct() method, and I can still call distinct() on a streaming Dataset.
import java.sql.Timestamp;

import scala.Tuple2;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;

public final class JavaStructuredNetworkWordDistinct {
    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "C://hadoop");
        SparkSession spark = SparkSession
                .builder()
                .appName("JavaStructuredNetworkWordDistinct")
                .config("spark.master", "local[*]")
                .getOrCreate();
        spark.sparkContext().setLogLevel("ERROR");
        spark.conf().set("spark.sql.shuffle.partitions", 4);

        // Read all the csv files written atomically in a directory
        StructType userSchema = new StructType().add("event_time", "string").add("id", "string");
        Dataset<Tuple2<Timestamp, String>> dropStream = spark
                .readStream()
                .option("sep", ",")
                .schema(userSchema) // Specify schema of the csv files
                .csv("D:\\deduplication")
                .selectExpr("to_timestamp(event_time,'yyyy-MM-dd HH:mm:ss') as event_time", "id as id")
                .as(Encoders.tuple(Encoders.TIMESTAMP(), Encoders.STRING()));

        StreamingQuery outerQuery = execDeduplicationDistinct(spark, dropStream);
        outerQuery.awaitTermination();
    }

    private static StreamingQuery execDeduplicationDistinct(SparkSession spark, Dataset<Tuple2<Timestamp, String>> dropStream) {
        Dataset<Tuple2<Timestamp, String>> dropDuplicatesStream = dropStream.distinct();
        // Start running the query that prints the deduplicated rows to the console
        StreamingQuery query = dropDuplicatesStream.writeStream()
                .outputMode("append")
                .format("console")
                .start();
        return query;
    }
}
There is only one file under the folder D:\deduplication, with the following content:
event_time,word
2022-04-10 11:44:00,word1
2022-04-10 11:45:00,word2
2022-04-10 11:45:00,word2
2022-04-10 11:45:00,word2
The final output is:
-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+-----+
| event_time| id|
+-------------------+-----+
|2022-04-10 11:44:00|word1|
|2022-04-10 11:45:00|word2|
| null| word|
+-------------------+-----+
Is that what the documentation means? What is wrong with my understanding of distinct?
I also ran a socket streaming version. The code is:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object StructuredNetworkWordCountDistinct {
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt

    val spark: SparkSession = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", 4)
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()

    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))

    // Deduplicate the words
    val wordCounts = words.distinct()

    // Start running the query that prints the deduplicated words to the console
    val query = wordCounts.writeStream
      .trigger(Trigger.ProcessingTime("1 second")) // only change in query
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
and started Netcat with nc -l -L -P 9999.
First, I typed v1, and all the output batch results were:
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
| v1|
+-----+
Then I typed v1 again, and the new output batch result was:
-------------------------------------------
Batch: 2
-------------------------------------------
+-----+
|value|
+-----+
+-----+
Spark seems to remember the v1 from the first batch and uses it to deduplicate the second batch's result.
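My guess is that distinct() is simply an alias for dropDuplicates() over all columns, so the query above is really the stateful streaming deduplication described in the guide. A minimal sketch of that assumption, reusing the words stream from the socket example above:

// Assumption: Dataset.distinct() delegates to dropDuplicates() over all columns,
// so these two streaming queries should behave the same way.
val viaDistinct       = words.distinct()
val viaDropDuplicates = words.dropDuplicates() // keeps every value ever seen in the state store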
Comments (1)

This type of query with complete mode does not work with a distinct-like operation; it returns an error. If using append mode, it also needs a watermark.
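A minimal sketch of the append-mode variant with a watermark; the socket source, the "event_time,id" parsing, and the 10-minute threshold are placeholders, not taken from the question:

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession

object DropDuplicatesWithWatermark {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("DropDuplicatesWithWatermark")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._

    // Stream of "event_time,id" lines, e.g. "2022-04-10 11:44:00,word1"
    val events = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .as[String]
      .map { line =>
        val Array(ts, id) = line.split(",")
        (Timestamp.valueOf(ts), id)
      }
      .toDF("event_time", "id")

    // The watermark bounds the deduplication state: rows whose event_time is more
    // than 10 minutes behind the latest event_time seen are dropped from the state
    // store, which is what allows append mode to work with dropDuplicates.
    val deduped = events
      .withWatermark("event_time", "10 minutes")
      .dropDuplicates("id", "event_time")

    deduped.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}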