Source code for convoys.regression

from __future__ import annotations

import warnings
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Callable, Literal

import emcee  # type: ignore[import-untyped]
import numpy
import progressbar
import scipy.optimize  # type: ignore[import-untyped]
from scipy.special import gammaincinv  # type: ignore[import-untyped]

with warnings.catch_warnings():
    # we have pinned numpy and SciPy, so we can ignore the deprecation warnings
    # issued by autograd 1.7
    warnings.filterwarnings("ignore", category=DeprecationWarning)
    import autograd  # type: ignore[import-untyped]
    from autograd.numpy import (  # type: ignore[import-untyped]
        dot,
        exp,
        isnan,
        log,
        sum,  # noqa: A004
    )
    from autograd.scipy.special import expit, gammaln  # type: ignore[import-untyped]
    from autograd_gamma import gammainc  # type: ignore[import-untyped]

if TYPE_CHECKING:
    from numpy.typing import ArrayLike


__all__ = ["Exponential", "Weibull", "Gamma", "GeneralizedGamma"]


def generalized_gamma_loss(
    x: tuple[float, ...],
    X: numpy.ndarray,
    B: numpy.ndarray,
    T: numpy.ndarray,
    W: numpy.ndarray,
    fix_k: int | None,
    fix_p: int | None,
    hierarchical: bool,
    flavor: Literal["logistic", "linear"],
    callback: Callable[[float], None] | None = None,
) -> float:
    k = exp(x[0]) if fix_k is None else fix_k
    p = exp(x[1]) if fix_p is None else fix_p
    log_sigma_alpha = x[2]
    log_sigma_beta = x[3]
    a = x[4]
    b = x[5]
    n_features = int((len(x) - 6) / 2)
    alpha = x[6 : 6 + n_features]
    beta = x[6 + n_features : 6 + 2 * n_features]
    lambd = exp(dot(X, alpha) + a)

    # PDF: p*lambda^(k*p) / gamma(k) * t^(k*p-1) * exp(-(t*lambda)^p)
    log_pdf = (
        log(p)
        + (k * p) * log(lambd)
        - gammaln(k)
        + (k * p - 1) * log(T)
        - (T * lambd) ** p
    )
    cdf = gammainc(k, (T * lambd) ** p)

    if flavor == "logistic":  # Log-likelihood with sigmoid
        c = expit(dot(X, beta) + b)
        LL_observed = log(c) + log_pdf
        LL_censored = log((1 - c) + c * (1 - cdf))
    elif flavor == "linear":  # L2 loss, linear
        c = dot(X, beta) + b
        LL_observed = -((1 - c) ** 2) + log_pdf
        LL_censored = -((c * cdf) ** 2)

    LL_data = sum(W * B * LL_observed + W * (1 - B) * LL_censored, 0)

    if hierarchical:
        # Hierarchical model with sigmas ~ invgamma(1, 1)
        LL_prior_a = (
            -4 * log_sigma_alpha
            - 1 / exp(log_sigma_alpha) ** 2
            - dot(alpha, alpha) / (2 * exp(log_sigma_alpha) ** 2)
            - n_features * log_sigma_alpha
        )
        LL_prior_b = (
            -4 * log_sigma_beta
            - 1 / exp(log_sigma_beta) ** 2
            - dot(beta, beta) / (2 * exp(log_sigma_beta) ** 2)
            - n_features * log_sigma_beta
        )
        LL: float = LL_prior_a + LL_prior_b + LL_data
    else:
        LL = LL_data

    if isnan(LL):
        return -numpy.inf
    if callback is not None:
        callback(LL)
    return LL
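

# Illustrative sketch (not part of the convoys source): shows the layout of the
# flat parameter vector consumed by generalized_gamma_loss and how the function
# could be called directly on tiny synthetic data. The helper name
# `_example_loss_call` and the toy arrays are hypothetical and exist only for
# documentation purposes.
def _example_loss_call() -> float:
    toy_X = numpy.array([[1.0, 0.0], [0.0, 1.0], [1.0, 0.0]])  # 2 features
    toy_B = numpy.array([1.0, 0.0, 1.0])  # 1 = converted, 0 = censored
    toy_T = numpy.array([0.5, 2.0, 1.5])  # observation times, must be > 0
    toy_W = numpy.ones(3)  # unit weights
    n_features = toy_X.shape[1]
    # Layout: (log k, log p, log sigma_alpha, log sigma_beta, a, b,
    #          alpha_1..alpha_n, beta_1..beta_n) -- same starting point fit() uses
    x0 = numpy.zeros(6 + 2 * n_features)
    # Returns the joint log-likelihood (plus hierarchical priors); fit() maximizes it.
    return generalized_gamma_loss(
        x0, toy_X, toy_B, toy_T, toy_W,
        fix_k=None, fix_p=None, hierarchical=True, flavor="logistic",
    )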


class RegressionModel(ABC):
    @abstractmethod
    def fit(
        self,
        X: "ArrayLike",
        B: "ArrayLike",
        T: "ArrayLike",
        W: "ArrayLike" | None = None,
    ) -> None:
        raise NotImplementedError("Need to implement fit")

    @abstractmethod
    def predict(self, group: "ArrayLike", t: "ArrayLike") -> numpy.ndarray:
        raise NotImplementedError("Need to implement predict")

    @abstractmethod
    def predict_ci(
        self, group: "ArrayLike", t: "ArrayLike", ci: float
    ) -> numpy.ndarray:
        raise NotImplementedError("Need to implement predict_ci")

    @abstractmethod
    def rvs(
        self, x: "ArrayLike", *args: Any, **kwargs: Any
    ) -> tuple[numpy.ndarray, numpy.ndarray]:
        raise NotImplementedError("Need to implement rvs")


class GeneralizedGamma(RegressionModel):
    """Generalization of Gamma, Weibull, and Exponential

    :param mcmc: boolean, defaults to False. Whether to use MCMC to
        sample from the posterior so that a confidence interval can be
        estimated later (see :meth:`predict`).
    :param fix_k: int or None. If an int, fixes :math:`k` to that value.
        For example, use `fix_k=1` for a Weibull distribution.
    :param fix_p: int or None. If an int, fixes :math:`p` to that value.
        Use `fix_p=1` for a Gamma distribution.
    :param hierarchical: boolean denoting whether we have a (Normal) prior
        on the alpha and beta parameters to regularize. The variance of
        the normal distribution is in itself assumed to be an inverse
        gamma distribution (1, 1).
    :param flavor: defaults to logistic. If set to 'linear', then a linear
        model is fit, where the beta params will be completely additive.
        This creates a much more interpretable model, with some minor loss
        of accuracy.

    This mostly follows the `Wikipedia article
    <https://en.wikipedia.org/wiki/Generalized_gamma_distribution>`_,
    although our notation is slightly different. Also see `this paper
    <https://grodri.github.io/survival/pop509slides1.pdf>`_ for an overview.

    **Shape of the probability function**

    The cumulative density function is:

    :math:`F(t) = P(k, (t\\lambda)^p)`

    where :math:`P(a, x) = \\gamma(a, x) / \\Gamma(a)` is the lower regularized
    incomplete gamma function. :math:`\\gamma(a, x)` is the incomplete gamma
    function and :math:`\\Gamma(a)` is the standard gamma function.

    The probability density function is:

    :math:`f(t) = p\\lambda^{kp} t^{kp-1} \\exp(-(t\\lambda)^p) / \\Gamma(k)`

    **Modeling conversion rate**

    Since our goal is to model the conversion rate, we assume the conversion
    rate converges to a final value

    :math:`c = \\sigma(\\mathbf{\\beta^Tx} + b)`

    where :math:`\\sigma(z) = 1/(1+e^{-z})` is the sigmoid function,
    :math:`\\mathbf{\\beta}` is an unknown vector we are solving for (with
    corresponding intercept :math:`b`), and :math:`\\mathbf{x}` is the
    feature vector (inputs).

    We also assume that the rate parameter :math:`\\lambda` is determined by

    :math:`\\lambda = \\exp(\\mathbf{\\alpha^Tx} + a)`

    where :math:`\\mathbf{\\alpha}` is another unknown vector we are trying
    to solve for (with corresponding intercept :math:`a`).

    We also assume that the :math:`\\mathbf{\\alpha}, \\mathbf{\\beta}` vectors
    have a normal distribution

    :math:`\\alpha_i \\sim \\mathcal{N}(0, \\sigma_{\\alpha})`,
    :math:`\\beta_i \\sim \\mathcal{N}(0, \\sigma_{\\beta})`

    where the hyperparameters :math:`\\sigma_{\\alpha}^2, \\sigma_{\\beta}^2`
    are drawn from an inverse gamma distribution

    :math:`\\sigma_{\\alpha}^2 \\sim \\text{inv-gamma}(1, 1)`,
    :math:`\\sigma_{\\beta}^2 \\sim \\text{inv-gamma}(1, 1)`

    **List of parameters**

    The full model fits vectors :math:`\\mathbf{\\alpha, \\beta}` and scalars
    :math:`a, b, k, p, \\sigma_{\\alpha}, \\sigma_{\\beta}`.

    **Likelihood and censorship**

    For entries that convert, the contribution to the likelihood is simply
    the probability density given by the probability distribution function
    :math:`f(t)` times the final conversion rate :math:`c`.

    For entries that *did not* convert, there are two options. Either the
    entry will never convert, which has probability :math:`1-c`. Or, it will
    convert at some later point that we have not observed yet; conditional on
    eventually converting, the probability of not yet having converted at
    time :math:`t` is :math:`1 - F(t)`.

    **Solving the optimization problem**

    To find the MAP (max a posteriori), `scipy.optimize.minimize
    <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html#scipy.optimize.minimize>`_
    with the SLSQP method is used.

    If `mcmc == True`, then `emcee <http://dfm.io/emcee/current/>`_ is used
    to sample from the full posterior in order to generate uncertainty
    estimates for all parameters.
    """

    def __init__(
        self,
        mcmc: bool = False,
        fix_k: int | None = None,
        fix_p: int | None = None,
        hierarchical: bool = True,
        flavor: Literal["logistic", "linear"] = "logistic",
    ) -> None:
        self._mcmc = mcmc
        self._fix_k = fix_k
        self._fix_p = fix_p
        self._hierarchical = hierarchical
        self._flavor = flavor

    def fit(
        self,
        X: "ArrayLike",
        B: "ArrayLike",
        T: "ArrayLike",
        W: "ArrayLike" | None = None,
    ) -> None:
        """Fits the model.

        :param X: numpy 2-D array of shape :math:`k \\cdot n`, containing
            floats or ints representing the values of 1 or more features.
            Can be used with 1-hot encoding to represent group membership.
        :param B: numpy array of shape :math:`n`, containing booleans
            representing whether or not the subject 'converted' at time
            delta :math:`t`.
        :param T: numpy array of shape :math:`n`, containing floats
            representing the time delta :math:`t` between creation and
            either conversion or censoring.
        :param W: (optional) numpy vector of shape :math:`n`, containing
            floats representing weights (or counts) for the row's
            observation(s). If None, defaults to `numpy.ones(len(X))`.
        """
        if W is None:
            W = numpy.ones(len(X))  # type: ignore[arg-type]
        X, B, T, W = (
            Z if isinstance(Z, numpy.ndarray) else numpy.array(Z)
            for Z in (X, B, T, W)
        )
        keep_indexes = (T > 0) & (B >= 0) & (B <= 1) & (W >= 0)
        if sum(keep_indexes) < X.shape[0]:
            n_removed = X.shape[0] - sum(keep_indexes)
            warnings.warn(
                "Warning! Removed %d/%d entries from inputs where "
                "T <= 0 or B not 0/1 or W < 0" % (n_removed, len(X)),
                stacklevel=2,
            )
            X, B, T, W = (Z[keep_indexes] for Z in (X, B, T, W))

        n_features = X.shape[1]

        # scipy.optimize and emcee force the parameters to be a vector:
        # (log k, log p, log sigma_alpha, log sigma_beta,
        #  a, b, alpha_1...alpha_k, beta_1...beta_k)
        # Generalized Gamma is a bit sensitive to the starting point!
        x0 = numpy.zeros(6 + 2 * n_features)
        x0[0] = +1 if self._fix_k is None else log(self._fix_k)
        x0[1] = -1 if self._fix_p is None else log(self._fix_p)
        args = (X, B, T, W, self._fix_k, self._fix_p, self._hierarchical, self._flavor)

        # Set up progressbar and callback
        bar = progressbar.ProgressBar(
            widgets=[
                progressbar.Variable("loss", width=15, precision=9),  # type: ignore[no-untyped-call]
                " ",
                progressbar.BouncingBar(),  # type: ignore[no-untyped-call]
                " ",
                progressbar.Counter(width=6),  # type: ignore[no-untyped-call]
                " [",
                progressbar.Timer(),  # type: ignore[no-untyped-call]
                "]",
            ]
        )
        value_history = []

        def callback(LL: float) -> None:
            value_history.append(LL)
            bar.update(len(value_history), loss=LL)

        # Define objective and use automatic differentiation
        def f(x: tuple[float, ...]) -> float:
            return -generalized_gamma_loss(x, *args, callback=callback)

        jac = autograd.grad(lambda x: -generalized_gamma_loss(x, *args))

        # Find the maximum a posteriori of the distribution
        res = scipy.optimize.minimize(
            f, x0, jac=jac, method="SLSQP", options={"maxiter": 9999}
        )
        if not res.success:
            raise Exception("Optimization failed with message: %s" % res.message)
        result = {"map": res.x}

        # TODO: should not use fixed k/p as search parameters
        if self._fix_k:
            result["map"][0] = log(self._fix_k)
        if self._fix_p:
            result["map"][1] = log(self._fix_p)

        # Make sure we're in a local minimum
        gradient = jac(result["map"])
        gradient_norm = numpy.dot(gradient, gradient)
        if gradient_norm >= 1e-2 * len(X):
            warnings.warn(
                "Might not have found a local minimum! "
                "Norm of gradient is %f" % gradient_norm,
                stacklevel=2,
            )

        # Let's sample from the posterior to compute uncertainties
        if self._mcmc:
            (dim,) = res.x.shape
            n_walkers = 5 * dim
            sampler = emcee.EnsembleSampler(
                nwalkers=n_walkers,
                ndim=dim,
                log_prob_fn=generalized_gamma_loss,
                args=args,
            )
            mcmc_initial_noise = 1e-3
            p0 = [
                result["map"] + mcmc_initial_noise * numpy.random.randn(dim)
                for i in range(n_walkers)
            ]
            n_burnin = 100
            n_steps = int(numpy.ceil(2000.0 / n_walkers))
            n_iterations = n_burnin + n_steps

            bar = progressbar.ProgressBar(
                max_value=n_iterations,
                widgets=[
                    progressbar.Percentage(),  # type: ignore[no-untyped-call]
                    " ",
                    progressbar.Bar(),  # type: ignore[no-untyped-call]
                    " %d walkers [" % n_walkers,
                    progressbar.AdaptiveETA(),  # type: ignore[no-untyped-call]
                    "]",
                ],
            )
            for i, _ in enumerate(sampler.sample(p0, iterations=n_iterations)):
                bar.update(i + 1)
            result["samples"] = (
                sampler.get_chain()[n_burnin:, :, :].reshape(-1, dim, order="F").T
            )
            if self._fix_k:
                result["samples"][0, :] = log(self._fix_k)
            if self._fix_p:
                result["samples"][1, :] = log(self._fix_p)

        self.params = {
            k: {
                "k": exp(data[0]),
                "p": exp(data[1]),
                "a": data[4],
                "b": data[5],
                "alpha": data[6 : 6 + n_features].T,
                "beta": data[6 + n_features : 6 + 2 * n_features].T,
            }
            for k, data in result.items()
        }

    def _predict(
        self, params: dict[str, Any], x: "ArrayLike", t: "ArrayLike"
    ) -> numpy.ndarray:
        lambd = exp(dot(x, params["alpha"].T) + params["a"])
        if self._flavor == "logistic":
            c = expit(dot(x, params["beta"].T) + params["b"])
        elif self._flavor == "linear":
            c = dot(x, params["beta"].T) + params["b"]
        else:
            raise ValueError("flavor must be one of `logistic` or `linear`")
        M = c * gammainc(params["k"], (t * lambd) ** params["p"])
        return M  # type: ignore[no-any-return]

    def predict_posteriori(self, x: "ArrayLike", t: "ArrayLike") -> numpy.ndarray:
        """Returns the trace samples generated via the MCMC steps.

        Requires the model to be fit with `mcmc == True`."""
        x = numpy.array(x)
        t = numpy.array(t)
        assert self._mcmc
        params = self.params["samples"]
        t = numpy.expand_dims(t, -1)
        return self._predict(params, x, t)

    def predict_ci(
        self, x: "ArrayLike", t: "ArrayLike", ci: float = 0.8
    ) -> numpy.ndarray:
        """Works like :meth:`predict` but produces a confidence interval.

        Requires the model to be fit with `mcmc == True`.

        The return value will contain one more dimension than for
        :meth:`predict`, and the last dimension will have size 3, containing
        the mean, the lower bound of the confidence interval, and the upper
        bound of the confidence interval.
        """
        M = self.predict_posteriori(x, t)
        y = numpy.mean(M, axis=-1)
        y_lo = numpy.percentile(M, (1 - ci) * 50, axis=-1)
        y_hi = numpy.percentile(M, (1 + ci) * 50, axis=-1)
        return numpy.stack((y, y_lo, y_hi), axis=-1)

    def predict(self, x: "ArrayLike", t: "ArrayLike") -> numpy.ndarray:
        """Returns the value of the cumulative distribution function for a
        fitted model (using the maximum a posteriori estimate).

        :param x: feature vector (or matrix)
        :param t: time
        """
        params = self.params["map"]
        x = numpy.array(x)
        t = numpy.array(t)
        return self._predict(params, x, t)

    def rvs(
        self,
        x: "ArrayLike",
        n_curves: int = 1,
        n_samples: int = 1,
        T: numpy.ndarray | None = None,
    ) -> tuple[numpy.ndarray, numpy.ndarray]:
        """Samples values from this distribution.

        T is optional and means we already observed non-conversion until T.
        """
        assert self._mcmc  # Need to be fit with MCMC
        if T is None:
            T = numpy.zeros((n_curves, n_samples))
        else:
            assert T.shape == (n_curves, n_samples)
        B = numpy.zeros((n_curves, n_samples), dtype=bool)
        C = numpy.zeros((n_curves, n_samples))
        params = self.params["samples"]
        for i, j in enumerate(numpy.random.randint(len(params["k"]), size=n_curves)):
            k = params["k"][j]
            p = params["p"][j]
            lambd = exp(dot(x, params["alpha"][j]) + params["a"][j])
            c = expit(dot(x, params["beta"][j]) + params["b"][j])
            z = numpy.random.uniform(size=(n_samples,))
            cdf_now = c * gammainc(
                k, numpy.multiply.outer(T[i], lambd) ** p
            )  # why is this outer?
            adjusted_z = cdf_now + (1 - cdf_now) * z
            B[i] = adjusted_z < c
            y = adjusted_z / c
            w = gammaincinv(k, y)
            # x = (t * lambd)**p
            C[i] = w ** (1.0 / p) / lambd
            C[i][~B[i]] = 0
        return B, C
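

# Illustrative usage sketch (not part of the convoys source): fitting
# GeneralizedGamma on synthetic censored data and evaluating the fitted
# conversion curve, along the lines described in the class docstring above.
# The function name `_example_generalized_gamma_fit` and the synthetic data
# generation are hypothetical, added here only for documentation purposes.
def _example_generalized_gamma_fit() -> numpy.ndarray:
    rng = numpy.random.default_rng(0)
    n = 500
    X = rng.integers(0, 2, size=(n, 1)).astype(float)   # one binary feature
    conversion_time = rng.weibull(1.5, size=n)           # latent conversion times
    will_convert = rng.random(n) < 0.4                   # ~40% eventually convert
    now = rng.uniform(0.1, 3.0, size=n)                  # how long each subject was observed
    B = will_convert & (conversion_time < now)           # conversion observed?
    T = numpy.where(B, conversion_time, now)             # conversion time or censoring time
    model = GeneralizedGamma(mcmc=False)
    model.fit(X, B, T)
    # P(converted by t = 1, 2, 3) for a subject with feature value 1
    return model.predict([1.0], [1.0, 2.0, 3.0])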


class Exponential(GeneralizedGamma):
    """Specialization of :class:`.GeneralizedGamma` where :math:`k=1, p=1`.

    The cumulative density function is:

    :math:`F(t) = 1 - \\exp(-t\\lambda)`

    The probability density function is:

    :math:`f(t) = \\lambda\\exp(-t\\lambda)`

    The exponential distribution is the simplest distribution. From a
    conversion perspective, you can interpret it as having two competing
    final states where the probability of transitioning from the initial
    state to converted or dead is constant.

    See documentation for :class:`GeneralizedGamma`."""

    def __init__(
        self,
        mcmc: bool = False,
        hierarchical: bool = True,
        flavor: Literal["logistic", "linear"] = "logistic",
    ) -> None:
        super().__init__(
            mcmc=mcmc, hierarchical=hierarchical, flavor=flavor, fix_p=1, fix_k=1
        )


class Weibull(GeneralizedGamma):
    """Specialization of :class:`.GeneralizedGamma` where :math:`k=1`.

    The cumulative density function is:

    :math:`F(t) = 1 - \\exp(-(t\\lambda)^p)`

    The probability density function is:

    :math:`f(t) = p\\lambda(t\\lambda)^{p-1}\\exp(-(t\\lambda)^p)`

    See documentation for :class:`GeneralizedGamma`."""

    def __init__(
        self,
        mcmc: bool = False,
        hierarchical: bool = True,
        flavor: Literal["logistic", "linear"] = "logistic",
    ) -> None:
        super().__init__(
            mcmc=mcmc, hierarchical=hierarchical, flavor=flavor, fix_k=1, fix_p=None
        )


class Gamma(GeneralizedGamma):
    """Specialization of :class:`.GeneralizedGamma` where :math:`p=1`.

    The cumulative density function is:

    :math:`F(t) = P(k, t\\lambda)`

    where :math:`P(a, x) = \\gamma(a, x) / \\Gamma(a)` is the lower
    regularized incomplete gamma function.

    The probability density function is:

    :math:`f(t) = \\lambda^k t^{k-1} \\exp(-t\\lambda) / \\Gamma(k)`

    See documentation for :class:`GeneralizedGamma`."""

    def __init__(
        self,
        mcmc: bool = False,
        hierarchical: bool = True,
        flavor: Literal["logistic", "linear"] = "logistic",
    ) -> None:
        super().__init__(
            mcmc=mcmc, hierarchical=hierarchical, flavor=flavor, fix_p=1, fix_k=None
        )
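

# Illustrative sketch (not part of the convoys source): the specializations
# above only differ in which parameters are fixed, so they share one interface.
# With mcmc=True the fit also draws posterior samples, which is what
# predict_ci and rvs consume. The name `_example_weibull_ci` and the toy data
# are hypothetical, added here only for documentation purposes.
def _example_weibull_ci() -> numpy.ndarray:
    rng = numpy.random.default_rng(1)
    n = 300
    X = numpy.ones((n, 1))                      # intercept-only model
    conversion_time = rng.weibull(2.0, size=n)
    will_convert = rng.random(n) < 0.5
    now = rng.uniform(0.1, 2.0, size=n)
    B = will_convert & (conversion_time < now)
    T = numpy.where(B, conversion_time, now)
    model = Weibull(mcmc=True)                  # k fixed to 1, p left free
    model.fit(X, B, T)
    # mean, lower and upper bound of an 80% credible interval at t = 1
    return model.predict_ci([1.0], 1.0, ci=0.8)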