How do you use Spark to list all the partition keys in a large Cassandra table?

Posted 2025-01-19 14:48:33

We have a program called cassandra-scan which uses spark-cassandra-connector to list all the values of the partition key in a very large table. The table has around 17 million Cassandra partitions, and each partition has an average of 200 rows. The Cassandra cluster housing this table runs DSE 5.1.8 on 6 nodes. The replication factor for the keyspace containing the table is 3.

Here are simplified definitions of the keyspace and table.

CREATE KEYSPACE myspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}  AND durable_writes = true;

CREATE TABLE myspace.largetable (
    id text,
    itemOrder text,
    ...
    PRIMARY KEY (id, itemOrder)
) WITH CLUSTERING ORDER BY (itemOrder ASC);

The statement used in cassandra-scan to list all the values of the partition key is as follows:

val res = sc.cassandraTable(keyspace, table).select("id").perPartitionLimit(1).repartition(320)
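
Conceptually, perPartitionLimit(1) keeps a single row per Cassandra partition before the partition key is projected. A pure-Scala sketch of that semantics over in-memory rows (the sample data is made up, just to show the shape of the result; this is not connector code):

```scala
// Pure-Scala analogue of SELECT id ... PER PARTITION LIMIT 1:
// keep one row per partition key, then project the key.
case class Row(id: String, itemOrder: String)

// Hypothetical sample data: partition "a" has two rows, "b" has one.
val rows = Seq(Row("a", "1"), Row("a", "2"), Row("b", "1"))

// One representative row per partition key...
val perPartitionFirst = rows.groupBy(_.id).map { case (_, rs) => rs.head }

// ...then the set of partition keys.
val partitionKeys = perPartitionFirst.map(_.id).toSet
// partitionKeys == Set("a", "b")
```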

We use Apache Spark 2.3.1 and spark-cassandra-connector 2.3.2. The command used to launch cassandra-scan is as follows:

/path/to/spark/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class "CassandraScan" --jars /path/to/spark-cassandra-connector_2.11-2.3.2.jar --executor-memory 15g --master local[20] cassandra-scan.jar &

cassandra-scan runs correctly and takes around 19 hours.

We recently set up a new Cassandra cluster, again with 6 nodes (different from those used in the first cluster). This cluster runs DSE 6.8.16. All the data from the first table has been added to a table in the new cluster.

We updated the version of Apache Spark to 2.4.8, and spark-cassandra-connector to 2.4.2. We tested the program with the number of Spark partitions ranging from 2,000 to 200,000. We haven't been able to get cassandra-scan to run correctly. We see errors of the following form:

java.io.IOException: Exception during execution of SELECT "id" FROM "myspace"."largetable" WHERE token("id") > ? AND token("id") <= ?  PER PARTITION LIMIT 1 ALLOW FILTERING: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)
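
The connector issues one such query per token range: it splits the partitioner's token ring into contiguous, non-overlapping intervals and binds each pair of bounds into the prepared statement above. A rough sketch of that split, assuming the Murmur3Partitioner (whose tokens span -2^63 to 2^63 - 1); splitRing is a hypothetical helper for illustration, not connector API:

```scala
// Illustration of how a full-table scan is carved into token ranges.
// Assumes Murmur3Partitioner: tokens cover Long.MinValue .. Long.MaxValue.
// Each Spark task then runs:
//   SELECT "id" FROM ... WHERE token("id") > lo AND token("id") <= hi
val tokenMin = BigInt(Long.MinValue)
val tokenMax = BigInt(Long.MaxValue)

def splitRing(numSplits: Int): Seq[(BigInt, BigInt)] = {
  val span = tokenMax - tokenMin
  (0 until numSplits).map { i =>
    val lo = tokenMin + span * i / numSplits
    val hi = tokenMin + span * (i + 1) / numSplits
    (lo, hi) // half-open interval (lo, hi]
  }
}

val ranges = splitRing(320)
// Adjacent ranges share a bound, so the intervals tile the whole ring.
```

Raising the number of Spark partitions only makes these intervals narrower; each query still has the same shape.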

Some runs of cassandra-scan caused some of the Cassandra nodes to go down, with messages such as the following in the Cassandra logs:

INFO  [CoreThread-22] 2022-04-03 06:26:35,467  InboundHandshakeHandler.java:353 - Failed to properly handshake with peer /xxx.xxx.xxx.xxx:41231. Closing the channel.
java.lang.OutOfMemoryError: Direct buffer memory
WARN  [Outbound-/xxx.xxx.xxx.xxx-large-message-writer] 2022-04-01 19:17:58,248  AbstractOutboundMessageHandler.java:80 - LARGE_MESSAGE with id 97 from /xxx.xxx.xxx.xxx to /xxx.xxx.xxx.xxx via (/xxx.xxx.xxx.xxx,/xxx.xxx.xxx.xxx:7000) error...
java.io.IOException: java.lang.RuntimeException: io.netty.channel.unix.Errors$NativeIoException: writeAddress(..) failed: Connection reset by peer

Any help with getting this to work is much appreciated. Thanks.

风透绣罗衣 2025-01-26 14:48:33

We used the DataStax Bulk Loader to solve the problem.

dsbulk unload \
  --connector.csv.url <path>/<to>/<outputDir> \
  -h <host> \
  -query "select distinct id from myspace.largetable"

dsbulk took around 3 hours to obtain 17.5 million values.

物价感观 2025-01-26 14:48:33

This error indicates that at least one node in the cluster is unavailable to serve requests:

    Not enough replicas available for query at consistency LOCAL_ONE \
      (1 required but only 0 alive)

You need to review the Cassandra logs to determine (1) which of the nodes was unresponsive/unavailable, and (2) why. Cheers!
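
For context, the check behind that message is simple replica accounting. A toy sketch of it in Scala (a simplified model for illustration only; Cassandra's real check is also datacenter-aware):

```scala
// Toy model of the consistency-level check that produces
// "Not enough replicas available for query at consistency LOCAL_ONE".
def requiredAcks(consistency: String, replicationFactor: Int): Int =
  consistency match {
    case "ONE" | "LOCAL_ONE"       => 1
    case "TWO"                     => 2
    case "QUORUM" | "LOCAL_QUORUM" => replicationFactor / 2 + 1
    case "ALL"                     => replicationFactor
    case other => throw new IllegalArgumentException(other)
  }

def canServe(consistency: String, replicationFactor: Int,
             aliveReplicas: Int): Boolean =
  aliveReplicas >= requiredAcks(consistency, replicationFactor)

// With RF = 3, LOCAL_ONE needs 1 live replica for the queried token range;
// the error above reports 0 alive, i.e. all 3 replicas were unresponsive.
val ok = canServe("LOCAL_ONE", 3, 0)
```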
