Querying Cassandra from Spark, pushing the aggregation down to Cassandra
I've got a Cassandra table looking like this:
my_keyspace.my_table (
    part_key_col_1 text,
    clust_key_col_1 int,
    clust_key_col_2 text,
    value_col_1 text,
    PRIMARY KEY (part_key_col_1, clust_key_col_1, clust_key_col_2, value_col_1)
)
I'm looking to retrieve the maximum value of clust_key_col_1 for each part_key_col_1, where I also want a filter on clust_key_col_1. In CQL I can achieve this with:
SELECT
    part_key_col_1,
    max(clust_key_col_1)
FROM my_table
WHERE clust_key_col_1 < 123
GROUP BY part_key_col_1
ALLOW FILTERING
Even though I need to use ALLOW FILTERING, the query is super fast: I have roughly 1 000 000 unique values of part_key_col_1, and for each part_key_col_1 there are fewer than 5000 unique values of clust_key_col_1.
My problem comes when I try to get the same data in Spark using the Spark Cassandra Connector. I've tried the following in Spark:
from pyspark.sql import functions as f

cassandra_df = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table='my_table', keyspace='my_keyspace')
    .load()
    .filter(f.col('clust_key_col_1') < 123)
    .groupBy(f.col('part_key_col_1'))
    .agg(
        f.max('clust_key_col_1')
    )
)
But the physical plan ends up being:
== Physical Plan ==
*(2) HashAggregate(keys=[part_key_col_1#144], functions=[max(clust_key_col_1#145)])
+- Exchange hashpartitioning(part_key_col_1#144, 20)
+- *(1) HashAggregate(keys=[part_key_col_1#144], functions=[partial_max(clust_key_col_1#145)])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [part_key_col_1#144,clust_key_col_1#145] PushedFilters: [*LessThan(clust_key_col_1,123)], ReadSchema: struct<part_key_col_1:string,clust_key_col_1:int>
Meaning the filter on clust_key_col_1 gets pushed down to Cassandra, but the grouping and the aggregation do not. Instead all the data (with clust_key_col_1 < 123) gets loaded into Spark and aggregated in Spark. Can I somehow "push down" the grouping/aggregation to Cassandra and only load the max(clust_key_col_1) for each part_key_col_1, to reduce the load on Spark and the network? Right now Spark will load 1 000 000 * 5000 rows instead of 1 000 000 rows.
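One workaround I'm considering (just a sketch, assuming the DataStax cassandra-driver Python package is installed; the contact point below is a placeholder) is to run the fast CQL query above directly through the driver and hand only the pre-aggregated result to Spark:

# Sketch: let Cassandra do the grouping/aggregation via plain CQL and only
# bring one row per part_key_col_1 into Spark.
# Assumes the DataStax cassandra-driver package; 'cassandra-host' is a placeholder.
from cassandra.cluster import Cluster

cluster = Cluster(['cassandra-host'])
session = cluster.connect('my_keyspace')

rows = session.execute("""
    SELECT part_key_col_1, max(clust_key_col_1) AS max_clust_key_col_1
    FROM my_table
    WHERE clust_key_col_1 < 123
    GROUP BY part_key_col_1
    ALLOW FILTERING
""")

# Roughly 1 000 000 aggregated rows instead of 1 000 000 * 5000 raw rows.
max_df = spark.createDataFrame(
    [(r.part_key_col_1, r.max_clust_key_col_1) for r in rows],
    ['part_key_col_1', 'max_clust_key_col_1']
)

This keeps the aggregation in Cassandra, but it funnels all the result rows through the driver process before they become a DataFrame, so I'm not sure it scales well, and I'd still prefer a way to make the connector itself push the aggregation down.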