How do I use directJoin with Spark (Scala)?

Posted on 2025-01-18 02:18:44

I'm trying to use directJoin with the partition keys, but when I run the job, Spark doesn't use directJoin. I would like to understand whether I am doing something wrong. Here is the code I used:

Configuring the settings:

import org.apache.spark.SparkConf

// Settings.* are the application's own values; SparkConf.set expects String values.
val sparkConf: SparkConf = new SparkConf()
    // Enables the connector's Catalyst rules, including the direct join optimization
    .set(
      "spark.sql.extensions",
      "com.datastax.spark.connector.CassandraSparkExtensions"
    )
    // Registers a catalog named CassandraCommercial backed by Cassandra
    .set(
      "spark.sql.catalog.CassandraCommercial",
      "com.datastax.spark.connector.datasource.CassandraCatalog"
    )
    // Connection settings scoped to that catalog only
    .set(
      "spark.sql.catalog.CassandraCommercial.spark.cassandra.connection.host",
      Settings.cassandraServerAddress
    )
    .set(
      "spark.sql.catalog.CassandraCommercial.spark.cassandra.auth.username",
      Settings.cassandraUser
    )
    .set(
      "spark.sql.catalog.CassandraCommercial.spark.cassandra.auth.password",
      Settings.cassandraPass
    )
    .set(
      "spark.sql.catalog.CassandraCommercial.spark.cassandra.connection.port",
      Settings.cassandraPort
    )

I am using a catalog because I intend to use databases on different clusters.
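Since the goal is one catalog per cluster, the repeated keys above could be generated by a small helper. This is only a sketch: the object name `CassandraCatalogs`, the function `catalogSettings`, and its parameters are hypothetical and not part of the connector; the property names are the ones already used in the question.

```scala
object CassandraCatalogs {
  // Catalog implementation class, as used in the question's configuration.
  private val CatalogClass =
    "com.datastax.spark.connector.datasource.CassandraCatalog"

  // Builds the catalog-scoped settings for one cluster.
  // `catalog` is the name the tables will be addressed by, e.g. "CassandraCommercial".
  def catalogSettings(
      catalog: String,
      host: String,
      port: Int,
      user: String,
      password: String
  ): Map[String, String] = {
    val prefix = s"spark.sql.catalog.$catalog"
    Map(
      prefix                                      -> CatalogClass,
      s"$prefix.spark.cassandra.connection.host" -> host,
      s"$prefix.spark.cassandra.connection.port" -> port.toString,
      s"$prefix.spark.cassandra.auth.username"   -> user,
      s"$prefix.spark.cassandra.auth.password"   -> password
    )
  }
}
```

The resulting map can be passed to `SparkConf.setAll`, once per cluster, so that each catalog name carries its own host and credentials.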

SparkSession:

  val sparkSession: SparkSession = SparkSession
    .builder()
    .config(sparkConf)
    .appName(Settings.appName)
    .getOrCreate()

I tried both of the approaches below.

This:

// Reads through the catalog defined above: <catalog>.<keyspace>.<table>
val parameterVOne = sparkSession.read
    .table("CassandraCommercial.ky.parameters")
    .select(
      "id",
      "year",
      "code"
    )

And this:

// Requires: import org.apache.spark.sql.cassandra._
val parameterVTwo = sparkSession.read
    .cassandraFormat("parameters", "CassandraCommercial.ky")
    .load
    .select(
      "id",
      "year",
      "code"
    )

With the first one, Spark does not use a direct join, but show() returns the data normally:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#19, year#22, code#0]
   +- SortMergeJoin [id#19, year#22, code#0], [id#0, year#3, code#2, value#6], Inner, ((id#19 = id#0) AND (year#22 = year#3) AND (code#0 = code#2))

The second one throws this:

Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {localhost:9042} :: Could not reach any contact point, make sure you've provided valid addresses (showing first 2 nodes, use getAllErrors() for more): Node(endPoint=localhost/127.0.0.1:9042, hostId=null, hashCode=307be82d): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException)], Node(endPoint=localhost/0:0:0:0:0:0:0:1:9042, hostId=null, hashCode=3ebc1052): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException)]

Apparently the second approach does not pick up the settings defined for the catalog and, unlike the first, tries to connect to localhost directly.

The DataFrame that holds the keys has only 7 rows, while the Cassandra DataFrame has approximately 2 million.
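With 7 keys against roughly 2 million rows, one thing to try is asking the connector for a direct join explicitly rather than relying on its automatic size check. The sketch below assumes the connector's documented `directJoinSetting` read option (`on`, `off`, `auto`), assumes that `id`, `year`, and `code` cover the whole partition key, and assumes the reader can reach the cluster. `DirectJoinCheck`, `joinWithHint`, and `keys` are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.cassandra._

object DirectJoinCheck {
  // `keys` is a hypothetical small DataFrame with columns id, year, code.
  def joinWithHint(spark: SparkSession, keys: DataFrame): DataFrame = {
    val parameters = spark.read
      .cassandraFormat("parameters", "ky") // (table, keyspace)
      .option("directJoinSetting", "on")   // request a direct join for this table
      .load()

    // Join on the columns assumed to form the full partition key.
    val joined = keys.join(parameters, Seq("id", "year", "code"))

    // Print the physical plan so the chosen join strategy can be inspected.
    joined.explain()
    joined
  }
}
```

If the optimization is applied, the printed plan should contain a Cassandra direct join node in place of `SortMergeJoin`; if `SortMergeJoin` is still there, the rule was not triggered for this combination of Spark and connector versions.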

This is my build.sbt:

ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.12.15"

lazy val root = (project in file("."))
  .settings(
    name                                        := "test-job",
    idePackagePrefix                            := Some("com.teste"),
    libraryDependencies += "org.apache.spark"   %% "spark-sql"                               % "3.2.1",
    libraryDependencies += "org.apache.spark"   %% "spark-core"                              % "3.2.1",
    libraryDependencies += "org.postgresql"      % "postgresql"                              % "42.3.3",
    libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector"               % "3.1.0",
    libraryDependencies += "joda-time"           % "joda-time"                               % "2.10.14",
    libraryDependencies += "com.crealytics"     %% "spark-excel"                             % "3.2.1_0.16.5-pre2",
    libraryDependencies += "com.datastax.spark"  % "spark-cassandra-connector-assembly_2.12" % "3.1.0"
  )

Comments (1)

笨笨の傻瓜 2025-01-25 02:18:44

I've seen this behavior in some versions of Spark. Unfortunately, changes in Spark's internals often break this functionality because it relies on internal details. Please provide more information on which versions of Spark and the Spark Cassandra Connector are used.
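For reference, the build.sbt in the question pairs Spark 3.2.1 with connector 3.1.0. A hedged sketch of an aligned build follows, on the assumption that the connector's minor version is meant to track Spark's minor version; the exact connector version shown and the choice to keep a single connector artifact (rather than both the plain and the assembly jar) are assumptions to verify against the connector's compatibility table.

```scala
// build.sbt fragment (sketch): keep Spark and the connector on the same minor line.
val sparkVersion     = "3.2.1"
val connectorVersion = "3.2.0" // assumed to be the release line built against Spark 3.2

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-sql"                 % sparkVersion,
  "org.apache.spark"   %% "spark-core"                % sparkVersion,
  "com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion
)
```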

Regarding the second error, I suspect that direct join may not use the Spark SQL catalog properties. Can you try setting spark.cassandra.connection.host, spark.cassandra.auth.password, and the other configuration parameters directly?

P.S. I have a long blog post on using DirectJoin, but it was tested on Spark 2.4.x (and maybe on 3.0, I don't remember).
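A minimal sketch of that suggestion: the same connection values are set under the global `spark.cassandra.*` keys instead of under the catalog prefix, so that readers created with `cassandraFormat` can see them. The literal host, port, and credentials are placeholders standing in for the `Settings.*` values from the question, and `GlobalCassandraConf` is a hypothetical name.

```scala
import org.apache.spark.SparkConf

object GlobalCassandraConf {
  // Placeholder values; in the question these come from Settings.*
  val conf: SparkConf = new SparkConf()
    .set(
      "spark.sql.extensions",
      "com.datastax.spark.connector.CassandraSparkExtensions"
    )
    // Global (non-catalog) connection settings
    .set("spark.cassandra.connection.host", "cassandra.example.internal")
    .set("spark.cassandra.connection.port", "9042")
    .set("spark.cassandra.auth.username", "user")
    .set("spark.cassandra.auth.password", "secret")
}
```

Note that `cassandraFormat` takes the table first and the keyspace second, so with global settings the call would be `cassandraFormat("parameters", "ky")`; a value such as `"CassandraCommercial.ky"` is passed through as the keyspace name rather than being resolved against the catalog.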
