Homework 06: Bayesian Statistics and MCMC#

%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns; sns.set()
import numpy as np
import pandas as pd
%pip install emcee

Problem 1#

Suppose you have an algorithm to identify quasars from astronomical images, which simply returns True or False. Using control samples, you have determined that your algorithm has the following performance for real quasars as well as the main contaminants for a quasar sample: galaxies and stars.

data?

M=quasar

M=galaxy

M=star

D=True

0.8

0.1

0.2

D=False

0.2

0.9

0.8

Implement the following function to calculate the likelihood \(P(D\mid M)\) given this information:

def likelihood(D, M):
    """Calculate likelihood of data D given model M.
    
    Parameters
    ----------
    D : bool
        A boolean (True/False) value indicating whether the algorithm identified an
        object as being a quasar or not.
    M : str
        A string ('quasar', 'galaxy', 'star') specifying the assumed model.
        
    Returns
    -------
    float
        The probability of the data given the model.
    """
    # YOUR CODE HERE
    raise NotImplementedError()
# A correct solution should pass these tests.
assert likelihood(True, 'quasar') == 0.8
assert likelihood(False, 'galaxy') == 0.9
assert likelihood(True, 'star') == 0.2
for M in 'quasar', 'galaxy', 'star':
    assert likelihood(True, M) + likelihood(False, M) == 1

The prior probability of each model will vary between images, depending mostly on the local density of stars which can vary a lot (and is especially high when you look through the disk of the Milky Way).

Implement the function below to calculate the prior probabilities of each model for an image based on the expected number of objects of each type:

def prior(num_quasars_expected, num_galaxies_expected, num_stars_expected):
    """Calculate the prior probability of each model.
    
    Parameters
    ----------
    num_quasars_expected : int
        Number of expected quasars.
    num_galaxies_expected : int
        Number of expected galaxies.
    num_stars_expected : int
        Number of expected stars.
    
    Returns
    -------
    dict
        Dictionary of prior probabilities for each model with keys 'quasar',
        'galaxy' and 'star'.
    """
    # YOUR CODE HERE
    raise NotImplementedError()
# A correct solution should pass these tests.
assert prior(100, 100, 200) == {'quasar': 0.25, 'galaxy': 0.25, 'star': 0.5}
assert prior(100, 100, 600) == {'quasar': 0.125, 'galaxy': 0.125, 'star': 0.75}

You have now the code necessary to quantify how well your quasar identification algorithm performs in regions with different densities of stars, using the posterior probability \(P(\text{quasar}\mid \text{True})\). For example, if the stellar density increases 3 times (from 200 to 600 per image) with fixed quasar and galaxy densities (100 each), the posterior probability drops from 0.615 to 0.381:

import IPython.display

def Learn(prior, likelihood, *data):
    """Learn from data using Bayesian inference.

    Assumes that the model and data spaces are discrete.

    Parameters
    ----------
    prior : dict
        Dictionary of prior probabilities for all possible models.
    likelihood : callable
        Called with args (D,M) and must return a normalized likelihood.
    data : variable-length list
        Zero or more items of data to use in updating the prior.
    """
    # Initialize the Bayes' rule numerator for each model.
    prob = prior.copy()
    history = [('PRIOR', prior)]
    # Loop over data.
    for D in data:
        # Update the Bayes' rule numerator for each model.
        prob = {M: prob[M] * likelihood(D, M) for M in prob}
        # Calculate the Bayes' rule denominator.
        norm = sum(prob.values())
        # Calculate the posterior probabilities for each model.
        prob = {M: prob[M] / norm for M in prob}
        history.append(('D={}'.format(D), prob))
    # Print our learning history.
    index, rows = zip(*history)
    IPython.display.display(pd.DataFrame(list(rows), index=index).round(3))
Learn(prior(100, 100, 200), likelihood, True)
Learn(prior(100, 100, 600), likelihood, True)

Problem 2#

Suppose you measure a random process that follows an exponential decay law for the number \(n(t)\) of un-decayed states as a function of time \(t\):

\[ \Large \frac{dn}{dt} = -\lambda n \; , \]

and want to infer the posterior probability of the decay rate \(\lambda\) given your data.

First, implement the function below to evaluate the likelihood of observing \(N\) decay times \(D = \{t_1, t_2, \ldots\}\) as:

\[ \Large P(D\mid \lambda) = \prod_{i=1}^{N}\, P(t_i\mid \lambda) \]

where the un-normalized probability density for exponential decay is:

\[ \Large P(t\mid \lambda) \propto \exp(-\lambda t) \; . \]
def decay_likelihood(decay_times, lam):
    """Calculate the normalized likelihood of measured times assuming a decay rate.
    """
    # YOUR CODE HERE
    raise NotImplementedError()
