从Spark询问Cassandra,将聚合推向Cassandra

发布于 2025-01-17 20:26:54 字数 1958 浏览 2 评论 0原文

我有一个如下所示的 Cassandra 表:

my_keyspace.my_table (
    part_key_col_1 text,
    clust_key_col_1 int,
    clust_key_col_2 text,
    value_col_1 text
    PRIMARY KEY (part_key_col_1, clust_key_col_1, clust_key_col_2, value_col_1)

我希望检索每个 part_key_col_1clust_key_col_1 的最大值,我还希望在 上有一个过滤器clust_key_col_1。在 CQL 中,我可以使用以下方法实现此目的:

SELECT
    part_key_col_1
    max(clust_key_col_1)
FROM my_table
WHERE clust_key_col_1 < 123
GROUP BY part_key_col_1
ALLOW FILTERING

尽管我需要使用 ALLOW FILTERING,但查询速度非常快,我得到了大约 1 000 000 个唯一的 part_key_col_1,并且对于每个 part_key_col_1 我获得的唯一 clust_key_col_1 少于 5000 个。

当我尝试使用 Spark Cassandra 连接器在 Spark 中获取相同的数据时,我的问题就出现了。我在 Spark 中尝试了以下操作:

cassandra_df = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table='my_table', keyspace='my_keyspace')
    .load()
    .filter(f.col('clust_key_col_1') < 123)
    .groupBy(f.col('part_key_col_1'))
    .agg(
        f.max('clust_key_col_1')
    )
)

但物理计划最终是:

== Physical Plan ==
*(2) HashAggregate(keys=[part_key_col_1#144], functions=[max(clust_key_col_1#145)])
+- Exchange hashpartitioning(part_key_col_1#144, 20)
   +- *(1) HashAggregate(keys=[part_key_col_1#144], functions=[partial_max(clust_key_col_1#145)])
      +- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [part_key_col_1#144,clust_key_col_1#145] PushedFilters: [*LessThan(clust_key_col_1,123)], ReadSchema: struct<part_key_col_1:string,clust_key_col_1:int>

这意味着 clust_key_col_1 的过滤器被下推到 Cassandra,但分组和聚合却没有。相反,所有数据(clust_key_col_1 <123)都会加载到 Spark 中并在 Spark 中聚合。我可以以某种方式将分组/聚合“下推”到 Cassandra 并仅加载每个 part_key_col_1 的 max(clust_key_col_1) )以减少 Spark 和网络的负载吗?现在 Spark 将加载 1 000 000 * 5000 行而不是 1 000 000 行。

I've got a Cassandra table looking like this:

my_keyspace.my_table (
    part_key_col_1 text,
    clust_key_col_1 int,
    clust_key_col_2 text,
    value_col_1 text
    PRIMARY KEY (part_key_col_1, clust_key_col_1, clust_key_col_2, value_col_1)

I'm looking to retrieve tha maximum value of clust_key_col_1 for each part_key_col_1, where I also want a filter on clust_key_col_1. In CQL I can achieve this using:

SELECT
    part_key_col_1
    max(clust_key_col_1)
FROM my_table
WHERE clust_key_col_1 < 123
GROUP BY part_key_col_1
ALLOW FILTERING

Even though I need to use ALLOW FILTERING the query is super fast, I got roughly 1 000 000 unique part_key_col_1 and for each part_key_col_1 I got less than 5000 unique clust_key_col_1.

My problem comes when I try to get the same data in Spark using Spark Cassandra Connector. I've tried the following in Spark:

cassandra_df = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table='my_table', keyspace='my_keyspace')
    .load()
    .filter(f.col('clust_key_col_1') < 123)
    .groupBy(f.col('part_key_col_1'))
    .agg(
        f.max('clust_key_col_1')
    )
)

But the Physical-plan ends up being:

== Physical Plan ==
*(2) HashAggregate(keys=[part_key_col_1#144], functions=[max(clust_key_col_1#145)])
+- Exchange hashpartitioning(part_key_col_1#144, 20)
   +- *(1) HashAggregate(keys=[part_key_col_1#144], functions=[partial_max(clust_key_col_1#145)])
      +- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [part_key_col_1#144,clust_key_col_1#145] PushedFilters: [*LessThan(clust_key_col_1,123)], ReadSchema: struct<part_key_col_1:string,clust_key_col_1:int>

Meaning the filter for clust_key_col_1 gets pushed down to Cassandra, but the grouping and the aggregation does not. Instead all the data (with clust_key_col_1 < 123) gets loaded into Spark and aggregated in Spark. Can I somehow "push down" the grouping/aggregation to Cassandra and only load the max(clust_key_col_1) for each part_key_col_1) to reduce the load on Spark and the network? Right now Spark will load 1 000 000 * 5000 rows instead of 1 000 000 rows.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文