MLlib data types
MLlib supports local vectors and matrices stored on a single machine, as well as distributed matrices backed by one or more RDDs.
Local vector: a local vector comes in two flavors, dense and sparse.
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Create a dense vector (1.0, 0.0, 3.0); element values are doubles.
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
println("dv:" + dv)
// Create a sparse vector (1.0, 0.0, 3.0): the first array holds the indices,
// the second holds the values at those indices.
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
println("sv1:" + sv1)
// Create a sparse vector (1.0, 0.0, 3.0) from a Seq of (index, value) pairs.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
println("sv2:" + sv2)
dv:[1.0,0.0,3.0]
sv1:(3,[0,2],[1.0,3.0])
sv2:(3,[0,2],[1.0,3.0])
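To make the sparse representation concrete, here is a plain-Scala sketch (independent of Spark) that expands the (size, indices, values) triple behind sv1 back into the dense array it encodes:

```scala
// A sparse vector is just (size, indices, values); expanding it back to a
// dense array makes the equivalence between sv1 and dv explicit.
val size = 3
val indices = Array(0, 2)
val values = Array(1.0, 3.0)

val dense = Array.fill(size)(0.0)
for (i <- indices.indices) dense(indices(i)) = values(i)

println(dense.mkString("[", ",", "]"))  // [1.0,0.0,3.0]
```

This is exactly why sv1 and dv above print as the same logical vector.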
Labeled point: in supervised learning we attach a double-valued label to each data point, for use in classification or regression. For binary classification the label is 0 (negative) or 1 (positive); for multiclass classification the labels are class indices 0, 1, 2, 3, …
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
println(pos)
// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
println(neg)
(1.0,[1.0,0.0,3.0])
(0.0,(3,[0,2],[1.0,3.0]))
The following loads the labeled-point sample data that ships with MLlib, stored in LibSVM format:
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "../data/mllib/sample_libsvm_data.txt")
examples.take(10)
Array((0.0,(692,[127,128,129,130,131,154,155,156,157,158,159,181,182,183,184,185,186,187,188,189,207,208,209,210,211,212,213,214,215,216,217,235,236,237,238,239,240,241,242,243,244,245,262,263,264,265,266,267,268,269,270,271,272,273,289,290,291,292,293,294,295,296,297,300,301,302,316,317,318,319,320,321,328,329,330,343,344,345,346,347,348,349,356,357,358,371,372,373,374,384,385,386,399,400,401,412,413,414,426,427,428,429,440,441,442,454,455,456,457,466,467,468,469,470,482,483,484,493,494,495,496,497,510,511,512,520,521,522,523,538,539,540,547,548,549,550,566,567,568,569,570,571,572,573,574,575,576,577,578,594,595,596,597,598,599,600,601,602,603,604,622,623,624,625,626,627,628,629,630,651,652,653,654,655,656,657],[51.0,159.0,...
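The LibSVM format loaded above is a plain text format: each line is "label index:value index:value …", with 1-based feature indices on disk that loadLibSVMFile shifts to 0-based. A minimal plain-Scala sketch of parsing one such line (the sample line here is hypothetical):

```scala
// Minimal parser for one LibSVM-format line: "label index:value index:value ...".
// LibSVM indices are 1-based on disk; loadLibSVMFile converts them to 0-based.
val line = "1.0 128:51.0 129:159.0"  // hypothetical sample line

val parts = line.trim.split("\\s+")
val label = parts.head.toDouble
val (indices, values) = parts.tail.map { item =>
  val Array(i, v) = item.split(":")
  (i.toInt - 1, v.toDouble)  // shift to 0-based
}.unzip

println(s"label=$label indices=${indices.mkString(",")} values=${values.mkString(",")}")
```

The 0-based indices explain why features stored as 128 and 129 in the file show up as 127 and 128 in the sparse vectors printed above.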
Local matrix: every element of a local matrix is stored as a double. MLlib supports two kinds of local matrix, DenseMatrix and SparseMatrix. Below, the array [1.0, 3.0, 5.0, 2.0, 4.0, 6.0] is turned into a 3-row, 2-column DenseMatrix $$\begin{pmatrix} 1.0 & 2.0 \\ 3.0 & 4.0 \\ 5.0 & 6.0 \end{pmatrix}$$
import org.apache.spark.mllib.linalg.{Matrix, Matrices}
// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)): a 3 x 2 matrix stored in column-major order.
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
println("dm:")
println(dm)
// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)) in CSC form:
// the first array holds column pointers, the second row indices, the third the non-zero values.
val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9.0, 6.0, 8.0))
println("sm:" + sm)
dm:
1.0 2.0
3.0 4.0
5.0 6.0
sm:3 x 2 CSCMatrix
(0,0) 9.0
(2,1) 6.0
(1,1) 8.0
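The CSC arrays can be decoded by hand in plain Scala (no Spark needed): colPtrs(j) to colPtrs(j + 1) delimit the slice of rowIndices and values belonging to column j, which reproduces the three entries printed above:

```scala
// Decoding the CSC arrays passed to Matrices.sparse: colPtrs(j) .. colPtrs(j+1)
// delimit the slice of rowIndices/values belonging to column j.
val numCols = 2
val colPtrs = Array(0, 1, 3)
val rowIndices = Array(0, 2, 1)
val values = Array(9.0, 6.0, 8.0)

val entries =
  for {
    j <- 0 until numCols
    k <- colPtrs(j) until colPtrs(j + 1)
  } yield (rowIndices(k), j, values(k))

entries.foreach(println)  // (0,0,9.0), (2,1,6.0), (1,1,8.0)
```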
Distributed matrix: row and column counts are stored as longs and element values as doubles. Choosing the right distributed matrix type for your data matters, because converting between types requires a global shuffle and is therefore expensive. There are four kinds of distributed matrix: RowMatrix, IndexedRowMatrix, CoordinateMatrix, and BlockMatrix.
RowMatrix: each row is stored as a local vector, so the number of columns is limited to the integer range.
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
val ad_tags = Seq(
  Vectors.dense(3, 1, 1),
  Vectors.dense(5, 2, 1),
  Vectors.dense(6, 3, 1),
  Vectors.dense(7, 5, 1)
)
val rows: RDD[Vector] = sc.parallelize(ad_tags)
// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
println( "m=" + m + " n=" + n )
mat.rows.foreach( println(_) )
m=4 n=3
IndexedRowMatrix: like RowMatrix, except that each row also carries a long-typed index.
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.Vectors
val ad_tags = Seq(
  IndexedRow(1, Vectors.dense(3, 1, 1)),
  IndexedRow(2, Vectors.dense(5, 2, 1)),
  IndexedRow(3, Vectors.dense(6, 3, 1)),
  IndexedRow(4, Vectors.dense(7, 5, 1)),
  IndexedRow(5, Vectors.dense(8, 6, 1)),
  IndexedRow(6, Vectors.dense(6, 8, 1)),
  IndexedRow(7, Vectors.dense(8, 9, 1))
)
val rows: RDD[IndexedRow] = sc.parallelize(ad_tags)
// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
println( "m=" + m + " n=" + n )
mat.rows.foreach(println(_))
// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix()
m=8 n=3
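Note that m=8 even though only seven rows were supplied: numRows is the largest row index plus one, and the indices above run from 1 to 7. A plain-Scala sketch of that rule:

```scala
// numRows of an IndexedRowMatrix is the largest row index plus one, which is
// why indices 1..7 above give m=8 even though only 7 rows were supplied.
val rowIndices = Seq(1L, 2L, 3L, 4L, 5L, 6L, 7L)
val numRows = rowIndices.max + 1
println(numRows)  // 8
```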
CoordinateMatrix: typically used for very large, sparse matrices; each entry is a MatrixEntry(row, column, value).
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD
val ad_tags = Seq(
  MatrixEntry(2, 3, 4.0),
  MatrixEntry(2, 8, 1.0),
  MatrixEntry(3, 2, 4.0),
  MatrixEntry(5, 1, 3.0),
  MatrixEntry(5, 4, 7.0),
  MatrixEntry(5, 7, 2.0)
)
val entries: RDD[MatrixEntry] = sc.parallelize(ad_tags) // an RDD of matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)
mat.entries.foreach(println(_))
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
println( "m=" + m + " n=" + n )
// Convert it to an IndexedRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()
m=6 n=9
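As with IndexedRowMatrix, a CoordinateMatrix infers its size from the largest indices it contains, which is why six entries with maximum row index 5 and maximum column index 8 give m=6 and n=9. In plain Scala:

```scala
// A CoordinateMatrix infers its size from the largest indices it contains:
// max row index 5 and max column index 8 above give m=6, n=9.
val entries = Seq((2L, 3L), (2L, 8L), (3L, 2L), (5L, 1L), (5L, 4L), (5L, 7L))
val m = entries.map(_._1).max + 1
val n = entries.map(_._2).max + 1
println(s"m=$m n=$n")  // m=6 n=9
```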
BlockMatrix BlockMatrix is a distributed matrix backed by an RDD of MatrixBlocks, where a MatrixBlock is a tuple of ((Int, Int), Matrix), where the (Int, Int) is the index of the block, and Matrix is the sub-matrix at the given index with size rowsPerBlock x colsPerBlock. BlockMatrix supports methods such as add and multiply with another BlockMatrix. BlockMatrix also has a helper function validate which can be used to check whether the BlockMatrix is set up properly.
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD
val ad_tags = Seq(
  MatrixEntry(2, 3, 4.0),
  MatrixEntry(2, 8, 1.0),
  MatrixEntry(3, 2, 4.0),
  MatrixEntry(5, 1, 3.0),
  MatrixEntry(5, 4, 7.0),
  MatrixEntry(5, 7, 2.0)
)
val entries: RDD[MatrixEntry] = sc.parallelize(ad_tags) // an RDD of matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
// Transform the CoordinateMatrix to a BlockMatrix
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
matA.blocks.foreach(println(_))
// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate()
// Calculate A^T A.
val ata = matA.transpose.multiply(matA)
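The block assignment rule itself is simple to sketch in plain Scala: an entry at (i, j) belongs to block (i / rowsPerBlock, j / colsPerBlock). toBlockMatrix() defaults to 1024 x 1024 blocks, so all six entries above land in block (0, 0); the sketch below uses a hypothetical 4 x 4 block size to show entries splitting across blocks:

```scala
// How a MatrixEntry is assigned to a block: the block index of entry (i, j) is
// (i / rowsPerBlock, j / colsPerBlock). A small 4 x 4 block size (hypothetical,
// chosen for illustration) splits the six entries across four blocks.
val rowsPerBlock = 4
val colsPerBlock = 4
val entries = Seq((2, 3), (2, 8), (3, 2), (5, 1), (5, 4), (5, 7))
val blockOf = entries.map { case (i, j) => (i / rowsPerBlock, j / colsPerBlock) }.distinct
println(blockOf)  // List((0,0), (0,2), (1,0), (1,1))
```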