Writing an RDD to Cassandra with Spark [entity]

Posted on 2025-02-11 10:15:24

I am trying to write an RDD containing instances of a plain Scala class to Cassandra with Spark:

import java.time.Instant

class Test(private var id: String, private var randomNumber: Integer, private var lastUpdate: Instant) {
  def setId(id: String): Unit = { this.id = id }
  def getId: String = { this.id }
  def setLastUpdate(lastUpdate: Instant): Unit = { this.lastUpdate = lastUpdate }
  def getLastUpdate: Instant = { this.lastUpdate }
  def setRandomNumber(number: Integer): Unit = { this.randomNumber = number }
  def getRandomNumber: Integer = { this.randomNumber }
}

This class has all the setters and getters to maintain encapsulation, and I need it not to be a case class because I have to modify the values during the transformations.

The table corresponding to this entity in Cassandra has slightly different names for the fields:

CREATE TABLE IF NOT EXISTS test.test (
  id uuid,
  random_number int,
  last_update timestamp,
  PRIMARY KEY (id)
) 

I am trying to write this RDD with the saveToCassandra method:

import org.apache.spark.rdd.RDD
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
implicit val connector = CassandraConnector(sc.getConf)
val rdd: RDD[Test] = ??? // an RDD[Test] built earlier in the job
rdd.saveToCassandra("test", "test")

but the method throws an exception because the attribute names of the class do not match the column names of the table:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Columns not found in entity.Test: [id, random_number, last_update]
at scala.Predef$.require(Predef.scala:277)
at com.datastax.spark.connector.mapper.DefaultColumnMapper.columnMapForWriting(DefaultColumnMapper.scala:106)
at com.datastax.spark.connector.mapper.MappedToGettableDataConverter$$anon$1.<init>(MappedToGettableDataConverter.scala:35)
at com.datastax.spark.connector.mapper.MappedToGettableDataConverter$.apply(MappedToGettableDataConverter.scala:26)
at com.datastax.spark.connector.writer.DefaultRowWriter.<init>(DefaultRowWriter.scala:16)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:30)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:28)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:433)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:417)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:35)

How can I write the entity to Cassandra without the class attributes having to be named the same as the table columns, given that the attributes are private?

Comments (1)

卖梦商人 2025-02-18 10:15:24

saveToCassandra allows you to provide an optional ColumnSelector:

  def saveToCassandra(
    keyspaceName: String,
    tableName: String,
    columns: ColumnSelector = AllColumns,
    writeConf: WriteConf = WriteConf.fromSparkConf(sparkContext.getConf))(...): Unit

In your case you could use the following selector:

def selector = SomeColumns(
  ColumnName("id"),
  ColumnName("random_number", alias = Some("randomNumber")),
  ColumnName("last_update", alias = Some("lastUpdate"))
) 

Btw, while not the typical (and recommended) use of a case class, you could absolutely define fields as vars and benefit from using a typed Dataset. That makes it very easy to rename fields before writing to Cassandra.
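
For illustration, here is a minimal sketch of that Dataset approach. The code is hypothetical: it assumes Spark 3.x (whose built-in encoders support java.time.Instant), an existing SparkSession named spark, and spark.cassandra.connection.host already set in the Spark config:

import java.time.Instant
import org.apache.spark.sql.cassandra._

// A case class with var fields still gets an encoder,
// and the values can be mutated during transformations.
case class Test(var id: String, var randomNumber: Integer, var lastUpdate: Instant)

import spark.implicits._
val ds = Seq(Test(java.util.UUID.randomUUID().toString, 42, Instant.now())).toDS()

// Rename the fields to the Cassandra column names just before writing.
ds.withColumnRenamed("randomNumber", "random_number")
  .withColumnRenamed("lastUpdate", "last_update")
  .write
  .cassandraFormat(table = "test", keyspace = "test")
  .mode("append")
  .save()

The rename happens on the untyped DataFrame view right before the write, so the class definition itself never has to change.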
