Spark -Java-过滤器流询问查询

发布于 2025-02-03 09:48:29 字数 566 浏览 3 评论 0原文

我有一个在数据范围中接收数据的Spark应用程序:

Dataset<Row> df = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "topic").load().selectExpr("CAST(key AS STRING) as key");
String my_key = df.select("key").first().toString();
if (my_key == "a")
{
do_stuff
}

基本上我需要在值A的情况下,然后在数据框架上应用一些转换,否则我应用了其他转换。

但是,我正在处理流媒体查询,当我尝试应用上面的代码时,我得到的:

Queries with streaming sources must be executed with writeStream.start()

错误时发生了错误。

有人有什么想法吗?

提前致谢 :)

I've a Spark application that receives data in a dataframe:

Dataset<Row> df = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "topic").load().selectExpr("CAST(key AS STRING) as key");
String my_key = df.select("key").first().toString();
if (my_key == "a")
{
do_stuff
}

Basically I will need to in case of value a then I apply some transformations on the dataframe otherwise I apply other transformations.

However, I am dealing with streaming queries and when I tried to apply my code above I got:

Queries with streaming sources must be executed with writeStream.start()

The error happens when I make the first operation.

Anyone have any ideas?

Thanks in advance :)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

眼藏柔 2025-02-10 09:48:29

我能够使用以下方式解决我的问题:

Dataset<Row> df = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "topic").load().selectExpr("CAST(key AS STRING) as key").filter(functions.col("key").contains("a"));

I was able to sole my problem using:

Dataset<Row> df = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "topic").load().selectExpr("CAST(key AS STRING) as key").filter(functions.col("key").contains("a"));
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文