Local vector 数据类型

发布于 2024-07-23 19:08:01 字数 7673 浏览 9 评论 0

在单机模式下机器学习库支持本地向量和矩阵存储,以及分布式矩阵由一个或多个 RDD 组成

Local vector 单机模式下支持两种类型 dense 和 sparse

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Create a dense vector (1.0, 0.0, 3.0) 元素的值是 double 类型
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
println( "dv:"+ dv )
// Create a sparse vector (1.0, 0.0, 3.0) 其中第一个数组是索引,第二个数组是索引对应的值
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
println( "sv1:" + sv1 )
// Create a sparse vector (1.0, 0.0, 3.0) Seq 中的每个元素包含着(索引,值)
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
println( "sv2:" + sv2 )
dv:[1.0,0.0,3.0]
sv1:(3,[0,2],[1.0,3.0])
sv2:(3,[0,2],[1.0,3.0])

Labeled point 在有监督学习算法中我们对数据用 double 类型的数据进行标记,用于分类或则回归 如果是二分类我们可以使用 0 (negative) or 1 (positive),如果是多类型分类那么我们可以使用 0,1,2,3……

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
println(pos)
// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
println(neg)
(1.0,[1.0,0.0,3.0])
(0.0,(3,[0,2],[1.0,3.0]))

