{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Tutorial 2\n", "\n", "This tutorial contains the following demos:\n", "1. Monte Carlo estimation\n", "1. Non-linear features: good and bad\n", "1. Model selection: cross-validation\n", "1. Standardization and regularization" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## Code source: Xianyao Zhang\n", "## with helper functions based on Sebastian Curi and Andreas Krause, and Jaques Grobler (sklearn demos).\n", "# License: BSD 3 clause\n", "\n", "import sys\n", "sys.path.append('./helper_files') # helper files provided on the course website\n", "\n", "import numpy as np\n", "from typing import Callable\n", "\n", "np.set_printoptions(precision=4, suppress=True, linewidth=100)\n", "np.random.seed(42)\n", "\n", "from matplotlib import pyplot as plt\n", "plt.rcParams['figure.figsize'] = [10., 5.]\n", "plt.rcParams['figure.dpi'] = 100\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def plot_function(fn: Callable, a, b, *, n_points=10000, linewidth=1, label='fn', ax=None):\n", " x = np.linspace(a, b, n_points)\n", " y = fn(x)\n", " if ax is None:\n", " ax = plt.gca()\n", " ax.plot(x, y, linewidth=linewidth, label=label)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Monte Carlo integration" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from scipy.integrate import quadrature\n", "from scipy import stats\n", "\n", "fn = lambda x: np.exp(np.sin(x**3))\n", "a, b = -1, 1\n", "\n", "plot_function(fn, a, b)\n", "plot_function(stats.uniform(loc=a, scale=b-a).pdf, a, b, label='uniform')\n", "plot_function(stats.truncnorm(a, b).pdf, a, b, label='truncnorm')\n", "def truncexpon_pdf(a, b):\n", " def fn(x):\n", " return stats.truncexpon(b-a).pdf(x-a)\n", " return fn\n", "plot_function(truncexpon_pdf(a, b), a, b, label='truncexpon')\n", "\n", "plt.legend(); plt.show()\n", "\n", "quad_integral, quad_error = quadrature(fn, a=a, b=b)\n", "print(f'quadrature: int={quad_integral: .4f}, err={quad_error:.4e}')\n", "\n", "def monte_carlo_integrate(fn: Callable, a, b, *, sample_distribution: str='uniform', n_samples=1000):\n", " if sample_distribution == 'uniform':\n", " rv = stats.uniform(loc=a, scale=b-a)\n", " samples = rv.rvs(size=n_samples)\n", " pdf = rv.pdf(samples)\n", " elif sample_distribution == 'truncnorm':\n", " rv = stats.truncnorm(a, b)\n", " samples = rv.rvs(size=n_samples)\n", " pdf = rv.pdf(samples)\n", " elif sample_distribution == 'truncexpon':\n", " rv = stats.truncexpon(b-a)\n", " samples = rv.rvs(size=n_samples) + a\n", " pdf = rv.pdf(samples-a)\n", " else:\n", " raise ValueError(f'Unsupported distribution: {sample_distribution}')\n", " \n", " fn_values = fn(samples)\n", " mc_samples = fn_values / pdf\n", " mc_estimate = np.mean(mc_samples)\n", " mc_std = np.std(mc_samples, ddof=1)\n", " return mc_estimate, mc_std\n", "\n", "mc_integral, mc_error = monte_carlo_integrate(fn=fn, a=a, b=b, n_samples=100000, sample_distribution='uniform')\n", "print(f'uniform_mc: int={mc_integral: .4f}, err={mc_error: .4e}')\n", "mc_integral, mc_error = monte_carlo_integrate(fn=fn, a=a, b=b, n_samples=100000, sample_distribution='truncexpon')\n", "print(f'truncexpon_mc: int={mc_integral: .4f}, err={mc_error: .4e}')\n", "mc_integral, mc_error = monte_carlo_integrate(fn=fn, a=a, b=b, n_samples=100000, sample_distribution='truncnorm')\n", "print(f'truncnormal_mc: int={mc_integral: .4f}, err={mc_error: .4e}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Non-linear features: good and bad" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from collections import OrderedDict\n", "np.random.seed(42)\n", "\n", "true_function = lambda x: np.sin(x) + 2*np.sin(x)*np.cos(x) + .5*np.exp(-(x)**2+1)\n", "# true_function = lambda x: 5*x + 3*x**2 - 0.5*x**4\n", "# true_function = lambda x: 5*x + 3*x**2 - 0.05*x**4 + 15*np.exp(-x**2+1)\n", "\n", "num_points = 50\n", "a, b = -8, 8\n", "x = np.random.uniform(a, b, size=(num_points,))\n", "PLOT_FN_X = np.linspace(a, b, 5000)\n", "\n", "true_y = true_function(x)\n", "noisy_y = true_y + np.random.normal(scale=0.1, size=x.shape)\n", "plt.scatter(x, true_y, s=3, label='true')\n", "plt.scatter(x, noisy_y, s=3, label='noisy')\n", "plot_function(true_function, a, b, label='true_fn')\n", "plt.legend()\n", "plt.show()\n", "\n", "def make_poly(k):\n", " return lambda x: np.power(x, k)\n", "\n", "nonlinears = {\n", " 'const': lambda x: np.ones_like(x),\n", " **{f'poly_{k}': make_poly(k) for k in range(1, 8)}, \n", " 'sinx': lambda x: np.sin(x),\n", " 'cosx': lambda x: np.cos(x),\n", " 'sin2x': lambda x: np.sin(2*x),\n", " 'cos2x': lambda x: np.cos(2*x),\n", " 'sin3x': lambda x: np.sin(3*x),\n", " 'cos3x': lambda x: np.cos(3*x),\n", "# 'expx': lambda x: np.exp(x),\n", "# 'expx2': lambda x: np.exp(x**2),\n", " 'exp-x2': lambda x: np.exp(-x**2),\n", " 'logx': lambda x: np.log(x),\n", "}\n", "\n", "nonlinears = OrderedDict(sorted(nonlinears.items(), key=lambda t: t[0]))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_nonlinear_function(functions: OrderedDict, weights):\n", " # f(x) = w[0]f[0](x) + w[1]f[1](x) + ... \n", " assert len(functions) == len(weights)\n", " def ret_fn(x):\n", " ret = np.zeros_like(x)\n", " for i, (name, fn) in enumerate(functions.items()):\n", " ret += weights[i] * fn(x)\n", " return ret\n", " return ret_fn\n", "\n", "def get_X_functions(x, fns: OrderedDict):\n", " X = np.concatenate([f(x)[:, np.newaxis] for k, f in fns.items()], axis=1)\n", " return X" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fns_poly = OrderedDict([(k, v) for k, v in nonlinears.items() if k.startswith('poly') or k == 'const'])\n", "fns_sin_exp = OrderedDict([(k, v) for k, v in nonlinears.items() if k.startswith('sin') or k.startswith('exp') or k == 'const'])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_X_poly(x):\n", " return get_X_functions(x, fns_poly)\n", "\n", "def get_X_sin_exp(x):\n", " return get_X_functions(x, fns_sin_exp)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.linear_model import LinearRegression\n", "from sklearn.metrics import mean_squared_error\n", "\n", "def plot_weighted_fns(fns, weights, a, b, label='fn'):\n", " fn = get_nonlinear_function(fns, weights)\n", " plot_function(fn, a, b, label=label)\n", " return\n", "\n", "def fit_nonlinear_and_show(reg, x, y, fns: OrderedDict, name=None):\n", " X = get_X_functions(x, fns)\n", "\n", " reg.fit(X, y)\n", " pred_y = reg.predict(X)\n", " \n", " print('coeffs=', reg.coef_)\n", " print(f'score={reg.score(X, y):.4f}')\n", " print(f'mse={mean_squared_error(y, pred_y):.4f}')\n", " plt.scatter(x, pred_y, s=3, label=name or 'pred')\n", " plt.scatter(x, y, s=3, label='true')\n", " return reg" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ax = plt.subplot(2,2,1)\n", "reg1 = fit_nonlinear_and_show(LinearRegression(fit_intercept=False), x, noisy_y, fns_poly, name='poly')\n", "plot_weighted_fns(fns_poly, reg1.coef_, a, b)\n", "plt.legend()\n", "\n", "ax = plt.subplot(2,2,2)\n", "reg2 = fit_nonlinear_and_show(LinearRegression(fit_intercept=False), x, noisy_y, fns_sin_exp, name='sin_exp')\n", "plot_weighted_fns(fns_sin_exp, reg2.coef_, a, b)\n", "plt.legend()\n", "\n", "ax = plt.subplot(2,2,3)\n", "reg3 = fit_nonlinear_and_show(LinearRegression(fit_intercept=False), x, true_y, fns_poly, name='poly')\n", "plot_weighted_fns(fns_poly, reg3.coef_, a, b)\n", "plt.legend()\n", "\n", "ax = plt.subplot(2,2,4)\n", "reg4 = fit_nonlinear_and_show(LinearRegression(fit_intercept=False), x, true_y, fns_sin_exp, name='sin_exp')\n", "plot_weighted_fns(fns_sin_exp, reg4.coef_, a, b)\n", "plt.legend()\n", "\n", "plt.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "X_sin_exp = get_X_sin_exp(x)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fns_all = OrderedDict(list(fns_poly.items()) + list(fns_sin_exp.items()))\n", "reg = fit_nonlinear_and_show(LinearRegression(fit_intercept=False), x, noisy_y, fns_all, name='all')\n", "plot_weighted_fns(fns_all, reg.coef_, a, b)\n", "plt.legend()\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Model selection: cross-validation" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "from sklearn.model_selection import KFold\n", "n_folds = 5\n", "\n", "# Generate a testing set \n", "\n", "def my_cross_val(reg, fns, x, y, *, cv, a, b):\n", " X = get_X_functions(x, fns)\n", " plot_x = x\n", " n_points = X.shape[0]\n", " kf = KFold(n_splits=cv)\n", "\n", "# indices = [list(range(k*n_points//cv, (k+1)*n_points//cv)) for k in range(cv-1)]\n", "# indices.append(list(range((cv-1)*n_points//cv, n_points)))\n", "\n", " cv_score = []\n", " for i, (train_indices, val_indices) in enumerate(kf.split(X)):\n", " print(f'CV fold {i}')\n", "# train_indices = np.setdiff1d(np.arange(n_points), val_ind)\n", " X_train, y_train = X[train_indices], y[train_indices]\n", " X_val, y_val = X[val_indices], y[val_indices]\n", " print(X_train.shape, X_val.shape)\n", " plot_x_train = plot_x[train_indices]\n", " plot_x_val = plot_x[val_indices]\n", " \n", " reg.fit(X_train, y_train)\n", " y_val_pred = reg.predict(X_val)\n", " y_train_pred = reg.predict(X_train)\n", " cv_score.append(-mean_squared_error(y_val_pred, y_val))\n", " \n", " plt.figure()\n", " print(reg.coef_)\n", " plt.scatter(plot_x_train, y_train_pred, s=3, label=f'fold_{i}_train')\n", " plt.scatter(plot_x_train, y_train, s=3, label=f'fold_{i}_trtrue')\n", " plt.scatter(plot_x_val, y_val_pred, s=16, marker='x', label=f'fold_{i}_val')\n", " plt.scatter(plot_x_val, y_val, s=16, marker='*', label=f'fold_{i}_val_true')\n", " plot_weighted_fns(fns, reg.coef_, a, b)\n", " plt.legend()\n", " plt.show()\n", " return cv_score\n", " \n", "my_cross_val(LinearRegression(normalize=True, fit_intercept=False), fns_sin_exp, x, noisy_y, cv=n_folds, a=a, b=b)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "my_cross_val(LinearRegression(normalize=True, fit_intercept=False), fns_poly, x, noisy_y, cv=n_folds, a=a, b=b)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.model_selection import cross_val_score, cross_validate, cross_val_predict\n", "\n", "reg = LinearRegression(normalize=False)\n", "scores_poly = cross_val_score(reg, get_X_poly(x), noisy_y, cv=n_folds, scoring='neg_mean_squared_error')\n", "print('scores_poly =', scores_poly)\n", "print(f'avg = {np.array(scores_poly).mean():.4f}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "reg = LinearRegression(normalize=False)\n", "scores_sin_exp = cross_val_score(reg, get_X_sin_exp(x), noisy_y, cv=n_folds, scoring='neg_mean_squared_error')\n", "print('scores_sin_exp =', scores_sin_exp)\n", "print(f'avg = {np.array(scores_sin_exp).mean():.4f}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Standardization and regularization" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn import datasets, linear_model\n", "from sklearn.datasets import make_regression\n", "from sklearn.preprocessing import scale, PolynomialFeatures\n", "\n", "# Load the diabetes dataset\n", "diabetes = datasets.load_diabetes()\n", "\n", "Y_all = diabetes.target\n", "X_std = scale(diabetes.data) # zero mean unit variance\n", "n_all = Y_all.size\n", "n_test = 20" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "diabetes.data.mean(axis=0)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "diabetes.data.std(axis=0)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "X_std.std(axis=0)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "diabetes.data.shape" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "random_scale = 2**np.random.uniform(-6, 6, size=(1,10))\n", "random_mean = np.random.uniform(-50, 50, size=(1,10))\n", "print(random_scale)\n", "print(random_mean)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "X = X_std*random_scale + random_mean # X_std = (X-mean(X))/std(X)\n", "print('shape=', X.shape)\n", "print('feature_mean=', X.mean(axis=0))\n", "print('feature_std=', X.std(axis=0))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "reg_scaled = LinearRegression(fit_intercept=True)\n", "reg_scaled.fit(X, Y_all)\n", "print(reg_scaled.coef_)\n", "reg_scaled.score(X, Y_all)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "reg_std = LinearRegression()\n", "reg_std.fit(X_std, Y_all)\n", "print(reg_std.coef_)\n", "reg_std.score(X_std, Y_all)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.linear_model import Lasso, Ridge\n", "\n", "def regularize_different_alpha(name: str, X, Y):\n", " print('-' * 50)\n", " print('feature_std=', X.std(axis=0))\n", " print('feature_mean=', X.mean(axis=0))\n", " alpha_list = 10.**np.arange(-7, 8)\n", " for alpha in alpha_list:\n", " if name.lower() == 'ridge':\n", " reg = Ridge(alpha=alpha, normalize=False)\n", " elif name.lower() == 'lasso':\n", " reg = Lasso(alpha=alpha, normalize=False)\n", " else:\n", " raise ValueError('Unrecongnized name')\n", " reg.fit(X, Y)\n", " print(f'alpha={alpha}, score={reg.score(X, Y):.4f}')\n", " print('coef=', reg.coef_[:8]) # 