Spark源码分析之RDD下的KMeans

标签:#scala##spark##大数据# 时间:2018/09/26 15:22:24 作者:小木

在搭建好了Spark开发环境之后,我们就可以编程了。这里介绍Spark自带的MLLib中的KMeans源码,尽管最新的Spark已经逐渐开始使用DataFrame作为数据模型,但是旧的RDD依然适用。下面我们将逐渐讲解KMeans的源码。KMeans主要步骤如下:

1、读取数据
2、对数据进行正规化
3、选择几个初始类中心

首先,我们给出输入的文件kmeans_data.txt,文件包含6行数据,每一行是三个以空格分割的数字:

0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2

然后,我们给出Scala版本调用KMeans的方法,这里我们简单改写了一下官网的解析数据的方式,官网提供的是map,我们改成了mapPartitions(原因参考:https://martin.atlassian.net/wiki/spaces/lestermartin/blog/2016/05/19/67043332/why+spark+s+mapPartitions+transformation+is+faster+than+map+calls+your+function+once+partition+not+once+element )具体如下:

package rdd.ml.clustering

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by d00454735 on 2018/7/25.
  */
object KMeansTest {

  def main(args: Array[String]): Unit = {

    // 以本地模式运行,并初始化SparkContext
    val masterURL = "local[*]"
    val conf = new SparkConf().setAppName("KMeans Test").setMaster(masterURL)
    val sc = new SparkContext(conf)

    // 载入数据,并将每一行的数据解析,注意,官网给出的解析使用的是map方法,这里改写成了mapPartitions,因为后者是以数据块为单位的方式处理,其效率要远高于map方式(以数据行位单位处理)
    val data = sc.textFile("file:/d:/data/kmeans_data.txt")
    val parsedData = data.mapPartitions(partition => parseData(partition)).cache()

    // 设定KMeans聚类的参数并对模型进行训练,这里定义聚成2类,迭代20次
    val numClusters = 2
    val numIterations = 20
    val clusters = KMeans.train(parsedData, numClusters, numIterations)

    // 计算Within Set Sum of Squared Errors以评估聚类效果
    val WSSSE = clusters.computeCost(parsedData)
    println(s"Within Set Sum of Squared Errors = $WSSSE")

    // 保存模型以及后续加载模型的写法
    clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
    val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")

  }

  //解析数据,每一行先以空额分割成数组,然后将数组内的元素转化成Double形式,最后变成向量
  def parseData(lines : Iterator[String]): Iterator[Vector] = {
    lines.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
  }

}

下面我们进入KMeans的源代码继续分析,首先来到运行的方法,这个方法先用private[spark]声明成属于spark下的私有方法,参数有两个,一个是RDD类型的数据,另一个是可选参数Option[Instrumentation[NewKMeans]],Option在scala里面表示这个参数可能存在可能不存在。Option[A] 是一个类型为 A 的可选值的容器: 如果值存在, Option[A] 就是一个 Some[A] ,如果不存在, Option[A] 就是对象 None (参考:https://windor.gitbooks.io/beginners-guide-to-scala/content/chp5-the-option-type.html )。Instrumentation是用于监控程序运行的类,本质上是java里面的代码。这里面涉及到一个缓存释放的问题,即upersist(),参考:http://bourneli.github.io/scala/spark/2016/06/17/spark-unpersist-after-action.html

在初始化类中心的时候判断是否给定了类中心(参考:https://www.datalearner.com/blog/1051532740001101 ),如果给定了类中心,就用原来的,否则就要随机初始化k个类中心,这里初始化也给了两个方法,一个是所有的随机初始化initRandom(data: RDD[VectorWithNorm]),另一个是并行初始化initKMeansParallel(data: RDD[VectorWithNorm]),它们都是返回一个VectorWithNorm的数组,

如下:

private[spark] def run(
      data: RDD[Vector],
      instr: Option[Instrumentation[NewKMeans]]): KMeansModel = {

    // 如果数据没有做缓存的话会影响性能,这里做了一个警告检查
    if (data.getStorageLevel == StorageLevel.NONE) {
      logWarning("The input data is not directly cached, which may hurt performance if its"
        + " parent RDDs are also uncached.")
    }

    // 计算平方和并缓存,计算方法就是把每一行的元素平方后加和,然后再开方
    val norms = data.map(Vectors.norm(_, 2.0))
    norms.persist()

    //zip是将元素连接起来,这里的含义就是将数据中的每一行和刚才的norm连接
    val zippedData = data.zip(norms).map { case (v, norm) =>
      new VectorWithNorm(v, norm)
    }

    //这里开始真正执行程序了
    val model = runAlgorithm(zippedData, instr)

    //释放缓存数据
    norms.unpersist()

    // Warn at the end of the run as well, for increased visibility.
    if (data.getStorageLevel == StorageLevel.NONE) {
      logWarning("The input data was not directly cached, which may hurt performance if its"
        + " parent RDDs are also uncached.")
    }

    //返回模型
    model
  }

上面就是KMeans算法的执行流程,接下来,我们看核心算法的方法,即runAgorithm(),其参数也是两个,第一个是刚才带来norm的数据,第二个还是个监控参数。

KMeans更新的逻辑是首先对于每个数据点更新其所在的类,即对于每个数据点需要计算这个数据点与类中心的距离,然后选择最近的类中心点作为它所属的类,这样更新了所有的数据点所属的类之后,再计算每个类下数据点的中心点,那么该中心点就是新的类中心了。

i个类别下的中心点的计算公式为:

m_{i} = \frac{1}{n_i} \sum_{j=1}^{j\in i} X_i

这里的n_i表示第i个类下有n_i个数据,X_i表示所有属于类i的数据点对应的向量。因此,KMeans的核心更新代码如下:

//采用mapPartitions的方式循环所有数据点
val newCenters = data.mapPartitions { points =>

    //thisCenters表示当前的类中心,dims表示数据的维度
    val thisCenters = bcCenters.value    
    val dims = thisCenters.head.vector.size

    //这个变量存储类下所有数据点的向量之和,例如更新数据点所属类别后,类1下有两个数据点(1.0, 2.0, 3.0)和(1.0, 1.0, 1.0),那么sum(1) = (2.0, 3.0, 4.0),计算某个类下的中心点用到,所以它的维度是类别数,每个类别下的维度与数据维度一致
    val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))    

    //这个变量是存放每个类下有多少个数据点的
    val counts = Array.fill(thisCenters.length)(0L)        

    //points是当前分区下所有的数据点,我们要计算这些数据点与哪个中心点最近,然后更新一下它所属的类,使用的是KMeans下的findClosest方法,分析见下面。
    points.foreach { point =>

        //寻找当前数据点所属的中心点,cost是距离,bestCenter是表示属于第几个中心点的索引。
        val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)

        costAccum.add(cost)        //成本函数累加
        val sum = sums(bestCenter)    //取出对应中心点的向量和
        axpy(1.0, point.vector, sum)    //把当前数据加上去,sum += 1 * point.vector
        counts(bestCenter) += 1    //该类下的数据点数量加1
    }

    //由于mapPartitions要求返回的对象是Iterator[],因此,这里返回的counts最后加了iterator
    //这里把couns重新包装了一下,返回的是每个类下的向量总和以及数据点的数量
    counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator

    }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>    //将相同类下的数据取出来,求和
        axpy(1.0, sum2, sum1)
        (sum1, count1 + count2)
    }.mapValues { case (sum, count) =>        //对于每个类,这里最终求数据向量总和/数据点总数的结果,也就是新的数据中心点了
        scal(1.0 / count, sum)
        new VectorWithNorm(sum)
    }.collectAsMap()

