{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "# Code source: Sebastian Curi and Andreas Krause, based on Jaques Grobler (sklearn demos).\n", "# License: BSD 3 clause\n", "\n", "# We start importing some modules and running some magic commands\n", "%matplotlib inline\n", "%reload_ext autoreload\n", "%load_ext autoreload\n", "%autoreload 2\n", "\n", "# General math and plotting modules.\n", "import numpy as np\n", "import matplotlib as mpl\n", "import matplotlib.pyplot as plt\n", "from scipy.special import erfinv\n", "from scipy import linalg\n", "\n", "# Project files.\n", "from utilities.load_data import polynomial_data, linear_separable_data\n", "from utilities import plot_helpers\n", "from utilities.widgets import noise_widget, n_components_widget, min_prob_widget\n", "\n", "# Widget and formatting modules\n", "import IPython\n", "import ipywidgets\n", "from ipywidgets import interact, interactive, interact_manual, fixed\n", "from matplotlib import rcParams\n", "import matplotlib as mpl \n", "from scipy.stats import multivariate_normal, norm\n", "# If in your browser the figures are not nicely vizualized, change the following line. \n", "rcParams['figure.figsize'] = (10, 5)\n", "rcParams['font.size'] = 16\n", "\n", "# Machine Learning library. \n", "from sklearn.mixture import GaussianMixture\n", "from sklearn.cluster import KMeans\n", "from sklearn import datasets\n", "\n", "import warnings\n", "warnings.filterwarnings(\"ignore\")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 1-D Gaussian Mixture Models" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from ipywidgets import GridspecLayout, jslink\n", "from collections import OrderedDict\n", "grid = GridspecLayout(3, 3)\n", "rcParams['figure.figsize'] = (20, 5)\n", "rcParams['font.size'] = 16\n", "X = np.linspace(-5, 5, 1000)\n", "\n", "n_components = 3\n", "\n", "\n", "# def GMMs(mean1, mean2, mean3, scale1, scale2, scale3, weight1, weight2, weight3):\n", "# means, scales, weights = [mean1, mean2, mean3], [scale1, scale2, scale3], [weight1, weight2, weight3]\n", "def GMM(mean1, mean2, mean3, scale1, scale2, scale3, weight1, weight2, weight3):\n", " means = [mean1, mean2, mean3]\n", " scales = [scale1, scale2, scale3]\n", " weights = [weight1, weight2, weight3]\n", " gaussians = [norm(mean, scale) for mean, scale in zip(means, scales)]\n", " scale = sum(weights)\n", " fig, axes = plt.subplots(1, 2)\n", " y = np.zeros_like(X)\n", " for i, (weight, gaussian) in enumerate(zip(weights, gaussians)):\n", " axes[0].plot(X, gaussian.pdf(X), label=f\"{i}-th gaussian\")\n", " y += weight * gaussian.pdf(X) / scale\n", " axes[0].legend()\n", " axes[0].set_title('Independent Gaussians')\n", "\n", " axes[1].plot(X, y)\n", " axes[1].set_title('Gaussian Mixture Model')\n", " plt.show()\n", "# return fig \n", "\n", "means = OrderedDict()\n", "scales = OrderedDict()\n", "weights = OrderedDict()\n", "\n", "means['mean1'] = ipywidgets.FloatSlider(value=-3, min=-4, max=4, step=0.01, continuous_update=False,\n", " description='Means')\n", "means['mean2'] = ipywidgets.FloatSlider(value=0, min=-4, max=4, step=0.01, continuous_update=False)\n", "means['mean3'] = ipywidgets.FloatSlider(value=2, min=-4, max=4, step=0.01, continuous_update=False)\n", "scales['scale1'] = ipywidgets.FloatSlider(value=0.5, min=0.1, max=2, step=0.01, continuous_update=False,\n", " description='Scales')\n", "scales['scale2'] = ipywidgets.FloatSlider(value=1, min=0.1, max=2, step=0.01, continuous_update=False)\n", "scales['scale3'] = ipywidgets.FloatSlider(value=0.1, min=0.1, max=2, step=0.01, continuous_update=False)\n", "weights['weight1'] = ipywidgets.FloatSlider(value=0.33, min=0, max=1, step=0.01, continuous_update=False,\n", " description='Weights')\n", "weights['weight2'] = ipywidgets.FloatSlider(value=0.33, min=0, max=1, step=0.01, continuous_update=False)\n", "weights['weight3'] = ipywidgets.FloatSlider(value=0.33, min=0, max=1, step=0.01, continuous_update=False)\n", " \n", "\n", "meanw = ipywidgets.HBox(tuple(means.values()))\n", "scalew = ipywidgets.HBox(tuple(scales.values()))\n", "weightw = ipywidgets.HBox(tuple(weights.values()))\n", "\n", "params = dict(means)\n", "params.update(dict(scales))\n", "params.update(dict(weights))\n", "out = ipywidgets.interactive_output(GMM, params)\n", "\n", "display(meanw, scalew, weightw, out)\n", "# dict(means).update" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# EM Algorithm" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def weighted_average(data, weights):\n", " \"\"\"Given data and weights calculate weighted average.\n", " \n", " Parameters\n", " ----------\n", " data: [num_points x dimension] or [num_points x dimension]\n", " weights: [num_points x num_components], non-negative\n", " \n", " Returns\n", " -------\n", " weighted_average: [num_components x dimension] or [num_components x dimension x dimension]\n", " \"\"\"\n", " \n", " assert np.all(weights >= 0)\n", " w = weights / weights.sum(axis=0) \n", " return w.T @ data \n", " \n", "\n", "class EMGMM(object):\n", " def __init__(self, X, num_components, kind='soft', initialization='random', nu=1e-6):\n", " self.X = X \n", " self.dim = X.shape[1]\n", " self.num_points = X.shape[0]\n", " self.num_components = num_components\n", " self.kind = kind \n", " self.nu = nu \n", " self.initialization = initialization\n", " \n", " self.restart()\n", " \n", " def init_means(self):\n", " if self.initialization == 'random':\n", " self.means = np.random.uniform(\n", " low=self.X.min(axis=0), high=(self.X.max(axis=0)), \n", " size=(self.num_components, self.dim))\n", " elif self.initialization == 'random from data':\n", " idx = np.random.choice(self.num_points, self.num_components, replace=False)\n", " self.means = self.X[idx]\n", " elif self.initialization == 'kmeans++':\n", " self.means = np.empty((self.num_components, self.dim))\n", " i = np.random.choice(self.num_points, 1)\n", " self.means[0] = self.X[i]\n", " indexes = [i] \n", " for j in range(1, self.num_components):\n", " probs = np.zeros(self.num_points)\n", " \n", " for i, x in enumerate(self.X):\n", " d = np.linalg.norm(x - self.means[:j], 2, axis=1) ** 2 \n", " probs[i] = np.min(d)\n", " \n", " probs = probs / np.sum(probs)\n", " i = np.random.choice(self.num_points, 1, p = probs)\n", " assert np.all(probs[np.array(indexes)] == 0)\n", " # All previous indexes should have zero probability of being selected. \n", " \n", " self.means[j] = self.X[i] \n", " indexes.append(i)\n", " \n", " def restart(self): \n", " self.probabilities = np.ones((self.num_points, self.num_components)) / self.num_components\n", " self.init_means()\n", " self.covariances = np.array([np.diag(self.X.std(axis=0)) for _ in range(self.num_components)])\n", " self.weights = np.random.rand(self.num_components) \n", " self.weights /= np.sum(self.weights)\n", " \n", " def expectation(self):\n", " gaussians = [multivariate_normal(mean, cov) for mean, cov in zip(self.means, self.covariances)] \n", " \n", " for i, gaussian in enumerate(gaussians):\n", " self.probabilities[:, i] = gaussian.pdf(self.X)\n", " \n", " if self.kind == 'soft':\n", " self.probabilities /= self.probabilities.sum(axis=1)[:, np.newaxis]\n", " else:\n", " classes = np.argmax(self.probabilities, axis=1)\n", " aux = np.zeros_like(self.probabilities)\n", " aux[np.arange(self.num_points), classes] = 1\n", " self.probabilities = aux\n", " \n", " def maximization(self):\n", " \"Fit clusters to weighted data points.\"\n", " self.weights = np.mean(self.probabilities, axis=0)\n", " \n", " avg_weights = (self.probabilities / self.probabilities.sum(axis=0)).T\n", " avg_weights = np.nan_to_num(avg_weights, 0)\n", " self.means = avg_weights @ self.X\n", " covariances = []\n", " for mean, weight in zip(self.means, avg_weights):\n", " delta = self.X - mean\n", " cov = delta.T @ np.diag(weight) @ delta + self.nu * np.eye(self.dim)\n", " covariances.append(cov)\n", " self.covariances = np.array(covariances)\n", " \n", " " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_dataset(dataset, n_samples=200, noise=0.3):\n", " if dataset == 'linear':\n", " X, Y = linear_separable_data(n_samples, noise=noise, dim=2) \n", " Y = (Y + 1) // 2\n", " elif dataset == '2-blobs':\n", " X, Y = datasets.make_classification(n_classes=2, n_features=2, n_informative=2, n_redundant=0,\n", " n_clusters_per_class=1, n_samples=n_samples, random_state=8)\n", " elif dataset == '3-blobs':\n", " X, Y = datasets.make_classification(n_classes=3, n_features=2, n_informative=2, n_redundant=0,\n", " n_clusters_per_class=1, n_samples=n_samples, random_state=8)\n", " elif dataset == '4-blobs':\n", " X, Y = datasets.make_classification(n_classes=4, n_features=2, n_informative=2, n_redundant=0,\n", " n_clusters_per_class=1, n_samples=n_samples, random_state=8) \n", " elif dataset == 'high-dim':\n", " X, Y = datasets.make_classification(n_classes=3, n_features=10, n_informative=6, n_samples=n_samples,\n", " random_state=8)\n", " \n", " elif dataset == 'circles':\n", " X, Y = datasets.make_circles(noise=noise, n_samples=n_samples, factor=.5)\n", " elif dataset == 'moons':\n", " X, Y = datasets.make_moons(noise=noise, n_samples=n_samples)\n", " elif dataset == 'imbalanced':\n", " X, Y = linear_separable_data(n_samples, noise=noise, dim=2, num_negative=int(n_samples * 0.2))\n", " Y = (Y + 1) // 2\n", " elif dataset == 'varied variance':\n", " X, Y = datasets.make_blobs(n_samples=n_samples, cluster_std=[1.0, 2.5, 0.5], random_state=170)\n", "\n", " elif dataset == 'correlated':\n", " X, Y = datasets.make_classification(n_classes=2, n_features=2, n_informative=1, n_redundant=1,\n", " n_clusters_per_class=1, n_samples=n_samples, random_state=8) \n", " elif dataset == 'iris':\n", " X, Y = datasets.load_iris(return_X_y=True)\n", " elif dataset == 'mnist':\n", " X, Y = datasets.load_digits(return_X_y=True)\n", " elif dataset == 'wine':\n", " X, Y = datasets.load_wine(return_X_y=True)\n", " return X, Y" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "def em(dataset, n_centers, kind, initialization, nu):\n", " X, _ = get_dataset(dataset, noise=0.1)\n", " X = X[:, :2]\n", " expectation_button = ipywidgets.Button(description=\"Expectation Step\")\n", " maximization_button = ipywidgets.Button(description=\"Maximization Step\")\n", " restart_button = ipywidgets.Button(description=\"Restart\")\n", "\n", " cmap = plt.cm.viridis\n", " norm = mpl.colors.Normalize(vmin=0, vmax=n_centers - 1)\n", "\n", "\n", " algorithm = EMGMM(X, n_centers, kind=kind, initialization=initialization, nu=nu)\n", "\n", " def plot(display_expectation=True):\n", " fig, ax = plt.subplots(1, 1)\n", "\n", " colors = (algorithm.probabilities * np.arange(n_centers)).sum(axis=1)\n", " ax.scatter(X[:, 0], X[:, 1], c=cmap(norm(colors)), \n", " cmap=cmap)\n", "\n", " for i, (weight, mean, cov) in enumerate(zip(algorithm.weights, algorithm.means, algorithm.covariances)):\n", " plot_helpers.plot_ellipse(mean, cov, color=cmap(norm(i)), ax=ax)\n", "\n", " IPython.display.clear_output(wait=True)\n", " IPython.display.display(fig)\n", " plt.close()\n", "\n", " if display_expectation:\n", " display(expectation_button) \n", " else:\n", " display(maximization_button)\n", "\n", " display(restart_button)\n", "\n", " def restart(b):\n", " algorithm.restart()\n", " plot(display_expectation=True)\n", "\n", " def expectation(b):\n", " algorithm.expectation()\n", " plot(display_expectation=False)\n", "\n", " def maximization(b):\n", " algorithm.maximization()\n", " plot(display_expectation=True)\n", "\n", " expectation_button.on_click(expectation)\n", " maximization_button.on_click(maximization)\n", " restart_button.on_click(restart)\n", " plot()\n", "\n", "interact(em, dataset=['3-blobs', 'varied variance', 'linear', 'imbalanced', '2-blobs', '4-blobs', 'iris', 'circles', 'moons'], \n", " n_centers=ipywidgets.IntSlider(value=3, min=1, max=10),\n", " kind=['hard', 'soft' ],\n", " initialization=['random', 'random from data', 'kmeans++'],\n", " nu=ipywidgets.BoundedFloatText(value=1e-6, min=0, max=1000, step=1e-6, \n", " description='Nu:', continuous_update=False)\n", " );" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Covariance Models & Density Estimation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rcParams['figure.figsize'] = (16, 8)\n", "rcParams['font.size'] = 16\n", "\n", "def density_estimation(dataset, n_centers, covariance_type, noise):\n", " resample_button = ipywidgets.Button(description=\"Resample\")\n", " X, _ = get_dataset(dataset, noise=noise) \n", " X = X[:, :2]\n", " cmap = plt.cm.viridis\n", " norm = mpl.colors.Normalize(vmin=0, vmax=n_centers - 1)\n", " \n", " def resample(b):\n", " \n", " model = GaussianMixture(n_components=n_centers, covariance_type=covariance_type).fit(X)\n", "\n", "\n", " fig, axes = plt.subplots(1, 2)\n", " colors = (model.predict_proba(X) * np.arange(n_centers)).sum(axis=1)\n", " for ax in axes:\n", " ax.scatter(X[:, 0], X[:, 1], c=cmap(norm(colors)), cmap=cmap)\n", "\n", " weights, means = model.weights_, model.means_\n", "\n", " if covariance_type == 'full':\n", " covariances = model.covariances_\n", " elif covariance_type == 'tied':\n", " covariances = [model.covariances_ for _ in range(n_centers)]\n", " elif covariance_type == 'diag':\n", " covariances = [np.diag(cov) for cov in model.covariances_]\n", " elif covariance_type == 'spherical':\n", " covariances = [np.eye(X.shape[-1]) * cov for cov in model.covariances_]\n", "\n", " for i, (weight, mean, cov) in enumerate(zip(weights, means, covariances)):\n", " plot_helpers.plot_ellipse(mean, cov, color=cmap(norm(i)), ax=axes[0])\n", "\n", " xx, yy = plot_helpers.make_meshgrid(X[:, 0], X[:, 1], h=.02)\n", " xy = np.c_[xx.ravel(), yy.ravel()]\n", " P = -model.score_samples(xy)\n", " axes[1].contourf(xx, yy, P.reshape(xx.shape), cmap=cmap, alpha=0.5, \n", " norm=mpl.colors.LogNorm(vmin=1.0, vmax=1000.0), levels=np.logspace(0, 3, 10))\n", "\n", " axes[0].set_aspect('equal')\n", " axes[0].set_title('Gaussian Fit')\n", " axes[1].set_aspect('equal')\n", " axes[1].set_title('Density Estimation')\n", " \n", " IPython.display.clear_output(wait=True)\n", " IPython.display.display(fig)\n", " plt.close()\n", "\n", " display(resample_button)\n", " \n", " resample_button.on_click(resample)\n", " resample(None)\n", "\n", "interact(density_estimation, \n", " dataset=['3-blobs', 'varied variance', 'linear', 'imbalanced', '2-blobs', '4-blobs', 'iris', 'circles', 'moons'], \n", " n_centers=ipywidgets.IntSlider(value=3, min=1, max=10),\n", " covariance_type=['full', 'tied', 'diag', 'spherical'],\n", " noise=ipywidgets.FloatSlider(value=0.2, min=0, max=1, step=0.01, description='Noise:', continuous_update=False)\n", " );" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# GMMs Clustering" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def clustering(algorithm, dataset, n_centers, noise):\n", " resample_button = ipywidgets.Button(description=\"Resample\")\n", "\n", " X, _ = get_dataset(dataset, noise=noise) \n", " X = X[:, :2]\n", " \n", " def resample(b):\n", " if algorithm == 'full-GMM':\n", " model = GaussianMixture(n_components=n_centers, covariance_type='full').fit(X)\n", " elif algorithm == 'tied-GMM':\n", " model = GaussianMixture(n_components=n_centers, covariance_type='tied').fit(X)\n", " elif algorithm == 'diag-GMM':\n", " model = GaussianMixture(n_components=n_centers, covariance_type='diag').fit(X)\n", " elif algorithm == 'spherical-GMM':\n", " model = GaussianMixture(n_components=n_centers, covariance_type='spherical').fit(X)\n", " elif algorithm == 'KMeans++':\n", " model = KMeans(n_clusters=n_centers, init='k-means++').fit(X)\n", " elif algorithm == 'KMeans':\n", " model = KMeans(n_clusters=n_centers, init='random').fit(X)\n", "\n", " plt.scatter(X[:, 0], X[:, 1], c=model.predict(X))\n", "\n", " IPython.display.clear_output(wait=True)\n", " IPython.display.display(plt.gcf())\n", " plt.close()\n", "\n", " display(resample_button)\n", " \n", " resample_button.on_click(resample)\n", " resample(None)\n", " \n", "\n", "interact(clustering, \n", " algorithm=['full-GMM', 'KMeans++', 'KMeans', 'tied-GMM', 'diag-GMM', 'spherical-GMM', ],\n", " dataset=['3-blobs', 'varied variance', 'linear', 'imbalanced', '2-blobs', '4-blobs', 'iris', 'circles', 'moons'], \n", " n_centers=ipywidgets.IntSlider(value=3, min=1, max=10),\n", " noise=ipywidgets.FloatSlider(value=0.2, min=0, max=1, step=0.01, description='Noise:', continuous_update=False)\n", " );" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "celltoolbar": "Raw Cell Format", "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 4 }