How do you list all the partition keys in a very large Cassandra table using Spark?
We have a program called cassandra-scan which uses spark-cassandra-connector to list all the values of the partition key in a very large table. The table has around 17 million Cassandra partitions, and each partition has an average of 200 rows. The Cassandra cluster housing this table runs DSE 5.1.8 on 6 nodes. The replication factor for the keyspace containing the table is 3.
Here are simplified definitions of the keyspace and table.
CREATE KEYSPACE myspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'} AND durable_writes = true;
CREATE TABLE myspace.largetable (
id text,
itemOrder text,
...
PRIMARY KEY (id, itemOrder)
) WITH CLUSTERING ORDER BY (itemOrder ASC)
The statement used in cassandra-scan to list all the values of the partition key is as follows:
val res = sc.cassandraTable(keyspace, table).select("id").perPartitionLimit(1).repartition(320)
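For context, this statement sits inside a small driver program along the following lines. This is a minimal sketch: the object name, the output path, and the final save step are assumptions for illustration, not our exact code.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._ // provides sc.cassandraTable

object CassandraScan {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CassandraScan"))

    // PER PARTITION LIMIT 1 returns a single row per Cassandra partition,
    // so each resulting row carries one distinct partition key value.
    val res = sc.cassandraTable("myspace", "largetable")
      .select("id")
      .perPartitionLimit(1)
      .repartition(320)

    // Hypothetical output path; we write one partition key value per line.
    res.map(_.getString("id")).saveAsTextFile("/path/to/output")

    sc.stop()
  }
}
```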
We use Apache Spark 2.3.1 and spark-cassandra-connector 2.3.2. The command used to launch cassandra-scan is as follows.
/path/to/spark/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class "CassandraScan" --jars /path/to/spark-cassandra-connector_2.11-2.3.2.jar --executor-memory 15g --master local[20] cassandra-scan.jar &
cassandra-scan runs correctly and takes around 19 hours.
We recently set up a new Cassandra cluster, again with 6 nodes (different from those used in the first cluster). This cluster runs DSE 6.8.16. All the data from the first table has been added to a table in the new cluster.
We updated Apache Spark to version 2.4.8 and spark-cassandra-connector to 2.4.2. We tested the program with the number of Spark partitions ranging from 2,000 to 200,000. We haven't been able to get cassandra-scan to run correctly. We see errors of the following form:
java.io.IOException: Exception during execution of SELECT "id" FROM "myspace"."largetable" WHERE token("id") > ? AND token("id") <= ? PER PARTITION LIMIT 1 ALLOW FILTERING: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)
Some runs of cassandra-scan caused some of the Cassandra nodes to go down with messages such as the following in the Cassandra logs.
INFO [CoreThread-22] 2022-04-03 06:26:35,467 InboundHandshakeHandler.java:353 - Failed to properly handshake with peer /xxx.xxx.xxx.xxx:41231. Closing the channel.
java.lang.OutOfMemoryError: Direct buffer memory
WARN [Outbound-/xxx.xxx.xxx.xxx-large-message-writer] 2022-04-01 19:17:58,248 AbstractOutboundMessageHandler.java:80 - LARGE_MESSAGE with id 97 from /xxx.xxx.xxx.xxx to /xxx.xxx.xxx.xxx via (/xxx.xxx.xxx.xxx,/xxx.xxx.xxx.xxx:7000) error...
java.io.IOException: java.lang.RuntimeException: io.netty.channel.unix.Errors$NativeIoException: writeAddress(..) failed: Connection reset by peer
Any help with getting this to work is much appreciated. Thanks.
2 Answers
We used DataStax Bulk Loader (dsbulk) to solve the problem. dsbulk took around 3 hours to obtain 17.5 million values.
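For reference, the invocation was along these lines; the contact point and output directory are placeholders, and the exact flags we used may have differed slightly:

```shell
# Unload one value per Cassandra partition by selecting the DISTINCT
# partition keys. dsbulk splits the work across token ranges itself,
# so no Spark job is needed.
dsbulk unload \
  -h cassandra-host \
  -query "SELECT DISTINCT id FROM myspace.largetable" \
  -url /path/to/output
```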
This error indicates that at least one node in the cluster is unavailable to serve requests:
You need to review the Cassandra logs to determine (1) which of the nodes was unresponsive/unavailable, and (2) why. Cheers!
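A couple of commands that can help with that; the log path shown is a common default and may differ on your DSE install:

```shell
# Show which nodes the cluster currently sees as down (DN = Down/Normal).
nodetool status

# Search for the direct-buffer OOMs and dropped connections around
# the time of the failed runs.
grep -E "OutOfMemoryError|Connection reset" /var/log/cassandra/system.log
```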