public class SquaredEuclideanSilhouette
extends Object
The Silhouette for each point i
is defined as:
$$ s_{i} = \frac{b_{i}a_{i}}{max\{a_{i},b_{i}\}} $$
which can be rewritten as
$$ s_{i}= \begin{cases} 1\frac{a_{i}}{b_{i}} & \text{if } a_{i} \leq b_{i} \\ \frac{b_{i}}{a_{i}}1 & \text{if } a_{i} \gt b_{i} \end{cases} $$
where $a_{i}$
is the average dissimilarity of i
with all other data
within the same cluster, $b_{i}$
is the lowest average dissimilarity
of i
to any other cluster, of which i
is not a member.
$a_{i}$
can be interpreted as how well i
is assigned to its cluster
(the smaller the value, the better the assignment), while $b_{i}$
is
a measure of how well i
has not been assigned to its "neighboring cluster",
i.e. the nearest cluster to i
.
Unfortunately, the naive implementation of the algorithm requires to compute
the distance of each couple of points in the dataset. Since the computation of
the distance measure takes D
operations  if D
is the number of dimensions
of each point, the computational complexity of the algorithm is O(N^2^*D)
, where
N
is the cardinality of the dataset. Of course this is not scalable in N
,
which is the critical number in a Big Data context.
The algorithm which is implemented in this object, instead, is an efficient and parallel implementation of the Silhouette using the squared Euclidean distance measure.
With this assumption, the total distance of the point X
to the points $C_{i}$
belonging to the cluster $\Gamma$
is:
$$ \sum\limits_{i=1}^N d(X, C_{i} ) = \sum\limits_{i=1}^N \Big( \sum\limits_{j=1}^D (x_{j}c_{ij})^2 \Big) = \sum\limits_{i=1}^N \Big( \sum\limits_{j=1}^D x_{j}^2 + \sum\limits_{j=1}^D c_{ij}^2 2\sum\limits_{j=1}^D x_{j}c_{ij} \Big) = \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}^2 + \sum\limits_{i=1}^N \sum\limits_{j=1}^D c_{ij}^2 2 \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}c_{ij} $$
where $x_{j}$
is the j
th dimension of the point X
and
$c_{ij}$
is the j
th dimension of the i
th point in cluster $\Gamma$
.
Then, the first term of the equation can be rewritten as:
$$ \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}^2 = N \xi_{X} \text{ , with } \xi_{X} = \sum\limits_{j=1}^D x_{j}^2 $$
where $\xi_{X}$
is fixed for each point and it can be precomputed.
Moreover, the second term is fixed for each cluster too,
thus we can name it $\Psi_{\Gamma}$
$$ \sum\limits_{i=1}^N \sum\limits_{j=1}^D c_{ij}^2 = \sum\limits_{i=1}^N \xi_{C_{i}} = \Psi_{\Gamma} $$
Last, the third element becomes
$$ \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}c_{ij} = \sum\limits_{j=1}^D \Big(\sum\limits_{i=1}^N c_{ij} \Big) x_{j} $$
thus defining the vector
$$ Y_{\Gamma}:Y_{\Gamma j} = \sum\limits_{i=1}^N c_{ij} , j=0, ..., D $$
which is fixed for each cluster $\Gamma$
, we have
$$ \sum\limits_{j=1}^D \Big(\sum\limits_{i=1}^N c_{ij} \Big) x_{j} = \sum\limits_{j=1}^D Y_{\Gamma j} x_{j} $$
In this way, the previous equation becomes
$$ N\xi_{X} + \Psi_{\Gamma}  2 \sum\limits_{j=1}^D Y_{\Gamma j} x_{j} $$
and the average distance of a point to a cluster can be computed as
$$ \frac{\sum\limits_{i=1}^N d(X, C_{i} )}{N} = \frac{N\xi_{X} + \Psi_{\Gamma}  2 \sum\limits_{j=1}^D Y_{\Gamma j} x_{j}}{N} = \xi_{X} + \frac{\Psi_{\Gamma} }{N}  2 \frac{\sum\limits_{j=1}^D Y_{\Gamma j} x_{j}}{N} $$
Thus, it is enough to precompute: the constant $\xi_{X}$
for each point X
; the
constants $\Psi_{\Gamma}$
, N
and the vector $Y_{\Gamma}$
for
each cluster $\Gamma$
.
In the implementation, the precomputed values for the clusters are distributed among the worker nodes via broadcasted variables, because we can assume that the clusters are limited in number and anyway they are much fewer than the points.
The main strengths of this algorithm are the low computational complexity
and the intrinsic parallelism. The precomputed information for each point
and for each cluster can be computed with a computational complexity
which is O(N/W)
, where N
is the number of points in the dataset and
W
is the number of worker nodes. After that, every point can be
analyzed independently of the others.
For every point we need to compute the average distance to all the clusters.
Since the formula above requires O(D)
operations, this phase has a
computational complexity which is O(C*D*N/W)
where C
is the number of
clusters (which we assume quite low), D
is the number of dimensions,
N
is the number of points in the dataset and W
is the number
of worker nodes.
Modifier and Type  Class and Description 

