Homework 03: K-means and Principle Component Analysis#
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns; sns.set()
import numpy as np
import pandas as pd
import os.path
import subprocess
from sklearn import cluster, decomposition
Helpers for Getting, Loading and Locating Data
def wget_data(url: str):
local_path = './tmp_data'
p = subprocess.Popen(["wget", "-nc", "-P", local_path, url], stderr=subprocess.PIPE, encoding='UTF-8')
rc = None
while rc is None:
line = p.stderr.readline().strip('\n')
if len(line) > 0:
print(line)
rc = p.poll()
def locate_data(name, check_exists=True):
local_path='./tmp_data'
path = os.path.join(local_path, name)
if check_exists and not os.path.exists(path):
raise RuxntimeError('No such data file: {}'.format(path))
return path
wget_data('https://raw.githubusercontent.com/illinois-ipaml/MachineLearningForPhysics/main/data/spectra_data.hf5')
spectra_data = pd.read_hdf(locate_data('spectra_data.hf5'))
Problem 1#
The PCA method of dimensionality reduction first calculates an exact linear decomposition (up to round off error), then trims rows and columns to the desired number of latent variables. In this problem, you will explore how PCA is implemented. The tricky linear algebra is already implemented in numpy.linalg, but it is still a challenge to keep all the notation and conventions self consistent.
The input data \(X\) is provided as an \(N\times D\) (samples, features) matrix. In the following we assume that each feature is centered on zero (otherwise, calculate and subtract the \(\mu_j\), then add them back later), $\( \mu_j = \sum_i X_{ij} = 0 \;. \)$ There are three equivalent methods for performing the initial exact decomposition:
Calculate the \(D\times D\) sample covariance matrix $\( C \equiv \frac{1}{N-1}\,X^T X \;. \)\( then find its eigenvalue decomposition: \)\( C = Q^T \Lambda Q \)\( where \)\Lambda\( is a diagonal \)D\times D\( matrix of eigenvalues and the rows of the orthogonal \)D\times D\( matrix \)Q$ are the corresponding eigenvectors.
Calculate the \(N\times N\) matrix of dot products between samples: $\( D \equiv \frac{1}{N-1}\,X X^T \;, \)\( then find its eigenvalue decomposition, where now \)Q\( and \)\Lambda\( are \)N\times N$ matrices.
Find the singular value decomposition (SVD) of \(X\) $\( X = U S V \quad \Rightarrow \quad C = \frac{1}{N-1}\,V^T S^2 V \;, \)\( where \)S\( is a diagonal \)K\times K\( matrix of *singular values*, \)U\( is \)N\times K\( and \)V\( is \)K\times D\(, with \)K = \min(N, D)$. The notation above is chosen to match np.linalg.svd which you will use below.
The computational cost of each method depends differently on the values of \(N\) and \(D\), so the most efficient method will depend on the shape of the input data \(X\). There are also numerical considerations: the matrices \(C\) and \(D\) should be positive definite in order to guarantee positive eigenvalues, but this will not be true for \(C\) if \(N < D\) or for \(D\) if \(N > D\).
Implement the function below to calculate the eigenvectors and eigenvalues of a square input matrix using a suitable function from np.linalg. The results should be sorted in order of decreasing eigenvalues, as needed by PCA. Hint: M[::-1]
reverses the rows of a 2D array M
(more info here).
def eigensolve(M):
"""Calculate eigenvectors and eigenvalues of a square symmetric matrix.
Results are sorted by decreasing eigenvalue. The rows (not columns) of the
returned eigenvector array are the normalized eigenvectors of M.
Parameters
----------
M : 2D array
N x N symmetric square matrix to use.
Returns
-------
tuple
Tuple of arrays (eigenvalues, eigenvectors) with eigenvalues decreasing and
eigenvector[i] corresponding to eigenvalue[i]. Eigenvalues should have the
shape (N,) and eigenvectors should have the shape (N, N).
"""
assert len(M.shape) == 2
nrows, ncols = M.shape
assert nrows == ncols
assert np.all(M.T == M)
# YOUR CODE HERE
raise NotImplementedError()
#A function to check work
def checkEigens(evals: np.ndarray, evecs: np.ndarray, covariance: np.ndarray, knownEvals: np.ndarray, knownEvecs: np.ndarray):
assert np.allclose(covariance, evecs.T.dot(np.diag(evals).dot(evecs)))
assert np.allclose(
np.round(evals, 5),
knownEvals)
assert np.allclose(
np.round(np.abs(evecs), 3),
np.abs(knownEvecs)
)
#Accounting for direction
for calcRow, knownRow in zip(np.round(evecs, 3), knownEvecs):
assert np.allclose(calcRow, knownRow) or np.allclose(-1 * calcRow, knownRow)
# A correct solution should pass the tests below.
C = np.diag(np.arange(1., 5.))
evals, evecs = eigensolve(C)
checkEigens(evals, evecs, C,
knownEvals=[4, 3, 2, 1],
knownEvecs=[[ 0., 0., 0., 1.],
[ 0., 0., 1., 0.],
[ 0., 1., 0., 0.],
[ 1., 0., 0., 0.]]
)
gen = np.random.RandomState(seed=123)
N, D = 4, 3
X = gen.uniform(size=(N, D))
X -= np.mean(X, axis=0)
C = np.dot(X.T, X) / (N - 1)
evals, evecs = eigensolve(C)
checkEigens(evals, evecs, C,
knownEvals=[ 0.08825, 0.0481 , 0.01983],
knownEvecs=[[-0.787, -0.477, 0.391],
[-0.117, 0.737, 0.665],
[-0.606, 0.478, -0.636]]
)
Implement the function below to calculate the same quantities using the SVD method directly on \(X\) instead of solving the eigenvalue problem for the sample covariance. Hint: pay attention to the full_matrices
parameter.
def svdsolve(X):
"""Calculate eigenvectors and eigenvalues of the sample covariance of X.
Results are sorted by decreasing eigenvalue. The rows (not columns) of the
returned eigenvector array are the normalized eigenvectors of M.
Uses the SVD method directly on X.
Parameters
----------
X: 2D array
N x D matrix to use.
Returns
-------
tuple
Tuple of arrays (eigenvalues, eigenvectors) with eigenvalues decreasing and
eigenvector[i] corresponding to eigenvalue[i]. Eigenvalues should have the
shape (K,) and eigenvectors should have the shape (K, D) with K=min(N, D).
"""
assert len(X.shape) == 2
N, D = X.shape
# YOUR CODE HERE
raise NotImplementedError()
# A correct solution should pass the tests below.
gen = np.random.RandomState(seed=123)
N, D = 4, 3
X = gen.uniform(size=(N, D))
X -= np.mean(X, axis=0)
evals, evecs = svdsolve(X)
C = np.dot(X.T, X) / (N - 1)
checkEigens(evals, evecs, C,
knownEvals= [ 0.08825, 0.0481 , 0.01983],
knownEvecs= [[-0.787, -0.477, 0.391],
[ 0.117, -0.737, -0.665],
[-0.606, 0.478, -0.636]]
)
N, D = 3, 4
X = gen.uniform(size=(N, D))
X -= np.mean(X, axis=0)
evals, evecs = svdsolve(X)
C = np.dot(X.T, X) / (N - 1)
checkEigens(evals, evecs, C,
knownEvals= [ 0.23688, 0.03412, 0. ],
knownEvecs= [[ 0.368, 0.874, 0.316, -0.041],
[-0.752, 0.178, 0.313, -0.553],
[-0.516, 0.422, -0.496, 0.556]]
)
Note that the eigenvectors found by the two methods might differ by an overall sign, but both sets of eigenvectors are orthonormal, so equally valid.
The following simple driver code shows how to build a PCA fit from your functions (but the sklearn driver has a lot more options):
def PCA_fit(data, n_components=2):
X = data.values
N, D = X.shape
print('N,D = {},{}'.format(N, D))
K = min(N, D)
assert n_components <= K
# Subtract mean value of each feature.
mu = np.mean(X, axis=0)
Xc = X - mu
# Select the method based on N, D.
if N > 2 * D:
print('Using method 1')
evals, M = eigensolve(np.dot(Xc.T, Xc) / (N - 1))
assert evals.shape == (D,) and M.shape == (D, D)
elif D > 2 * N:
print('Using method 2')
evals, M = eigensolve(np.dot(Xc, Xc.T) / (N - 1))
assert evals.shape == (N,) and M.shape == (N, N)
# Eigenvectors are now M = U.T of the SVD. Convert to M = V.
# Use abs(evals) since smallest values might be < 0 due to numerical errors.
M = np.dot(np.dot(np.diag(np.abs(evals) ** -0.5), M), Xc) / np.sqrt(N - 1)
else:
print('Using method 3')
evals, M = svdsolve(Xc)
assert evals.shape == (K,) and M.shape == (K, D)
# Calculate Y such that Xc = Y M.
Y = np.dot(Xc, M.T)
# Trim to latent variable subspace.
Y = Y[:, :n_components]
M = M[:n_components]
# Calculate reconstructed samples.
Xr = np.dot(Y, M) + mu
# Plot some samples and their reconstructions.
for i,c in zip((0, 6, 7), sns.color_palette()):
plt.plot(X[i], '.', c=c, ms=5)
plt.plot(Xr[i], '-', c=c)
plt.show()
Test this driver in each regime by varying the number of features used from spectra_data
with \(N\), \(D\) = 200, 500:
\(N \gg D\): method 1
\(N \ll D\): method 2
\(N \simeq D\): method 3
PCA_fit(spectra_data.iloc[:, 190:230], n_components=2)
PCA_fit(spectra_data, n_components=2)
PCA_fit(spectra_data.iloc[:, 120:320], n_components=2)
Problem 2#
Implement the function below to compare clusters found in a full dataset with those found in a reduced dataset (lower-dimension latent space). Use KMeans for clustering and PCA to obtained the reduced dataset.
def compare_clusters(data, n_clusters, n_components, seed=123):
"""Compare clusters in the full vs reduced feature space.
Parameters
----------
data : pandas DataFrame
Dataset to analyze of shape (N, D).
n_clusters : int
Number of clusters to find using KMeans.
n_components : int
Number of dimensions of the reduced latent variable space
to calculate using PCA.
seed : int
Random number seed used for reproducible KMeans and PCA.
Returns
-------
tuple
Tuple (labels1, labels2) of 1D integer arrays of length N,
with values 0,1,...,(n_clusters-1).
"""
gen = np.random.RandomState(seed=seed)
# YOUR CODE HERE
raise NotImplementedError()
Use the code below to test your function and comment (in the last markdown cell) on how the full vs reduced clusters compare and whether there is an advantage to clustering in reduced dimensions.
labels1, labels2 = compare_clusters(spectra_data, 4, 2)
fig, ax = plt.subplots(4, 2, figsize=(8, 12))
for i in range(4):
sel1 = np.where(labels1 == i)[0]
sel2 = np.where(labels2 == i)[0]
for j in range(4):
ax[i, 0].plot(spectra_data.iloc[sel1[j]], 'r.', ms=5)
ax[i, 1].plot(spectra_data.iloc[sel2[j]], 'b.', ms=5)
Analyzing your results above, comment in the empty markdown cell below on how the full vs reduced clusters compare and whether there is any potential advantage to clustering in reduced dimensions.
Acknowledgments#
Initial version: Mark Neubauer
© Copyright 2024