Spark源码分析之RDD下的KMeans
在搭建好了Spark开发环境之后,我们就可以编程了。这里介绍Spark自带的MLLib中的KMeans源码,尽管最新的Spark已经逐渐开始使用DataFrame作为数据模型,但是旧的RDD依然适用。下面我们将逐渐讲解KMeans的源码。KMeans主要步骤如下:
1、读取数据
2、对数据进行正规化
3、选择几个初始类中心
首先,我们给出输入的文件kmeans_data.txt
,文件包含6行数据,每一行是三个以空格分割的数字:
0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2
然后,我们给出Scala版本调用KMeans的方法,这里我们简单改写了一下官网的解析数据的方式,官网提供的是map,我们改成了mapPartitions(原因参考:https://martin.atlassian.net/wiki/spaces/lestermartin/blog/2016/05/19/67043332/why+spark+s+mapPartitions+transformation+is+faster+than+map+calls+your+function+once+partition+not+once+element )具体如下:
package rdd.ml.clustering
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by d00454735 on 2018/7/25.
*/
object KMeansTest {
def main(args: Array[String]): Unit = {
// 以本地模式运行,并初始化SparkContext
val masterURL = "local[*]"
val conf = new SparkConf().setAppName("KMeans Test").setMaster(masterURL)
val sc = new SparkContext(conf)
// 载入数据,并将每一行的数据解析,注意,官网给出的解析使用的是map方法,这里改写成了mapPartitions,因为后者是以数据块为单位的方式处理,其效率要远高于map方式(以数据行位单位处理)
val data = sc.textFile("file:/d:/data/kmeans_data.txt")
val parsedData = data.mapPartitions(partition => parseData(partition)).cache()
// 设定KMeans聚类的参数并对模型进行训练,这里定义聚成2类,迭代20次
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// 计算Within Set Sum of Squared Errors以评估聚类效果
val WSSSE = clusters.computeCost(parsedData)
println(s"Within Set Sum of Squared Errors = $WSSSE")
// 保存模型以及后续加载模型的写法
clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
}
//解析数据,每一行先以空额分割成数组,然后将数组内的元素转化成Double形式,最后变成向量
def parseData(lines : Iterator[String]): Iterator[Vector] = {
lines.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
}
}
下面我们进入KMeans的源代码继续分析,首先来到运行的方法,这个方法先用private[spark]声明成属于spark下的私有方法,参数有两个,一个是RDD类型的数据,另一个是可选参数Option[Instrumentation[NewKMeans]],Option在scala里面表示这个参数可能存在可能不存在。Option[A] 是一个类型为 A 的可选值的容器: 如果值存在, Option[A] 就是一个 Some[A] ,如果不存在, Option[A] 就是对象 None (参考:https://windor.gitbooks.io/beginners-guide-to-scala/content/chp5-the-option-type.html )。Instrumentation是用于监控程序运行的类,本质上是java里面的代码。这里面涉及到一个缓存释放的问题,即upersist()
,参考:http://bourneli.github.io/scala/spark/2016/06/17/spark-unpersist-after-action.html
在初始化类中心的时候判断是否给定了类中心(参考:https://www.datalearner.com/blog/1051532740001101 ),如果给定了类中心,就用原来的,否则就要随机初始化k个类中心,这里初始化也给了两个方法,一个是所有的随机初始化initRandom(data: RDD[VectorWithNorm])
,另一个是并行初始化initKMeansParallel(data: RDD[VectorWithNorm])
,它们都是返回一个VectorWithNorm
的数组,
如下:
private[spark] def run(
data: RDD[Vector],
instr: Option[Instrumentation[NewKMeans]]): KMeansModel = {
// 如果数据没有做缓存的话会影响性能,这里做了一个警告检查
if (data.getStorageLevel == StorageLevel.NONE) {
logWarning("The input data is not directly cached, which may hurt performance if its"
+ " parent RDDs are also uncached.")
}
// 计算平方和并缓存,计算方法就是把每一行的元素平方后加和,然后再开方
val norms = data.map(Vectors.norm(_, 2.0))
norms.persist()
//zip是将元素连接起来,这里的含义就是将数据中的每一行和刚才的norm连接
val zippedData = data.zip(norms).map { case (v, norm) =>
new VectorWithNorm(v, norm)
}
//这里开始真正执行程序了
val model = runAlgorithm(zippedData, instr)
//释放缓存数据
norms.unpersist()
// Warn at the end of the run as well, for increased visibility.
if (data.getStorageLevel == StorageLevel.NONE) {
logWarning("The input data was not directly cached, which may hurt performance if its"
+ " parent RDDs are also uncached.")
}
//返回模型
model
}
上面就是KMeans算法的执行流程,接下来,我们看核心算法的方法,即runAgorithm()
,其参数也是两个,第一个是刚才带来norm的数据,第二个还是个监控参数。
KMeans更新的逻辑是首先对于每个数据点更新其所在的类,即对于每个数据点需要计算这个数据点与类中心的距离,然后选择最近的类中心点作为它所属的类,这样更新了所有的数据点所属的类之后,再计算每个类下数据点的中心点,那么该中心点就是新的类中心了。
第i个类别下的中心点的计算公式为:
m_{i} = \frac{1}{n_i} \sum_{j=1}^{j\in i} X_i
这里的n_i表示第i个类下有n_i个数据,X_i表示所有属于类i的数据点对应的向量。因此,KMeans的核心更新代码如下:
//采用mapPartitions的方式循环所有数据点
val newCenters = data.mapPartitions { points =>
//thisCenters表示当前的类中心,dims表示数据的维度
val thisCenters = bcCenters.value
val dims = thisCenters.head.vector.size
//这个变量存储类下所有数据点的向量之和,例如更新数据点所属类别后,类1下有两个数据点(1.0, 2.0, 3.0)和(1.0, 1.0, 1.0),那么sum(1) = (2.0, 3.0, 4.0),计算某个类下的中心点用到,所以它的维度是类别数,每个类别下的维度与数据维度一致
val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
//这个变量是存放每个类下有多少个数据点的
val counts = Array.fill(thisCenters.length)(0L)
//points是当前分区下所有的数据点,我们要计算这些数据点与哪个中心点最近,然后更新一下它所属的类,使用的是KMeans下的findClosest方法,分析见下面。
points.foreach { point =>
//寻找当前数据点所属的中心点,cost是距离,bestCenter是表示属于第几个中心点的索引。
val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
costAccum.add(cost) //成本函数累加
val sum = sums(bestCenter) //取出对应中心点的向量和
axpy(1.0, point.vector, sum) //把当前数据加上去,sum += 1 * point.vector
counts(bestCenter) += 1 //该类下的数据点数量加1
}
//由于mapPartitions要求返回的对象是Iterator[],因此,这里返回的counts最后加了iterator
//这里把couns重新包装了一下,返回的是每个类下的向量总和以及数据点的数量
counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
}.reduceByKey { case ((sum1, count1), (sum2, count2)) => //将相同类下的数据取出来,求和
axpy(1.0, sum2, sum1)
(sum1, count1 + count2)
}.mapValues { case (sum, count) => //对于每个类,这里最终求数据向量总和/数据点总数的结果,也就是新的数据中心点了
scal(1.0 / count, sum)
new VectorWithNorm(sum)
}.collectAsMap()
该方法的全部内容实现如下:
private def runAlgorithm(
data: RDD[VectorWithNorm],
instr: Option[Instrumentation[NewKMeans]]): KMeansModel = {
val sc = data.sparkContext
// 这里获取系统时间来监控程序运行时间,注意不要使用currentTimeMillis ,因为它和系统时间有关,在某些时候容易发生错误,参考 https://github.com/databricks/scala-style-guide/blob/master/README-ZH.md#misc_currentTimeMillis_vs_nanoTime
val initStartTime = System.nanoTime()
//初始化中心点,如果模型是载入之前的模型或者是已经初始化过中心点,那么直接采用,否则的话抽样中心点
//抽样中心点有两种方式,一种是随机初始化,一种是采用分布式K-Means++的方式
//具体可参考论文:https://www.datalearner.com/blog/1051532743231974
val centers = initialModel match {
case Some(kMeansCenters) =>
kMeansCenters.clusterCenters.map(new VectorWithNorm(_))
case None =>
if (initializationMode == KMeans.RANDOM) {
initRandom(data)
} else {
initKMeansParallel(data)
}
}
//统计初始化模型所用的时间
val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
var converged = false
var cost = 0.0
var iteration = 0
val iterationStartTime = System.nanoTime()
instr.foreach(_.logNumFeatures(centers.head.vector.size))
// Execute iterations of Lloyd's algorithm until converged
// 执行迭代
while (iteration < maxIterations && !converged) {
val costAccum = sc.doubleAccumulator
val bcCenters = sc.broadcast(centers) //将中心点广播出去,分到各个计算节点上
// Find the new centers
// 在各自的数据分区下,计算分区内数据点与中心点的距离,并更新数据点所属的中心点,采用mapPartitions循环,速度要快于map
val newCenters = data.mapPartitions { points =>
val thisCenters = bcCenters.value
val dims = thisCenters.head.vector.size
val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
val counts = Array.fill(thisCenters.length)(0L)
points.foreach { point =>
val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
costAccum.add(cost) //成本函数累加
val sum = sums(bestCenter) //取出对应中心点的向量和
axpy(1.0, point.vector, sum) //sum += 1 * point.vector
counts(bestCenter) += 1
}
counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
}.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
axpy(1.0, sum2, sum1)
(sum1, count1 + count2)
}.mapValues { case (sum, count) =>
scal(1.0 / count, sum)
new VectorWithNorm(sum)
}.collectAsMap()
//将旧的中心点毁掉
bcCenters.destroy(blocking = false)
// 更新类的中心点和距离值,如果距离小于指定值那么收敛了,停止迭代,否则还要继续迭代
converged = true
newCenters.foreach { case (j, newCenter) =>
if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) {
converged = false
}
centers(j) = newCenter
}
cost = costAccum.value
iteration += 1
}
//统计迭代的时间
val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.")
//统计是否达到最大迭代次数
if (iteration == maxIterations) {
logInfo(s"KMeans reached the max number of iterations: $maxIterations.")
} else {
logInfo(s"KMeans converged in $iteration iterations.")
}
//输出cost值
logInfo(s"The cost is $cost.")
//返回模型
new KMeansModel(centers.map(_.vector))
}
在上述循环中,一个核心的方法是计算当前数据点与各个中心点的距离,并返回所属的距离最小的中心点的索引,这个方法的参数有两个,一个是所有的中心点,一个是当前数据点,前者是VectorWithNorm集合,后者是单独的一个VectorWithNorm,返回两个值一个是中心点索引,一个距离。由于计算两个向量的距离需要循环向量中的元素,因此这里用了一个技巧,就是先用一步计算距离的最小值,然后与最优值比较,如果最小值比最优值小,才要计算距离,否则就放弃该点,直接循环,原理参考:https://www.datalearner.com/blog/1051532749735689 :
private[mllib] def findClosest(
centers: TraversableOnce[VectorWithNorm],
point: VectorWithNorm): (Int, Double) = {
var bestDistance = Double.PositiveInfinity //最优距离,冒泡排序用,初始化为最大值
var bestIndex = 0 //最优中心点索引
var i = 0 //中心点循环索引
centers.foreach { center =>
// Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary
// distance computation.
//这里用了一个技巧,就是用lowerBoundOfSqDist表示数据点与当前中心点的距离的最小值,如果该值小于目前的最优值bestDistance才要计算距离,否则就继续循环,计算距离的成本要更高。
var lowerBoundOfSqDist = center.norm - point.norm
lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist
if (lowerBoundOfSqDist < bestDistance) {
val distance: Double = fastSquaredDistance(center, point) //计算欧氏距离
if (distance < bestDistance) { //如果距离小于最优值,那么更新最优值
bestDistance = distance
bestIndex = i
}
}
i += 1
}
(bestIndex, bestDistance)
}
欢迎大家关注DataLearner官方微信,接受最新的AI技术推送
