k-medoids聚類算法實現
原文:http://shiyanjun.cn/archives/1398.html
k-medoids聚類算法,即k-中心聚類算法,它是基于k-means聚類算法的改進。我們知道,k-means算法執行過程,首先需要隨機選擇初始質心,只有第一次隨機選擇的初始質心才是實際待聚類點集中的點,而后續將非質心點指派到對應的質心點后,重新計算得到的質心并非是待聚類點集中的點,而且如果某些非質心點是離群點的話,導致重新計算得到的質心可能偏離整個簇,為了解決這個問題,提出了改進的k-medoids聚類算法。
k-medoids聚類算法也是通過劃分的方式來計算得到聚類結果,它使用絕對差值和(Sum of Absolute Differences,SAD)的度量來衡量聚類結果的優劣,在n維歐幾里德空間中,計算SAD的公式如下所示:

圍繞中心點劃分(Partitioning Around Medoids,PAM)的方法是比較常用的,使用PAM方法進行處理,可以指定一個最大迭代次數的參數,在迭代過程中基于貪心策略來選擇使得聚類的質量最高的劃分。使用PAM的方法處理,每次交換一個中心點和非中心點,然后執行將非中心點指派到最近的中心點,計算得到的SAD值越小,則聚類質量越好,如此不斷地迭代,直到找到一個最好的劃分。
維基百科上給出的基于PAM方法計算聚類的過程,描述如下:
- 從待聚類的數據點集中隨機選擇k個點,作為初始中心點;
- 將待聚類的數據點集中的點,指派到最近的中心點;
- 進入迭代,直到聚類的質量滿足指定的閾值(可以通過計算SAD),使總代價減少:
- 對每一個中心點o,對每一個非中心點p,執行如下計算步驟:
- 交換點o和p,重新計算交換后的該劃分所生成的代價值;
- 如果本次交換造成代價增加,則取消交換。 </ol> </ol> </ol>
- 輸入待聚類點集,以及參數k、maxIterations、parallism;
- 同k-means算法一樣,隨機選擇初始中心點集合;
- 啟動parallism個線程,用來將非中心點指派給最近的中心點;
- 開始執行迭代,使得聚類結果對應的劃分的SAD值最小:
- 將非中心點,基于Round-Robin策略,分配給多個線程,并行指派:將非中心點指派給距離其最近的中心點;
- 將多個線程指派的局部結果進行合并,得到一個全局的指派結果;
- 根據指派結果計算SAD值:如果是第一次進行指派,直接計算其SAD值,保存在previousSAD變量中,該變量保存的是最小的SAD值,第一次初始化第一次指派結果計算得到的SAD值;如果不是第一次進行指派,也計算SAD值,將SAD值保存在變量currentSAD中,繼續執行步驟8;
- 隨機選擇一個非中心點;
- 創建一個ClusterHolder對象,該對象保存了該輪迭代指派結果,根據隨機選擇的非中心點修改ClusterHolder對象中的結果,將隨機選擇非中心點和對應的中心點進行交換,為下一輪指派過程準備數據;
- 最后,判斷是否達到指定的最大迭代次數,如果達到則終止計算,處理最終聚類結果,否則執行下一輪迭代計算,轉步驟5。 </ol>
- k:聚類最終想要得到的簇的個數
- maxIterations:因為k-medoids聚類算法的最終目標是最小化SAD的值,所以聚類算法執行迭代的次數越大,最終的結果可能越接近最優,如果是對一個不大的點集進行聚類,可以設置該參數的值大一些
- parallism:每一次迭代過程中,我們都需要將非中心點(Non-medoid Point)指派到最近的中心點,所以將原待聚類點集劃分成多組,有多個處理線程并行處理可能速度會更快,該參數就是并行度
- 并行將非中心點指派到最近的中心點
- 隨機選擇中心點和非中心點
- 創建ClusterHolder對象,交換非中心點和中心點
上面算法描述,應該是按順序的取遍中心點集合中的點,也從非中心點集合中取遍所有非中心點,分別計算生成的新劃分的代價。由于待聚類的點集可大可小,我們可以考慮,每次取點的時候,采用隨機取點的策略,隨機性越強越好,只要滿足最終迭代終止的條件即可。通常,如果能夠迭代所有情況,那么最終得到的劃分一定是最優的劃分,即聚類結果最好,這通常適用于聚類比較小的點的集合。但是如果待聚類的點的集合比較大,則需要通過限制迭代次數來終止迭代計算,從而得到一個能夠滿足實際精度需要的聚類結果。
我們在下面實現k-medoids聚類算法,分別隨機選擇中心點和非中心點,對他們進行交換,通過設置允許最大迭代次數(maxIterations)這個參數值,來使聚類計算最后停止。
聚類算法實現
首先,為了便于理解后面的代碼實現,我們描述一下代碼實現聚類過程的基本步驟,如下所示:
我們實現的k-medoids聚類算法,需要指定2個聚類相關參數,另外一個參數是程序計算并行度,可以通過構造方法看到,代碼如下所示:
public KMedoidsClustering(int k, int maxIterations, int parallism) {
super(k, maxIterations, parallism);
distanceCache = new DistanceCache(Integer.MAX_VALUE);
executorService = Executors.newCachedThreadPool(new NamedThreadFactory("SEEKER"));
latch = new CountDownLatch(parallism);
} 上面代碼中的參數含義如下:
聚類實現的核心代碼如下所示:
@Override
public void clustering() {
// parse sample files
FileUtils.read2DPointsFromFiles(allPoints, "[\t,;\\s]+", inputFiles); // 從文件讀取點數據,加入到集合allPoints中
LOG.info("Total points: count=" + allPoints.size());
ClusterHolder currentHolder = new ClusterHolder(); // 每一次迭代過程中的需要的數據結構都封裝到ClusterHolde對象中
ClusterHolder previousHolder = null;
currentHolder.medoids = initialCentroidsSelectionPolicy.select(k, allPoints); // 隨機選擇初始中心
LOG.info("Initial selected medoids: " + currentHolder.medoids);
// start seeker threads
for (int i = 0; i < parallism; i++) { // 啟動parallism個線程,執行非中心點到中心點的指派
final NearestMedoidSeeker seeker = new NearestMedoidSeeker(seekerQueueSize);
executorService.execute(seeker);
seekers.add(seeker);
}
// /////////////////
// make iterations
// /////////////////
boolean firstTimeToAssign = true;
int numIterations = 0;
double previousSAD = 0.0;
double currentSAD = 0.0;
try {
while(!finallyCompleted) {
try {
LOG.debug("Current medoid set: " + currentHolder.medoids);
if(firstTimeToAssign) {
// 第一次處理時,只是根據隨機選擇的初始中心集合,和全部點的集合,指派給多個線程處理
assignNearestMedoids(currentHolder, true);
firstTimeToAssign = false;
} else {
// 非第一次處理時,每次迭代得到的聚類結果,都是基于中心點進行分組的,處理邏輯稍微不同
assignNearestMedoids(currentHolder, false);
}
// merge result
mergeMedoidAssignedResult(currentHolder); // 每個線程處理一部分,最后要合并多個線程分別處理的結果
LOG.debug("Merged result: " + currentHolder.medoidWithNearestPointSet);
// compare cost for 2 iterations, we use SAD (sum of absolute differences)
if(previousSAD == 0.0) {
// first time compute SAD
previousSAD = currentSAD;
currentSAD = computeSAD(currentHolder); // 第一次計算SAD
} else {
RandomPoint randomPoint = selectNonCenterPointRandomly(currentHolder); // 隨機選擇一個非中心點
LOG.debug("Randomly selected: " + randomPoint);
// compute current cost when using random point to substitute for the medoid
currentSAD = computeSAD(currentHolder); // // 計算用隨機選擇非中心點替換一個中心點得到的SAD值
// compare SADs
if(currentSAD - previousSAD < 0.0) { // 如果此次迭代得到的SAD值,比上次迭代計算得到SAD小,替換previousHolder和previousSAD,以保證最終算法終止后,該最小值對應的劃分能夠保留下來
previousHolder = currentHolder;
previousSAD = currentSAD;
}
// construct new cluster holder
currentHolder = constructNewHolder(currentHolder, randomPoint); // 根據隨機選擇的中心點,創建一個新的 ClusterHolde對象,用于下次迭代
}
LOG.info("Iteration #" + (++numIterations) + ": previousSAD=" + previousSAD + ", currentSAD=" + currentSAD);
if(numIterations > maxIterations) { // 如果達到指定的最大迭代次數,則終止
finallyCompleted = true;
}
} catch(Exception e) {
Throwables.propagate(e);
} finally {
try {
if(!finallyCompleted) {
latch = new CountDownLatch(parallism);
completeToAssignTask = false;
}
Thread.sleep(10);
synchronized(signalLock) {
signalLock.notifyAll();
}
} catch (InterruptedException e) {}
}
}
} finally {
LOG.info("Shutdown executor service: " + executorService);
executorService.shutdown();
}
// finally result
centerPointSet.addAll(previousHolder.medoids); // 處理最終的聚類結果
Iterator<Entry<CenterPoint, List<Point2D>>> iter = previousHolder.medoidWithNearestPointSet.entrySet().iterator();
while(iter.hasNext()) {
Entry<CenterPoint, List<Point2D>> entry = iter.next();
int clusterId = entry.getKey().getId();
Set<ClusterPoint<Point2D>> set = Sets.newHashSet();
for(Point2D p : entry.getValue()) {
set.add(new ClusterPoint2D(p, clusterId));
}
clusteredPoints.put(clusterId, set);
}
} 通過上面代碼及其注釋,我們可以了解到聚類實現的基本處理流程。首先,看一下工具類ClusterHolder和RandomPoint:
private class ClusterHolder {
/** snapshot of clustering result: medoids of clustering result, as well as non-medoid points */
private TreeMap<CenterPoint, List<Point2D>> medoidWithNearestPointSet;
/** center point set represented by Point2D */
private Set<Point2D> centerPoints;
/** center point set represented by CenterPoint */
private TreeSet<CenterPoint> medoids;
public ClusterHolder() {
super();
}
}
private class RandomPoint {
/** medoid which the random point belongs to */
private final CenterPoint medoid; // 隨機選擇的中心點
/** a non-medoid point selected randomly */
private final Point2D point; // 隨機選擇的非中心點,該點被指派給上面的中心點medoid
public RandomPoint(CenterPoint medoid, Point2D point) {
super();
this.medoid = medoid;
this.point = point;
}
@Override
public String toString() {
return "RandomPoint[medoid=" + medoid + ", point=" + point + "]";
}
} 上面2個類,能夠在迭代處理過程中,方便地保存當前迭代處理的數據狀態。下面我們看一下,上面代碼調用的比較重要的方法的實現邏輯。
將非中心點指派到最近的中心點的計算,是調用assignNearestMedoids方法,該方法的代碼實現,如下所示:
private void assignNearestMedoids(final ClusterHolder holder, boolean firstTimeToAssign) {
LOG.debug("firstTimeToAssign=" + firstTimeToAssign);
try {
// assign tasks to seeker threads
if(firstTimeToAssign) { // 第一次進行指派,因為還沒有進行指派過,所以只有隨機選擇的一組中心點,和全部待聚類的點的集合
holder.centerPoints = Sets.newHashSet();
for(CenterPoint medoid : holder.medoids) {
holder.centerPoints.add(medoid.toPoint()); // 構造ClusterHolder對象,將中心點加入到集合中
}
LOG.debug("holder.centerPoints: " + holder.centerPoints);
for(Point2D p : allPoints) { // 對全部待聚類的點作為任務,加入到每個線程的隊列中,但是要排除已經被選擇為中心點的點
LOG.debug("Assign point: " + p);
if(!holder.centerPoints.contains(p)) {
selectSeeker().q.put(new Task(holder.medoids, p));
}
}
} else {
for(List<Point2D> points : holder.medoidWithNearestPointSet.values()) { // 如果筆試第一次進行指派,已經在構造ClusterHolder對象的時候,將隨機選擇的中心點和非中心點進行了交換,這里直接進行指派即可
for(Point2D p : points) {
selectSeeker().q.put(new Task(holder.medoids, p));
}
}
}
} catch(Exception e) {
Throwables.propagate(e);
} finally {
try {
completeToAssignTask = true;
latch.await();
} catch (InterruptedException e) { }
}
} 上面代碼調用selectSeeker()方法,獲取到一個NearestMedoidSeeker線程,將待指派的點加入到其隊列中,然后由該線程去異步循環處理。selectSeeker()方法實現代碼,如下所示:
private NearestMedoidSeeker selectSeeker() {
int index = taskIndex++ % parallism;
return seekers.get(index);
} 下面,我們看一下NearestMedoidSeeker線程的實現,它也比較簡單,實現了從隊列q中將非中心點取出,計算到該點最近的中心點,然后指派給該中心點,線程實現代碼如下所示:
private class NearestMedoidSeeker implements Runnable {
private final Log LOG = LogFactory.getLog(NearestMedoidSeeker.class);
private final BlockingQueue<Task> q;
private Map<CenterPoint, List<Point2D>> clusteringNearestPoints = Maps.newHashMap();
private int processedTasks = 0;
public NearestMedoidSeeker(int qsize) {
q = new LinkedBlockingQueue<Task>(qsize);
}
@Override
public void run() {
while(!finallyCompleted) { // 每一輪迭代,調用一次assign方法
try {
assign();
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void assign() throws InterruptedException {
try {
LOG.debug("Q size: " + q.size());
while(!(q.isEmpty() && completeToAssignTask)) {
processedTasks++;
final Task task = q.poll();
if(task != null) {
final Point2D p1 = task.point;
double minDistance = Double.MAX_VALUE;
CenterPoint nearestMedoid = null;
for(CenterPoint medoid : task.medoids) {
final Point2D p2 = medoid.toPoint();
Double distance = distanceCache.computeDistance(p1, p2); // 計算非中心點p1到中心點p2的歐幾里德距離
if(distance < minDistance) {
minDistance = distance;
nearestMedoid = medoid;
}
}
LOG.debug("Nearest medoid seeked: point=" + p1 + ", medoid=" + nearestMedoid);
List<Point2D> points = clusteringNearestPoints.get(nearestMedoid);
if(points == null) {
points = Lists.newArrayList();
clusteringNearestPoints.put(nearestMedoid, points);
}
points.add(p1); // 將非中心點p1,指派給到中心點的歐幾里德距離最近的點
} else {
Thread.sleep(150);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
LOG.debug("Point processed: processedTasks=" + processedTasks);
synchronized(signalLock) {
signalLock.wait();
}
clusteringNearestPoints = Maps.newHashMap();
processedTasks = 0;
}
}
} 每一輪指派,多個線程都計算得到一個非中心點指派到最近中心點的子集,最后還要將這些子集合并為一個全局的指派結果,即得到距離每個中心點最近的非中心點的集合,合并的實現在mergeMedoidAssignedResult()方法中,代碼如下所示:
private void mergeMedoidAssignedResult(ClusterHolder currentHolder) {
currentHolder.medoidWithNearestPointSet = Maps.newTreeMap();
for(NearestMedoidSeeker seeker : seekers) {
LOG.debug("seeker.clusteringNearestPoints: " + seeker.clusteringNearestPoints);
Iterator<Entry<CenterPoint, List<Point2D>>> iter = seeker.clusteringNearestPoints.entrySet().iterator();
while(iter.hasNext()) {
Entry<CenterPoint, List<Point2D>> entry = iter.next();
List<Point2D> set = currentHolder.medoidWithNearestPointSet.get(entry.getKey());
if(set == null) {
set = Lists.newArrayList();
currentHolder.medoidWithNearestPointSet.put(entry.getKey(), set);
}
set.addAll(entry.getValue());
}
}
} 合并后的指派結果,都存放在ClusterHolder對象中,為下一輪迭代準備了數據。
隨機選擇一個中心點和非中心點,實現代碼如下所示:
private RandomPoint selectNonCenterPointRandomly(ClusterHolder holder) {
List<CenterPoint> medoids = new ArrayList<CenterPoint>(holder.medoidWithNearestPointSet.keySet());
CenterPoint selectedMedoid = medoids.get(random.nextInt(medoids.size())); // 隨機選擇一個中心點
List<Point2D> belongingPoints = holder.medoidWithNearestPointSet.get(selectedMedoid);
Point2D point = belongingPoints.get(random.nextInt(belongingPoints.size())); // 隨機選擇一個非中心點
return new RandomPoint(selectedMedoid, point); // 返回這2個點
} 因為每一次迭代,我們都得到一個非中心點指派到最近的中心點的聚類結果集合,所以在設計隨機選擇中心點和非中心點進行交換時,我們首先從中心點集合中選擇一個中心點,然后再從該中心點對應的非中心點的簇的集合中選擇一個非中心點,當然也可以考慮用其他的方法,比如,在一次迭代過程中,待交換的中心點和非中心點不在同一個簇中。
我們處理的策略是,事后處理,也就是每次先實現非中心點和中心點的交換,再進行指派,計算SAD值,如果此輪迭代得到的SAD值比上一輪的大,則直接丟棄結果,將上一輪的指派結果作為最終候選結果,直到最后,保留著具有最小SAD值的指派結果。
交換中心點和非中心點,我們創建了一個ClusterHolder對象,然后在該對象所持有的集合上進行修改賈環。交換非中心點和中心點的實現代碼,如下所示:
private ClusterHolder constructNewHolder(final ClusterHolder holder, RandomPoint randomPoint) {
ClusterHolder newHolder = new ClusterHolder();
// collect center points with type Point2D for a holder object
// from previous result of clustering procedure
newHolder.centerPoints = Sets.newHashSet();
for(CenterPoint c : holder.medoidWithNearestPointSet.keySet()) {
newHolder.centerPoints.add(c.toPoint());
}
Point2D newPoint = randomPoint.point;
CenterPoint oldMedoid = randomPoint.medoid;
// create a new center point with type CenterPoint based on the randomly selected non-medoid point
// and it's id is equal to the old medoid's
CenterPoint newMedoid = new CenterPoint(oldMedoid.getId(), newPoint);
// use new medoid above to substitute the old medoid
newHolder.centerPoints.remove(oldMedoid.toPoint());
newHolder.centerPoints.add(newPoint);
newHolder.medoids = Sets.newTreeSet();
newHolder.medoids.addAll(holder.medoidWithNearestPointSet.keySet());
newHolder.medoids.remove(oldMedoid); // remove old medoid from center point set of new holder object
newHolder.medoids.add(newMedoid);
// copy the holder's medoidWithNearestPointSet, and modify it
newHolder.medoidWithNearestPointSet = Maps.newTreeMap();
newHolder.medoidWithNearestPointSet.putAll(holder.medoidWithNearestPointSet);
List<Point2D> oldPoints = newHolder.medoidWithNearestPointSet.get(oldMedoid);
oldPoints.remove(newPoint); // remove new randomly selected non-medoid point from previous result set of clustering
oldPoints.add(oldMedoid.toPoint()); // add old medoid point to the non-medoid set
newHolder.medoidWithNearestPointSet.put(newMedoid, oldPoints);
return newHolder;
} 為了保留上一次迭代指派的結果,這里不要修改holder對應的結果的集合(holder是本次迭代得到的聚類結果),而是拷貝出一份,在拷貝的結果上交換中心點和非中心點。
聚類效果對比
我們分別使用k-medoids算法與k-means算法對同一個點集進行聚類,分別對結果進行比較。其中,k-means算法由于隨機選擇初始質心,每次執行聚類結果不同;而k-medoids算法的聚類結果質量依賴于迭代次數,所以我們選擇不同的迭代次數:取值maxIterations分別為300、1000、3000時,對比效果,如下圖所示:

上圖中,第一排3個圖是k-means聚類得到的3個結果,第二排是k-medoids聚類得到的結果。通過上圖可以看出,使用k-medoids聚類算法,當maxIterations越大的時候,可能更加靠近最優解,聚類結果的質量越高,此時對應的質量的度量SAD的值就越小。
參考鏈接