# A correct solution should pass these tests.
assert decay_likelihood([], 1) == 1
assert np.round(decay_likelihood([1], 0.1), 4) == 0.0905
assert np.round(decay_likelihood([1], 1.0), 4) == 0.3679
assert np.round(decay_likelihood([1], 1.5), 4) == 0.3347
assert np.round(decay_likelihood([1,2,1], 0.1), 4) == 0.0007
assert np.round(decay_likelihood([1,2,1], 1.0), 4) == 0.0183
assert np.round(decay_likelihood([1,2,1], 1.5), 4) == 0.0084

For our prior, we use the Gamma distribution, which has two hyperparameters \(\alpha\) and \(\beta\):

\[ \Large P(\lambda\mid \alpha,\beta) = \frac{\beta^\alpha \lambda^{\alpha-1} e^{-\beta\lambda}}{\Gamma(\alpha)} \; . \]

Implement the function below to evaluate the Gamma distribtion PDF using a numpy expression for the numerator and scipy.special.gamma for the denominator:

import scipy.special

def gamma_distribution(lam, alpha, beta):
    """Evaluate the gamma distribution.
    """
    # YOUR CODE HERE
    raise NotImplementedError()
# A correct solution should pass these tests.
import scipy.stats
assert gamma_distribution(1, 0, 1) == 0
assert gamma_distribution(1, 1, 0) == 0
for lam in (0.1, 1, 2):
    for alpha, beta in (1, 1), (2, 1), (2, 2):
        assert np.allclose(
            gamma_distribution(lam, alpha, beta),
            scipy.stats.gamma.pdf(lam, a=alpha, scale=1/beta))

The advantage of this choice of prior is that the evidence integral can be performed analytically:

