Spark K-Means Clustering

Clustering

Clustering is one of the best-known unsupervised learning algorithms. It tries to find natural groupings in data: a set of data points that are similar to one another yet different from the rest usually represents something meaningful, and a clustering algorithm assigns such similar points to the same cluster.

K-Means Clustering

K-means tries to find k clusters in a dataset; the key question is how to choose a suitable value of k. The algorithm usually measures the distance between data points with the Euclidean distance, which requires all features to be numeric. The center of a cluster is called its centroid and is the arithmetic mean of all the points in that cluster. The algorithm starts by picking some data points as the initial centroids, then assigns every data point to its nearest centroid. Next, for each cluster it computes the mean of all points in the cluster and makes that mean the cluster's new centroid, and it repeats this process until the centroids stop moving.
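A minimal sketch of one assign-and-update iteration on plain in-memory Scala collections (illustrative only, not Spark code; the names points and centroids are hypothetical, and clusters that end up empty are simply dropped):

// Squared Euclidean distance between two feature vectors
def squaredDist(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

// One K-means iteration: assign each point to its nearest centroid,
// then recompute every centroid as the arithmetic mean of its points.
def step(points: Seq[Array[Double]], centroids: Seq[Array[Double]]): Seq[Array[Double]] =
  points
    .groupBy(p => centroids.indices.minBy(i => squaredDist(p, centroids(i))))
    .values
    .map(cluster => cluster.map(_.toSeq).transpose.map(_.sum / cluster.size).toArray)
    .toSeq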

A Worked Example

Preparing the Dataset

The KDD Cup 1999 dataset (link)

0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.
Each record contains 41 comma-separated features (38 of them numeric), with the class label as the last field.

Counting Samples per Label

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local").setAppName("K-MEANS")
val sc = new SparkContext(conf)
val rawData = sc.textFile("E:\\mllib\\kddcup.data\\kddcup.data_10_percent_corrected")
// Count records per label; sortBy sorts ascending by default, so reverse gives descending order
val data = rawData.map { _.split(",").last }.countByValue().toSeq.sortBy(_._2).reverse
Output:
(smurf.,2807886)
(neptune.,1072017)
(normal.,972781)
(satan.,15892)
(ipsweep.,12481)
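As a quick sanity check on the record format described above (an illustrative snippet using the rawData RDD defined earlier; not in the original):

// Each line should split into 42 fields: 41 features plus the trailing label
val numFields = rawData.first().split(",").length // expected: 42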

Processing the Data

The K-means algorithm requires numeric features, so to start we simply ignore the non-numeric columns:
1. Remove the three categorical columns starting at index 1, along with the final label column
2. Keep the remaining values and convert them into an array of doubles
3. Pair the array with its label in a tuple

import org.apache.spark.mllib.linalg.Vectors

val labelsAndData = rawData.map { line =>
  val buffer = line.split(",").toBuffer
  buffer.remove(1, 3)                          // drop the three categorical columns
  val label = buffer.remove(buffer.length - 1) // pull off the trailing label
  val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
  (label, vector)
}

Caching the Data

// The vectors are reused across several clustering runs below, so cache them
val new_data = labelsAndData.values.cache()

Clustering with Spark

import org.apache.spark.mllib.clustering.KMeans

val kmeans = new KMeans() // k defaults to 2
val model = kmeans.run(new_data)
model.clusterCenters.foreach(println)

Output:
[47.979395571029514,1622.078830816566,868.5341828266062,4.453261001578883E-5,0.006432937937735314,1.4169466823205539E-5,0.03451682118132869,1.5181571596291647E-4,0.14824703453301485,0.01021213716043885,1.1133152503947209E-4,3.6435771831099954E-5,0.011351767134933808,0.0010829521072021374,1.0930731549329986E-4,0.0010080563539937655,0.0,0.0,0.0013865835391279706,332.2862475203433,292.9071434354884,0.1766854175944295,0.1766078094004292,0.05743309987449898,0.05771839196793656,0.7915488441762849,0.020981640419416685,0.028996862475203982,232.4707319541719,188.6660459090725,0.7537812031901855,0.030905611108874582,0.6019355289259479,0.0066835148374550625,0.17675395732965873,0.17644162179668482,0.05811762681672762,0.05741111695882669]
[2.0,6.9337564E8,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,57.0,3.0,0.79,0.67,0.21,0.33,0.05,0.39,0.0,255.0,3.0,0.01,0.09,0.22,0.0,0.18,0.67,0.05,0.33]

The program prints two vectors because K-means clustered the data into two clusters (MLlib's KMeans defaults to k = 2).
In K-means, each cluster is really represented by a single point, the centroid of all the points assigned to it, and each data point is simply the vector of its numeric features.

Making the Output Readable

To see which labels fall into which clusters, assign each data point to a cluster, count the occurrences of every (cluster, label) pair, and print the counts in a readable form.

val clusterLabelCount = labelsAndData.map { case (label, datum) =>
  val cluster = model.predict(datum)
  (cluster, label)
}.countByValue()

clusterLabelCount.toSeq.sorted.foreach { case ((cluster, label), count) =>
  println(cluster + " " + label + " " + count)
}


Output:
0 back. 2203
0 buffer_overflow. 30
0 ftp_write. 8
0 guess_passwd. 53
0 imap. 12
0 ipsweep. 1247
0 land. 21
0 loadmodule. 9
0 multihop. 7
0 neptune. 107201
0 nmap. 231
0 normal. 97278
0 perl. 3
0 phf. 4
0 pod. 264
0 portsweep. 1039
0 rootkit. 10
0 satan. 1589
0 smurf. 280790
0 spy. 2
0 teardrop. 979
0 warezclient. 1020
0 warezmaster. 20
1 portsweep. 1

Choosing a Good Value of k

The output above assigns essentially everything to cluster 0 (only a single portsweep connection fell into cluster 1), so k = 2 is clearly too small. To evaluate candidate values of k, define a Euclidean distance function and a function that computes the distance from a data point to its nearest cluster centroid:

import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Euclidean distance between two vectors
def distance(a: Vector, b: Vector) =
  math.sqrt(a.toArray.zip(b.toArray).map(p => p._1 - p._2).map(d => d * d).sum)

// Distance from a data point to the centroid of its nearest cluster
def distToCentroid(datum: Vector, model: KMeansModel) = {
  val cluster = model.predict(datum)
  val centroid = model.clusterCenters(cluster) // centroid of the nearest cluster
  distance(centroid, datum)
}

// Mean distance to the nearest centroid, used as the clustering score for a given k
def clusteringScore(data: RDD[Vector], k: Int) = {
  val kmeans = new KMeans()
  kmeans.setK(k)
  val model = kmeans.run(data)
  data.map(datum => distToCentroid(datum, model)).mean()
}

Evaluate a range of values of k:

(5 to 40 by 5).map(k => (k, clusteringScore(new_data, k))).foreach(println)
Output:
(5,1779.3473960726278)
(10,1045.7698346483824)
(15,888.4541349979995)
(20,587.4098823876957)
(25,691.0673141997573)
(30,670.5355237087188)
(35,294.8166774400612)
(40,278.0287120232854)

Note that the score for k = 25 is worse than for k = 20: K-means starts from randomly chosen centroids and may converge to a local optimum, so a larger k does not guarantee a better score. Two settings help stabilize the results:

kmeans.setRuns(...) // number of times to run the clustering for a given k, keeping the best result
kmeans.setEpsilon(...) // minimum centroid movement below which the iteration is considered converged
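A sketch of a scoring function that applies both settings (the name clusteringScore2 and the particular values 10 and 1.0e-6 are illustrative choices, not from the original):

def clusteringScore2(data: RDD[Vector], k: Int) = {
  val kmeans = new KMeans()
  kmeans.setK(k)
  kmeans.setRuns(10)        // run the clustering 10 times per k and keep the best result
  kmeans.setEpsilon(1.0e-6) // iterate until centroids move less than this threshold
  val model = kmeans.run(data)
  data.map(datum => distToCentroid(datum, model)).mean()
}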

Standardizing the Features

Standardize each feature by subtracting its mean and dividing by its standard deviation: normalized = (value - mean) / stdev.

val dataAsArray = new_data.map { _.toArray }
val numCols = dataAsArray.first().length // number of features
val n = dataAsArray.count()              // total number of records
val sums = dataAsArray.reduce((a, b) => a.zip(b).map(t => t._1 + t._2)) // element-wise sums
val means = sums.map { _ / n }           // per-feature means

// Element-wise sums of squares, accumulated into an array
val sumSquares = dataAsArray.fold(new Array[Double](numCols))(
  (a, b) => a.zip(b).map(t => t._1 + t._2 * t._2))

// Per-feature (population) standard deviations
val stdevs = sumSquares.zip(sums).map { case (sumSq, sum) =>
  math.sqrt(n * sumSq - sum * sum) / n
}

// Standardization function: (value - mean) / stdev, guarding against zero variance
def normalize(data: Vector) = {
  val normalizedArray = (data.toArray, means, stdevs).zipped.map(
    (value, mean, stdev) =>
      if (stdev <= 0) (value - mean) else (value - mean) / stdev
  )
  Vectors.dense(normalizedArray)
}


val normalizedData = new_data.map(normalize).cache() // standardize the data

// Evaluate larger values of k on the standardized data (.par runs the evaluations in parallel)
(60 to 120 by 10).par.map(k => (k, clusteringScore(normalizedData, k))).toList.foreach(println)
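For comparison, MLlib also ships a StandardScaler that performs essentially the same mean/variance scaling in one step. A minimal sketch (the variable name normalizedData2 is illustrative, and StandardScaler's handling of zero-variance columns differs slightly from the manual guard above):

import org.apache.spark.mllib.feature.StandardScaler

// withMean subtracts the per-feature mean; withStd divides by the standard deviation
val scaler = new StandardScaler(withMean = true, withStd = true).fit(new_data)
val normalizedData2 = scaler.transform(new_data).cache()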