static class 
SquaredEuclideanSilhouette.ClusterStats 
static class 
SquaredEuclideanSilhouette.ClusterStats$ 
Constructor and Description 

SquaredEuclideanSilhouette() 
Modifier and Type  Method and Description 

static scala.collection.immutable.Map<Object,SquaredEuclideanSilhouette.ClusterStats> 
computeClusterStats(Dataset<Row> df,
String predictionCol,
String featuresCol,
String weightCol)
The method takes the input dataset and computes the aggregated values
about a cluster which are needed by the algorithm.

static double 
computeSilhouetteCoefficient(Broadcast<scala.collection.immutable.Map<Object,SquaredEuclideanSilhouette.ClusterStats>> broadcastedClustersMap,
Vector point,
double clusterId,
double weight,
double squaredNorm)
It computes the Silhouette coefficient for a point.

static double 
computeSilhouetteScore(Dataset<?> dataset,
String predictionCol,
String featuresCol,
String weightCol)
Compute the Silhouette score of the dataset using squared Euclidean distance measure.

static double 
overallScore(Dataset<Row> df,
Column scoreColumn,
Column weightColumn) 
static double 
pointSilhouetteCoefficient(scala.collection.immutable.Set<Object> clusterIds,
double pointClusterId,
double weightSum,
double weight,
scala.Function1<Object,Object> averageDistanceToCluster) 
static void 
registerKryoClasses(SparkContext sc)
This method registers the class
SquaredEuclideanSilhouette.ClusterStats
for kryo serialization. 
public static void registerKryoClasses(SparkContext sc)
SquaredEuclideanSilhouette.ClusterStats
for kryo serialization.
sc
 SparkContext
to be usedpublic static scala.collection.immutable.Map<Object,SquaredEuclideanSilhouette.ClusterStats> computeClusterStats(Dataset<Row> df, String predictionCol, String featuresCol, String weightCol)
df
 The DataFrame which contains the input datapredictionCol
 The name of the column which contains the predicted cluster id
for the point.featuresCol
 The name of the column which contains the feature vector of the point.weightCol
 The name of the column which contains the instance weight.Map
which associates each cluster id
to a SquaredEuclideanSilhouette.ClusterStats
object (which contains the precomputed values N
,
$\Psi_{\Gamma}$
and $Y_{\Gamma}$
for a cluster).public static double computeSilhouetteCoefficient(Broadcast<scala.collection.immutable.Map<Object,SquaredEuclideanSilhouette.ClusterStats>> broadcastedClustersMap, Vector point, double clusterId, double weight, double squaredNorm)
broadcastedClustersMap
 A map of the precomputed values for each cluster.point
 The Vector
representing the current point.clusterId
 The id of the cluster the current point belongs to.weight
 The instance weight of the current point.squaredNorm
 The $\Xi_{X}$
(which is the squared norm) precomputed for the point.public static double computeSilhouetteScore(Dataset<?> dataset, String predictionCol, String featuresCol, String weightCol)
dataset
 The input dataset (previously clustered) on which compute the Silhouette.predictionCol
 The name of the column which contains the predicted cluster id
for the point.featuresCol
 The name of the column which contains the feature vector of the point.weightCol
 The name of the column which contains instance weight.public static double pointSilhouetteCoefficient(scala.collection.immutable.Set<Object> clusterIds, double pointClusterId, double weightSum, double weight, scala.Function1<Object,Object> averageDistanceToCluster)