该方法的全部内容实现如下:

private def runAlgorithm(
      data: RDD[VectorWithNorm],
      instr: Option[Instrumentation[NewKMeans]]): KMeansModel = {

    val sc = data.sparkContext

    // 这里获取系统时间来监控程序运行时间,注意不要使用currentTimeMillis ,因为它和系统时间有关,在某些时候容易发生错误,参考 https://github.com/databricks/scala-style-guide/blob/master/README-ZH.md#misc_currentTimeMillis_vs_nanoTime
    val initStartTime = System.nanoTime()

    //初始化中心点,如果模型是载入之前的模型或者是已经初始化过中心点,那么直接采用,否则的话抽样中心点
    //抽样中心点有两种方式,一种是随机初始化,一种是采用分布式K-Means++的方式
    //具体可参考论文:https://www.datalearner.com/blog/1051532743231974
    val centers = initialModel match {
      case Some(kMeansCenters) =>
        kMeansCenters.clusterCenters.map(new VectorWithNorm(_))
      case None =>
        if (initializationMode == KMeans.RANDOM) {
          initRandom(data)
        } else {
          initKMeansParallel(data)
        }
    }

    //统计初始化模型所用的时间
    val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
    logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")

    var converged = false
    var cost = 0.0
    var iteration = 0

    val iterationStartTime = System.nanoTime()

    instr.foreach(_.logNumFeatures(centers.head.vector.size))

    // Execute iterations of Lloyd's algorithm until converged
    // 执行迭代
    while (iteration < maxIterations && !converged) {

      val costAccum = sc.doubleAccumulator
      val bcCenters = sc.broadcast(centers)    //将中心点广播出去,分到各个计算节点上

      // Find the new centers
      // 在各自的数据分区下,计算分区内数据点与中心点的距离,并更新数据点所属的中心点,采用mapPartitions循环,速度要快于map
      val newCenters = data.mapPartitions { points =>
        val thisCenters = bcCenters.value
        val dims = thisCenters.head.vector.size

        val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
        val counts = Array.fill(thisCenters.length)(0L)

        points.foreach { point =>
          val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
          costAccum.add(cost)        //成本函数累加
          val sum = sums(bestCenter)    //取出对应中心点的向量和
          axpy(1.0, point.vector, sum)    //sum += 1 * point.vector
          counts(bestCenter) += 1
        }

        counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator

      }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
        axpy(1.0, sum2, sum1)
        (sum1, count1 + count2)
      }.mapValues { case (sum, count) =>
        scal(1.0 / count, sum)
        new VectorWithNorm(sum)
      }.collectAsMap()

      //将旧的中心点毁掉
      bcCenters.destroy(blocking = false)

      // 更新类的中心点和距离值,如果距离小于指定值那么收敛了,停止迭代,否则还要继续迭代
      converged = true
      newCenters.foreach { case (j, newCenter) =>
        if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) {
          converged = false
        }
        centers(j) = newCenter
      }

      cost = costAccum.value
      iteration += 1
    }

    //统计迭代的时间
    val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
    logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.")

    //统计是否达到最大迭代次数
    if (iteration == maxIterations) {
      logInfo(s"KMeans reached the max number of iterations: $maxIterations.")
    } else {
      logInfo(s"KMeans converged in $iteration iterations.")
    }

    //输出cost值
    logInfo(s"The cost is $cost.")

    //返回模型
    new KMeansModel(centers.map(_.vector))
  }