下面是 mllib 中 labeled point 样本数据

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "../data/mllib/sample_libsvm_data.txt")
examples.take(10)
Array((0.0,(692,[127,128,129,130,131,154,155,156,157,158,159,181,182,183,184,185,186,187,188,189,207,208,209,210,211,212,213,214,215,216,217,235,236,237,238,239,240,241,242,243,244,245,262,263,264,265,266,267,268,269,270,271,272,273,289,290,291,292,293,294,295,296,297,300,301,302,316,317,318,319,320,321,328,329,330,343,344,345,346,347,348,349,356,357,358,371,372,373,374,384,385,386,399,400,401,412,413,414,426,427,428,429,440,441,442,454,455,456,457,466,467,468,469,470,482,483,484,493,494,495,496,497,510,511,512,520,521,522,523,538,539,540,547,548,549,550,566,567,568,569,570,571,572,573,574,575,576,577,578,594,595,596,597,598,599,600,601,602,603,604,622,623,624,625,626,627,628,629,630,651,652,653,654,655,656,657],[51.0,159.0,...

Local matrix 本地矩阵中的每个元素都是以 double 类型进行存储,机器学习库支持两种类型的 local matrix, DenseMatrix, and SparseMatrix 。下面是我们用 array [1.0, 3.0, 5.0, 2.0, 4.0, 6.0] 转化为一个 3 行 2 列的 DenseMatrix $$\begin{pmatrix} 1.0 & 2.0 \ 3.0 & 4.0 \ 5.0 & 6.0 \end{pmatrix}$$

import org.apache.spark.mllib.linalg.{Matrix, Matrices}

// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) 3 行 2 列 matrix 按列优先进行存放
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
println("dm:")
println(dm)
// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)) 第一个数组表示的是矩阵的列下标,第二个表示的是行下标
val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
println("sm:"+sm)
dm:
1.0  2.0  
3.0  4.0  
5.0  6.0  
sm:3 x 2 CSCMatrix
(0,0) 9.0
(2,1) 6.0
(1,1) 8.0

Distributed matrix 分布式矩阵的行和列都是用 long 类型保存的,元素值使用 double 类型进行存储,选择合适的类型分布式矩阵来存储数据是很重要的因为矩阵类型的转化是全局 shuffle 所以代价很高。 Ddistributed matrix 分为四种类型 RowMatrix IndexedRowMatrix CoordinateMatrix BlockMatrix

RowMatrix 中的每一行都是有一个 local vector 进行存储,列的数目是被限制住一个整数范围内

import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

val ad_tags = Seq(
    Vectors.dense( 3, 1,1),
    Vectors.dense( 5, 2,1),
    Vectors.dense( 6, 3,1),
    Vectors.dense( 7, 5,1)
  )

  val rows: RDD[Vector] = sc.parallelize( ad_tags )
  // Create a RowMatrix from an RDD[Vector].
  val mat: RowMatrix = new RowMatrix(rows)
  // Get its size.
  val m = mat.numRows()
  val n = mat.numCols()
  println( "m=" + m + " n=" + n )
  mat.rows.foreach( println(_) )  
m=4 n=3

IndexedRowMatrix 和 RowMatrix 的不同在于每行多了 long 类型的 index

import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.Vectors

val ad_tags = Seq(
    IndexedRow(1, Vectors.dense( 3, 1,1) ),
    IndexedRow(2, Vectors.dense( 5, 2,1) ),
    IndexedRow(3, Vectors.dense( 6, 3,1) ),
    IndexedRow(4, Vectors.dense( 7, 5,1) ),
    IndexedRow(5, Vectors.dense( 8, 6,1) ),
    IndexedRow(6, Vectors.dense( 6, 8,1) ),
    IndexedRow(7, Vectors.dense( 8, 9,1) )
  )

  val rows: RDD[IndexedRow] = sc.parallelize( ad_tags )

  // Create an IndexedRowMatrix from an RDD[IndexedRow].
  val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)

  // Get its size.
  val m = mat.numRows()
  val n = mat.numCols()
  println( "m=" + m + " n=" + n )
  mat.rows.foreach(println(_))
  // Drop its row indices.
  val rowMat: RowMatrix = mat.toRowMatrix()
m=8 n=3

CoordinateMatrix CoordinateMatrix 通常被用于较大的稀疏矩阵,每个元素的值是一个 MatrixEntry(row,column,value)

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD

val ad_tags = Seq(
    MatrixEntry(2,3,4.0),
    MatrixEntry(2,8,1.0),
    MatrixEntry(3,2,4.0),
    MatrixEntry(5,1,3.0),
    MatrixEntry(5,4,7.0),
    MatrixEntry(5,7,2.0)
  )

  val entries: RDD[MatrixEntry] = sc.parallelize( ad_tags ) // an RDD of matrix entries
  // Create a CoordinateMatrix from an RDD[MatrixEntry].
  val mat: CoordinateMatrix = new CoordinateMatrix(entries)
  mat.entries.foreach(println(_))
  // Get its size.
  val m = mat.numRows()
  val n = mat.numCols()
  println( "m=" + m + " n=" + n )
  // Convert it to an IndexRowMatrix whose rows are sparse vectors.
  val indexedRowMatrix = mat.toIndexedRowMatrix()
m=6 n=9

BlockMatrix BlockMatrix is a distributed matrix backed by an RDD of MatrixBlocks, where a MatrixBlock is a tuple of ((Int, Int), Matrix), where the (Int, Int) is the index of the block, and Matrix is the sub-matrix at the given index with size rowsPerBlock x colsPerBlock. BlockMatrix supports methods such as add and multiply with another BlockMatrix. BlockMatrix also has a helper function validate which can be used to check whether the BlockMatrix is set up properly.

import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD

val ad_tags = Seq(
    MatrixEntry(2,3,4.0),
    MatrixEntry(2,8,1.0),
    MatrixEntry(3,2,4.0),
    MatrixEntry(5,1,3.0),
    MatrixEntry(5,4,7.0),
    MatrixEntry(5,7,2.0)
  )

  val entries: RDD[MatrixEntry] = sc.parallelize( ad_tags ) // an RDD of matrix entries
  // Create a CoordinateMatrix from an RDD[MatrixEntry].
  val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
  // Transform the CoordinateMatrix to a BlockMatrix
  val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
  matA.blocks.foreach(println(_))
  // Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
  // Nothing happens if it is valid.
  matA.validate()

  // Calculate A^T A.
  val ata = matA.transpose.multiply(matA)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

空宴

暂无简介

0 文章
0 评论
21 人气
更多

推荐作者

内心激荡

文章 0 评论 0

JSmiles

文章 0 评论 0

左秋

文章 0 评论 0

迪街小绵羊

文章 0 评论 0

瞳孔里扚悲伤

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文