\[ \Large P(D\mid \alpha,\beta) = \int d\lambda\, P(D\mid\lambda)\, P(P(\lambda\mid \alpha,\beta) = \frac{\beta^\alpha (\beta + T)^{-(\alpha+N)} \Gamma(\alpha+N)}{\Gamma(\alpha)} \; . \]

Use this result to convince yourself that the posterior \(P(\lambda\mid D,\alpha,\beta)\) is another Gamma distribution, but with different hyperparameter values \(\alpha'\) and \(\beta'\). Priors and posteriors with the same functional form for some likelihood are called conjugate priors. The binomial_learn example in class also used conjugate priors.

Implement the function below to learn from measured decay times by updating the prior hyperparameters:

def rate_learn(prior_alpha, prior_beta, decay_times):
    """Learn from data to update hyperparameters.
    
    Parameters
    ----------
    prior_alpha : float
        Hyperparameter alpha for the prior Gamma distribution PDF.
    prior_beta : float
        Hyperparameter beta for the prior Gamma distribution PDF.
    decay_times : array
        Array of observed decay times.
        
    Returns
    -------
    tuple
        Tuple (post_alpha, post_beta) of hyperparameter values for the
        posterior Gamma distribution PDF.
    """
    # YOUR CODE HERE
    raise NotImplementedError()
# A correct solution should pass these tests.
assert rate_learn(1, 1, []) == (1, 1)
assert rate_learn(2, 1, []) == (2, 1)
assert rate_learn(1, 2, []) == (1, 2)
assert np.allclose(
    np.round(rate_learn(np.sqrt(10), np.pi, [1,2,1]), 3),
    (6.162, 7.142))

You can use the function below to test your code visually and confirm that better data reduces the influence of the prior:

def rate_learn_plot(prior_alpha, prior_beta, num_decays, true_lam, seed=123):
    """
    """
    # Generate some random decays using the true decay rate.
    gen = np.random.RandomState(seed=seed)
    decay_times = scipy.stats.expon.rvs(scale=1 / true_lam, size=num_decays, random_state=gen)
    
    # Use Bayes' rule to learn from the data.
    lam = np.linspace(0., 2.5 * true_lam, 250)
    prior = gamma_distribution(lam, prior_alpha, prior_beta)
    like = decay_likelihood(decay_times, lam)
    post_alpha, post_beta = rate_learn(prior_alpha, prior_beta, decay_times)
    post = gamma_distribution(lam, post_alpha, post_beta)
    
    # Plot a summary of the learning process.
    plt.fill_between(lam, prior, alpha=0.25)
    plt.plot(lam, prior, label='Prior')
    plt.plot(lam, like / np.max(like) * np.max(prior), 'k:', label='Likelihood')
    plt.fill_between(lam, post, alpha=0.25)
    plt.plot(lam, post, label='Posterior')
    plt.axvline(true_lam, c='r', ls='--')
    plt.xlabel('Decay rate $\lambda$')
    plt.legend(loc='upper right', fontsize='x-large')
    plt.xlim(0, lam[-1])
    plt.ylim(0, None)
rate_learn_plot(prior_alpha=1, prior_beta=0.2, num_decays=10, true_lam=5)
rate_learn_plot(prior_alpha=1, prior_beta=0.2, num_decays=100, true_lam=5)

Problem 3#

In this problem you will solve the same decay rate inference problem but this time using a numerical estimate based on Markov-chain Monte Carlo (MCMC).

Recall that MCMC_sample() generates samples using a function proportional to the desired PDF. Implement the function below to evaluate the logarithm of the un-normalized posterior probability density:

\[ \Large \log P(D\mid \lambda) + \log P(\lambda\mid \alpha, \beta) \; . \]

Do not call your decay_likelihood() or gamma_distribution() functions in your implementation since the result has better accuracy if you apply the logarithm and simplify analytically. Since MCMC sampling only requires a function proportional to the desired PDF, you can drop any factors in \(P(D\mid \lambda)\) or \(P(\lambda\mid \alpha, \beta)\) that do not depend on \(\lambda\).

def decay_logf(lam, decay_times, prior_alpha, prior_beta):
    """Evaluate a function proportional to the log-posterior probability density.
    
    Parameters
    ----------
    lam : float
        Decay rate parameter.
    decay_times : array
        Array of observed decay times.
    prior_alpha : float
        Hyperparameter alpha for the prior Gamma distribution PDF.
    prior_beta : float
        Hyperparameter beta for the prior Gamma distribution PDF.
    
    Returns
    -------
    float
        log P(D|lam) + log P(lam|alpha,beta) up to a constant that does not
        depend on the value of lam. Returns -np.inf when lam <= 0.
    """
    # YOUR CODE HERE
    raise NotImplementedError()
# A correct solution should pass these tests.
a, b = 1, 0.2
offset = decay_logf(1, [], a, b)
assert np.allclose(decay_logf(0.1, [], a, b) - offset, +0.18)
assert np.allclose(decay_logf(2, [], a, b) - offset, -0.2)
assert np.allclose(np.round(decay_logf(0.1, [1,2,1], a, b), 3) - offset, -7.128)
assert np.allclose(np.round(decay_logf(1, [1,2,1], a, b), 3) - offset, -4.000)
assert np.allclose(np.round(decay_logf(2, [1,2,1], a, b), 3) - offset, -6.121)

You can use the function below to test your numerical solution and compare with the posterior found using rate_learn_plot above.

import functools
import inspect
import emcee

def wrap(func, **kwargs):
    """Prepare an arbitrary function to use with emcee sampling.

    Emcee expects its parameters in a single list, but it is sometimes more
    convenient to write a function in terms of named parameters and
    hyperparameters. This method uses introspection to wrap an arbitrary
    function with named parameters so that it has the signature expected
    by emcee.

    For example:

        def f(x,y,a,b): ...
        wrap(f, x=[1], y=[2], a=3, b=4, c=3, d=4)

    returns a tuple (wrapped, ['x','y'], [1,2], {'c':3, 'd':4}) where:

      - wrapped([p,q]) calls f(x=p,y=q,a=3,b=4)
      - [1,2] are the initial values to use for parameters named ['x','y'].
      - {'c':3, 'd':4} are the input kwargs with args of f() removed.

    The square brackets identify floating arguments and specify their initial
    value. An optional callable to evaluate a log-prior can also be passed,
    for example:

        wrap(f, x=[1,px], y=[2,py], a=3, b=4, c=3, d=4)

    where px(x) and py(y) return the (un-normalized) log of the priors on
    x and y to use during posterior sampling.

    Parameters
    ----------
    func : callable
        The function that should be prepared. It is assumed to have only
        numerical arguments that accept any floating point values.
    **kwargs : keyword arguments
        All arguments of func must be included and assigned a value.
        Arguments assigned a floating point value are considered fixed
        during sampling.  Arguments assigned a floating point value
        within a list, e.g., [1.2], will be sampled using the initial
        value provided.  Sampled arguments can optionally also specify
        a log-prior distribution using, e.g. [1.2, lnprior], where lnprior
        is a function of the sampled argument that returns the log prior
        probability density (which does not need to be normalized).

    Returns
    -------
    tuple
        Tuple (wrapped, names, values, kwargs). See example above for details.
    """
    fixed = {}
    names, values, lnpriors = [], [], []
    funcsig = inspect.signature(func)
    try:
        funcargs = {name: kwargs[name] for name in funcsig.parameters}
    except KeyError:
        raise ValueError('Missing arguments.')
    bound = funcsig.bind(**funcargs)
    bound.apply_defaults()
    NoPrior = lambda x: 0.
    for name, value in bound.arguments.items():
        if isinstance(value, list):
            names.append(name)
            values.append(value.pop(0))
            lnpriors.append(value.pop(0) if value else NoPrior)
            if value:
                raise ValueError('Invalid syntax for argument {}.'.format(name))
        else:
            fixed[name] = value
    partial = functools.partial(func, **fixed)
    def wrapped(theta):
        if len(theta) != len(names):
            raise ValueError('expected list of {} values.'.format(len(names)))
        result = 0.
        for lnprior, value in zip(lnpriors, theta):
            result += lnprior(value)
            if not np.isfinite(result):
                # theta is not allowed by this prior.
                return -np.inf
        args = dict(zip(names, theta))
        result += partial(**args)
        return result
    # Remove function args from kwargs.
    for name in funcargs:
        kwargs.pop(name, None)
    return wrapped, names, values, kwargs


def sample(func, names, values, nwalkers=20, nsamples=1000, abs_rms=1e-4,
           frac_rms=1e-3, burnin=100, random_state=None):
    """Generate MCMC samples of the un-normalized PDF func() using emcee.

    Can be used standalone but intended to work with :func:`wrap`.

    Initial values for each walker are Gaussian samples centered on the
    input values with an RMS of max(abs_rms, frac_rms * values).

    Parameters
    ----------
    func : callable
        Evaluate the log PDF to sample. Passed a single list of parameter
        values. Can be prepared using :func:`wrap`.
    names : iterable
        List of names for each floating parameter.  Used to label columns
        in the returned DataFrame. Can be prepared using :func:`wrap`.
    values : iterable
        List of initial values for each floating parameter.  Used to center
        random initial values for each walker. Can be prepared using
        :func:`wrap`.
    nwalkers : int
        The number of emcee walkers to use.
    nsamples : int
        The total number of samples to return, after combining walkers
        and trimming initial burnin.
    abs_rms : float
        Used to set walker initial values.  See above for details.
    rel_rms : float
        Used to set walker initial values.  See above for details.
    burnin : int
        The number of samples to remove from each walker's chain.
    random_state : np.random.RandomState or None
        The random state to use for reproducible chains.

    Returns
    -------
    pandas.DataFrame
        Generated samples in a dataframe, using the inputs names for columns.
    """
    if random_state is None:
        random_state = np.random.RandomState()
    # Generate sampler starting points.
    ndim = len(names)
    values = np.array(values, float)
    initial = np.tile(values, (nwalkers, 1))
    rms = np.maximum(abs_rms, frac_rms * values)
    initial += rms * random_state.normal(size=(nwalkers, ndim))
    # Initialize and run sampler.
    sampler = emcee.EnsembleSampler(nwalkers, ndim, func)
    n_per_chain = 1 + nsamples // nwalkers + burnin
    sampler.run_mcmc(initial, n_per_chain, rstate0=random_state.get_state())
    # Remove burnin and return results in a DataFrame.
    chain = sampler.chain[:, burnin:].reshape(-1, ndim)[:nsamples]
    return pd.DataFrame(chain, columns=names)


def MCMC_sample(func, **kwargs):
    """Generate random samples from an un-normalized PDF.

    See :func:`wrap` and :func:`sample` for details.

    Parameters
    ----------
    func : callable
        Function to evaluate log(f(...)) where f(...) is proportional
        to the desired probability density.  Will be wrapped to
        determine which arguments are sampled and which are fixed.
    **kwargs : keyword arguments
        Used to configure the wrapping of func and the sampler.

    Returns
    -------
    pandas.DataFrame
        Generated samples in a dataframe, with one named column per
        sampled argument of the input function.
    """
    # Wrap the input function.
    wrapped, names, values, kwargs = wrap(func, **kwargs)
    # Generate emcee samples.
    return sample(wrapped, names, values, **kwargs)
def MCMC_rate_learn_plot(prior_alpha, prior_beta, num_decays, true_lam, seed=123):
    # Generate some random decays using the true decay rate.
    gen = np.random.RandomState(seed=seed)
    decay_times = scipy.stats.expon.rvs(scale=1 / true_lam, size=num_decays, random_state=gen)
    # Generate MCMC samples of the decay rate for this data with this prior.
    samples = MCMC_sample(decay_logf, lam=[true_lam], decay_times=decay_times,
                          prior_alpha=prior_alpha, prior_beta=prior_beta, nsamples=20000)
    # Plot samples.
    plt.hist(samples['lam'], range=(0, 2.5 * true_lam), bins=40, density=True)
    plt.axvline(true_lam, c='r', ls='--')
    plt.xlabel('Decay rate $\lambda$')
    plt.xlim(0, 2.5 * true_lam)
MCMC_rate_learn_plot(prior_alpha=1, prior_beta=0.2, num_decays=10, true_lam=5)
MCMC_rate_learn_plot(prior_alpha=1, prior_beta=0.2, num_decays=100, true_lam=5)

Acknowledgments#

  • Initial version: Mark Neubauer

© Copyright 2024