Over half of the records in my 15TB Cassandra table are now obsolete and I want to delete them. I've written a Spark job for it but it seems fragile and usually dies with timeout errors (no mention of tombstones). So I'd like to scan the table in a series of smaller jobs, each processing a distinct and limited part of the table (thus hopefully avoiding dreaded tombstone problems). Unfortunately with my index I can't query for exactly the obsolete records, so I have to check them all. My intended approach is to select WHERE TOKEN(partition_key) > M AND TOKEN(partition_key) < N and choose a series of M,N to work my way through the table. Alas, Spark seems to have a similar idea, and so I get this error:
Exception during preparation of SELECT ... FROM ... WHERE token("context", "itemid") > ? AND token("context", "itemid") <= ? AND token(context, itemid) > 9200000005000000000 AND token(context, itemid) < 9223372036854775807 ALLOW FILTERING: More than one restriction was found for the start bound on context, itemid
I'm pretty sure that the first two conditions are being injected by Spark. I don't know where the ALLOW FILTERING is coming from.
This index obviously wasn't designed with this purge operation in mind. And at some point I might bite the bullet and migrate to a copied table sans the obsolete records. But I'd like to do this purge in-place, if feasible.
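For concreteness, the per-slice restriction I'm adding looks roughly like the sketch below. This is a sketch against the Spark Cassandra Connector RDD API, not my exact code; ks, t, the literal bounds, and the surrounding SparkSession named spark are placeholders.

// Sketch of one bounded slice. The connector appends its own token(...) bounds
// for each Spark partition, which is what collides with the hand-written bounds
// and produces the "more than one restriction" error above.
import com.datastax.spark.connector._

val slice = spark.sparkContext
  .cassandraTable("ks", "t") // placeholder keyspace/table
  .where("token(context, itemid) > 9200000005000000000 AND token(context, itemid) <= 9223372036854775807")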
See if this helps you.
Reference: https://community.datastax.com/questions/881/best-way-to-delete-1000s-of-partitions-with-spark.html
spark.sparkContext.cassandraTable(KS, T) //Full table scan
.select("run_date", "entity_type", "rank_offset") // Prune only our PK columns
.filter( ) // Do a Spark Side filtering here (No C* Pushdown)
.deleteFromCassandra(KS, T)
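Filled in for the table in this question, that pattern might look something like the sketch below. Only the partition key (context, itemid) is known from the question, so the last_updated bigint column (epoch millis), the cutoff value, the contact point, and the assumption that (context, itemid) is the whole primary key are all placeholders; swap in whatever actually marks a row as obsolete.

// Minimal sketch: full scan, prune to the columns needed, filter on the Spark
// side only, then delete the matching rows by primary key.
import com.datastax.spark.connector._
import org.apache.spark.sql.SparkSession

object PurgeObsoleteRows {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("purge-obsolete-rows")
      .config("spark.cassandra.connection.host", "127.0.0.1") // placeholder contact point
      .getOrCreate()

    // Hypothetical rule: rows whose last_updated (bigint, epoch millis) is older than the cutoff are obsolete.
    val cutoffMillis = java.time.Instant.parse("2020-01-01T00:00:00Z").toEpochMilli

    spark.sparkContext
      .cassandraTable("ks", "t")                   // full scan; the connector splits it into token ranges itself
      .select("context", "itemid", "last_updated") // partition key plus the column the filter needs
      .filter(row => row.getLong("last_updated") < cutoffMillis) // Spark-side test, nothing pushed to Cassandra
      .deleteFromCassandra("ks", "t")              // deletes by primary key, assumed here to be (context, itemid)

    spark.stop()
  }
}

Because the connector already splits the scan into token ranges per Spark partition, there is no need to add token(...) bounds by hand, and keeping the obsolescence test on the Spark side means no extra predicate is pushed down to Cassandra, so neither the ALLOW FILTERING nor the duplicate token restriction from the error appears. Keep in mind the deletes themselves write tombstones, which only go away at compaction after gc_grace_seconds.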