Homework 07: Markov Chains#
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns; sns.set()
import numpy as np
import pandas as pd
Problem 1#
We showed in class that a Markov chain can have long-range dependencies,
and commented that dependent random variables are usually, but not necessarily, correlated:
where \(\mu\) and \(\sigma\) are the chain’s long-term mean and standard deviation, respectively.
However, it is possible to define (fairly artificial) distributions with dependent random variables that are uncorrelated. For example:
\(x_1\) is uniformly distributed in \([-1,+1]\).
\(x_2 = |x_1|\).
Implement the function below to generate a dataset with these two features. Hints:
Use
uniform()
to generate values of \(x_1\).Use
np.corrcoef()
to calculate the correlation coefficient \(\rho\).
def generate(n, seed):
"""Generate a dataset with two dependent but uncorrelated features.
Parameters
----------
n : int
Number of samples to generate.
seed : int
Seed to use for reproducible random numbers.
Returns
-------
tuple
Tuple (X, rho) of generated data X with shape (n, 2) and the
correlation coefficient rho of the generated data.
"""
# YOUR CODE HERE
raise NotImplementedError()
# A correct solution should pass these tests.
X, _ = generate(n=3, seed=1)
assert np.all(np.round(X, 3) == [[-0.166, 0.166], [ 0.441, 0.441], [-1.000, 1.000]])
X, _ = generate(n=3, seed=2)
assert np.all(np.round(X, 3) == [[-0.128, 0.128], [-0.948, 0.948], [ 0.099, 0.099]])
_, rho = generate(n=10000, seed=3)
assert np.abs(rho) < 0.002
You have now demonstrated that \(\rho \simeq 0\), and a plot of this dataset should convince you that \(x_1\) and \(x_2\) are (highly) dependent:
X, rho = generate(n=50, seed=4)
plt.scatter(X[:, 0], X[:, 1], s=5)
plt.xlabel('$x$'); plt.ylabel('$y$')
plt.text(-0.22, 0.9, '$\\rho = {:.3f}$'.format(rho), fontsize='x-large');
Problem 2#
In this problem, you will implement a Metropolis update rule to sample a Markov chain for the un-normalized probability density:
with hyperparameter \(s > 0\).
def P(x,y,s):
return 0.5 * (np.exp(-0.5 * ((x / s) ** 2 + (y * s) ** 2)) + np.exp(-0.5 * ((x * s) ** 2 + (y / s) ** 2)))
def plot_P(s=3, lim=5):
fig = plt.figure(figsize=(6, 6))
xy = np.linspace(-lim, +lim, 250)
Pxy = P(xy, xy[:, np.newaxis], s)
plt.contour(xy, xy, Pxy, [0.1, 0.2, 0.3], colors='r')
plot_P()
Implement the function below to perform a Metropolis update starting from \((x,y)\) and using a Gaussian proposal distribution \(Q(x,y)\) centered at \((0,0)\) with standard deviation \(\sigma\) along both coordinates. Use the “random walk” mode for your proposed updates. (Recall that Metropolis updates are a special case of Metropolis-Hastings updates where the ratio of \(Q\) factors cancels in the Hastings ratio.)
def metropolis_update(x, y, s, gen, sigma=1):
"""Perform a single Metropolis update.
Parameters
----------
x : float
Value of x from the previous step.
y : float
Value of y from the previous step.
s : float
Value of the hyperparameter s.
gen : np.random.RandomState
Random state to use for reproducible random samples.
sigma : float
Standard deviation of the Gaussian proposal distribution Q(x,y).
Returns
-------
tuple
Tuple (x,y) of the position after the update.
"""
# YOUR CODE HERE
raise NotImplementedError()
# A correct solution should pass these tests.
s, n = 3, 1000
gen = np.random.RandomState(seed=123)
# Generate steps from (0, 0) with sigma=1
xy = np.array([metropolis_update(0, 0, s, gen, sigma=1) for i in range(n)])
nrepeat = np.count_nonzero(np.all(xy == [0, 0], axis=1))
assert np.allclose(nrepeat / n, 0.69967, atol=0.1, rtol=0.1)
assert np.allclose(np.mean(xy, axis=0), [0, 0], atol=0.1, rtol=0.1)
# Generate steps from (5, 0) with sigma=1
xy = np.array([metropolis_update(5, 0, s, gen, sigma=1) for i in range(n)])
nrepeat = np.count_nonzero(np.all(xy == [5, 0], axis=1))
assert np.allclose(nrepeat / n, 0.70136, atol=0.1, rtol=0.1)
assert np.allclose(np.mean(xy, axis=0), [4.893, 0], atol=0.1, rtol=0.1)
# Generate steps from (1, -1) with sigma=1
xy = np.array([metropolis_update(1, -1, s, gen, sigma=1) for i in range(n)])
nrepeat = np.count_nonzero(np.all(xy == [1, -1], axis=1))
assert np.allclose(nrepeat / n, 0.26665, atol=0.1, rtol=0.1)
assert np.allclose(np.mean(xy, axis=0), [0.822, -0.822], atol=0.1, rtol=0.1)
# Generate steps from (1, -1) with sigma=2
xy = np.array([metropolis_update(1, -1, s, gen, sigma=2) for i in range(n)])
nrepeat = np.count_nonzero(np.all(xy == [1, -1], axis=1))
assert np.allclose(nrepeat / n, 0.43847, atol=0.1, rtol=0.1)
assert np.allclose(np.mean(xy, axis=0), [0.781, -0.781], atol=0.1, rtol=0.1)
Test your solution with the following visualization:
def plot_chain(update_rule, x0=0, y0=0, s=3, n_updates=200, lim=5, seed=123, **kwargs):
gen = np.random.RandomState(seed=seed)
path = [(x0, y0)]
for i in range(n_updates):
path.append(update_rule(*path[-1], s, gen, **kwargs))
plot_P(s, lim)
x, y = np.array(path).T
plt.scatter(x, y, s=10, c='k')
plt.plot(x, y, 'k-', lw=0.5, alpha=0.3)
plt.xlim(-lim, +lim)
plt.ylim(-lim, +lim)
plt.xlabel('$x$')
plt.ylabel('$y$')
plot_chain(metropolis_update, sigma=2.0)
plot_chain(metropolis_update, sigma=1.0)
plot_chain(metropolis_update, sigma=0.5)
Problem 3#
In this problem you will implement a Gibbs update rule for the same probability density.
Implement the function below to sample from the conditional distribution \(P(x\mid y)\).
Hint: each sample can be drawn from a single Gaussian with \(\sigma = s\) or \(\sigma = 1/s\) as long as you weight the contributions from each Gaussian correctly given \(y\).
def sample_conditional(y, s, gen):
"""Sample from the conditional distribution P(x | y).
Parameters
----------
y : float
Fixed value of y to use.
s : float
Value of the hyperparameter s.
gen : np.random.RandomState
Random state to use for reproducible random samples.
Returns
-------
float
Random value of x sampled from P(x | y).
"""
# YOUR CODE HERE
raise NotImplementedError()
# A correct solution should pass these tests.
s, n = 3, 10000
gen = np.random.RandomState(seed=123)
# With y=+/-4, the distribution of x should be narrow.
x = [sample_conditional(+4, s, gen) for i in range(n)]
assert np.allclose(np.percentile(x, (5, 50, 95)), [-0.549, 0, +0.549], atol=0.1, rtol=0.1)
x = [sample_conditional(-4, s, gen) for i in range(n)]
assert np.allclose(np.percentile(x, (5, 50, 95)), [-0.549, 0, +0.549], atol=0.1, rtol=0.1)
# With y=0, the distribution of x should have a narrow core and a wide tail.
x = [sample_conditional(0, s, gen) for i in range(n)]
assert np.allclose(np.percentile(x, (5, 25, 50, 75, 95)), [-3.84, -0.50, 0, +0.50, +3.84], atol=0.1, rtol=0.1)
Implement the function below to perform a Gibbs update consisting of:
Sample \(x_{n+1}\) from \(P_X(x\mid y_n)\)
Sample \(y_{n+1}\) from \(P_Y(y\mid x_{n+1})\)
Note that you can use sample_conditional()
for both steps by noticing that \(P_Y(y\mid x)\) equals \(P_X(x\mid y)\) when \(x\) and \(y\) are swapped.
def gibbs_update(x, y, s, gen):
"""Perform a single Gibbs update.
Parameters
----------
x : float
Value of x from the previous step.
y : float
Value of y from the previous step.
s : float
Value of the hyperparameter s.
gen : np.random.RandomState
Random state to use for reproducible random samples.
Returns
-------
tuple
Tuple (x,y) of the position after the update.
"""
# YOUR CODE HERE
raise NotImplementedError()
# A correct solution should pass these tests.
s, n = 3, 10000
gen = np.random.RandomState(seed=123)
# Generate steps from (0,0).
xy = np.array([gibbs_update(0, 0, s, gen) for i in range(n)])
assert np.allclose(
np.percentile(xy[:, 0], (5, 25, 50, 75, 95)),
[-3.849, -0.502, 0, +0.502, +3.849], atol=0.1, rtol=0.1)
assert np.allclose(
np.percentile(xy[:, 1], (5, 25, 50, 75, 95)),
[-2.36, -0.297, 0, +0.297, +2.36], atol=0.1, rtol=0.1)
# Steps from (5,0) have the same distribution.
xy = np.array([gibbs_update(5, 0, s, gen) for i in range(n)])
assert np.allclose(
np.percentile(xy[:, 0], (5, 25, 50, 75, 95)),
[-3.849, -0.502, 0, +0.502, +3.849], atol=0.1, rtol=0.1)
assert np.allclose(
np.percentile(xy[:, 1], (5, 25, 50, 75, 95)),
[-2.36, -0.297, 0, +0.297, +2.36], atol=0.1, rtol=0.1)
# Steps from (0,-5) have a different distribution.
xy = np.array([gibbs_update(0, -5, s, gen) for i in range(n)])
assert np.allclose(
np.percentile(xy[:, 0], (5, 25, 50, 75, 95)),
[-0.548, -0.225, 0, +0.225, +0.548], atol=0.1, rtol=0.1)
assert np.allclose(
np.percentile(xy[:, 1], (5, 25, 50, 75, 95)),
[-3.42, -0.391, 0, +0.391, +3.42], atol=0.1, rtol=0.1)
Test your solution with the following visualization:
plot_chain(gibbs_update)
Problem 4#
If we define the potential energy for a “particle” as:
it has partial derivatives:
with
A Hamiltonian MC simulates the trajectory of a particle using the equations of motion:
where we have set all masses equal to 1 and the temperature \(k_B T = 1\).
Implement the function below to perform a single \(\Delta t\) step according to the equations above:
def HMC_step(x, y, px, py, s, dt):
"""Perform a single HMC dt step.
Parameters
----------
x : float
Current x position.
y : float
Current y position.
px : float
Current x momentum.
py : float
Current y momentum.
s : float
Value of the hyperparameter s.
dt : float
Step size to take.
Returns
-------
tuple
Tuple (x, y, px, py) with particle position and momentum after this step.
"""
# YOUR CODE HERE
raise NotImplementedError()
# A correct solution should pass these tests.
assert np.all(HMC_step(0., 0., 0., 0., 3, 1.) == np.array([0.,0.,0.,0.]))
assert np.all(HMC_step(0., 0., 1., 1., 3, 0.) == np.array([0.,0.,1.,1.]))
assert np.all(HMC_step(0., 0., 1., 1., 3, 1.) == np.array([1.,1.,1.,1.]))
assert np.all(HMC_step(0., 0., 1., 1., 3, 2.) == np.array([2.,2.,1.,1.]))
assert np.all(np.round(HMC_step(1., 1., -1., 1., 3, 1.), 3) == np.array([0.,2.,-5.556,-3.556]))
assert np.all(np.round(HMC_step(0., 1., -1., 1., 3, 1.), 3) == np.array([-1.,2.,-1.,0.786]))
assert np.all(np.round(HMC_step(1., 0., -1., 1., 3, 1.), 3) == np.array([0.,1.,-1.214,1.]))
In order to perform an HMC update, we first need to generate random values of the (nuisance) parameters \(p_x\) and \(p_y\), then we follow the resulting particle from its initial conditions through a fixed number of steps. The result of the update is wherever the particle ends up. Note that the only use of random numbers is to generate the particle’s initial momentum.
def HMC_update(x0, y0, s, gen, p_sigma, n_steps, dt):
"""Perform a single HMC update by following a single particle for a fixed time.
Parameters
----------
x0 : float
Initial x position.
y0 : float
Initial y position.
s : float
Value of the hyperparameter s.
gen : np.random.RandomState
Random state to use for reproducible random samples.
p_sigma : float
Gaussian sigma for generating random initial (px,py) values.
n_steps : int
Number of particle steps to simulate.
dt : float
Size of each step.
Returns
-------
array
Array of shape (n_steps + 1, 4) with the particle position and momentum after each step.
"""
px0, py0 = gen.normal(scale=p_sigma, size=2)
path = [(x0, y0, px0, py0)]
for i in range(n_steps):
path.append(HMC_step(*path[-1], s, dt))
return np.array(path)
Finally, use the following visualization to see the particle trajectories from different updates:
def plot_HMC(x0=0, y0=0, s=3, p_sigma=1., n_steps=500, dt=0.01, n_updates=50, lim=5, seed=3, **kwargs):
gen = np.random.RandomState(seed=seed)
plot_P(s, lim)
for i in range(n_updates):
path = HMC_update(x0, y0, s, gen, p_sigma, n_steps, dt)
x, y, px, py = np.array(path).T
plt.scatter(x[-1], y[-1], s=25, c='k')
plt.plot(x, y, 'b-', lw=1, alpha=0.3)
plt.xlim(-lim, +lim)
plt.ylim(-lim, +lim)
plt.xlabel('$x$')
plt.ylabel('$y$')
plot_HMC()
Acknowledgments#
Initial version: Mark Neubauer
© Copyright 2024