在上述循环中,一个核心的方法是计算当前数据点与各个中心点的距离,并返回所属的距离最小的中心点的索引,这个方法的参数有两个,一个是所有的中心点,一个是当前数据点,前者是VectorWithNorm集合,后者是单独的一个VectorWithNorm,返回两个值一个是中心点索引,一个距离。由于计算两个向量的距离需要循环向量中的元素,因此这里用了一个技巧,就是先用一步计算距离的最小值,然后与最优值比较,如果最小值比最优值小,才要计算距离,否则就放弃该点,直接循环,原理参考:https://www.datalearner.com/blog/1051532749735689

private[mllib] def findClosest(
      centers: TraversableOnce[VectorWithNorm],
      point: VectorWithNorm): (Int, Double) = {

    var bestDistance = Double.PositiveInfinity    //最优距离,冒泡排序用,初始化为最大值
    var bestIndex = 0    //最优中心点索引
    var i = 0    //中心点循环索引
    centers.foreach { center =>
      // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary
      // distance computation.
      //这里用了一个技巧,就是用lowerBoundOfSqDist表示数据点与当前中心点的距离的最小值,如果该值小于目前的最优值bestDistance才要计算距离,否则就继续循环,计算距离的成本要更高。
      var lowerBoundOfSqDist = center.norm - point.norm
      lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist
      if (lowerBoundOfSqDist < bestDistance) {
        val distance: Double = fastSquaredDistance(center, point)    //计算欧氏距离
        if (distance < bestDistance) {    //如果距离小于最优值,那么更新最优值
          bestDistance = distance
          bestIndex = i
        }
      }
      i += 1
    }
    (bestIndex, bestDistance)
  }
欢迎大家关注DataLearner官方微信,接受最新的AI技术推送