Source code for qspec.models._base

# -*- coding: utf-8 -*-
"""
qspec.models._base
==================

Base classes for lineshape models.
"""

import numpy as np

from qspec.tools import merge_intervals

__all__ = ['Model', 'Empty', 'NPeak', 'Offset', 'Amplifier', 'Custom', 'YPars', 'Listed', 'Summed', 'Linked']


np_version = np.version.version.split('.')


def _is_scalar(val):
    return isinstance(val, bool) or np.issubdtype(type(val), np.integer) or np.issubdtype(type(val), np.floating)


def _is_unc(fix):
    if isinstance(fix, float):
        return True
    if not isinstance(fix, str):
        return False

    j, k = fix.find('('), fix.find(')')
    try:
        if j == -1 or k == -1:
            raise ValueError('Not a value with uncertainty.')
        _ = '{}({})'.format(float(fix[:j]), float(fix[j+1:k]))
    except ValueError:
        return False
    return True


def _val_fix_to_val(val, fix):
    if isinstance(fix, float):
        return val
    j = fix.find('(')
    return float(fix[:j])


def _fix_to_unc(fix):
    if isinstance(fix, float):
        return fix
    j, k = fix.find('('), fix.find(')')
    return float(fix[j+1:k])


def _args_ordered(args, order):
    return [args[i] for i in order]


def _poly(x, *args):
    return np.sum([args[n] * x ** n for n in range(len(args))], axis=0)


[docs] class Model: """ Base class for all models. """ def __init__(self, model=None): self.model = model self.type = 'Model' def __call__(self, x, *args, **kwargs): return self.evaluate(x, *self.update_args(args), **kwargs)
[docs] def evaluate(self, x, *args, **kwargs): # Reimplement this function in subclasses. """ :param x: The input values. :param args: The function parameters. Must have length self.size. :param kwargs: Additional keyword arguments. :returns: The function results at the input values 'x'. """ pass
def _add_arg(self, name, val, fix, link): """ Add a new parameter to the model. :param name: The parameter name. :param val: The parameter value. :param fix: Whether the parameter is fixed. :param link: Whether the parameter is linked. :returns: None. """ if name in self.names: raise ValueError('Parameter {} already exists.'.format(name)) self.names.append(name) self.vals.append(val) self.fixes.append(fix) self.links.append(link) self.expressions.append('args[{}]'.format(self._index)) self.p[name] = self._index self._index += 1 self._size += 1 @property def description(self): """ A description of the model hierarchy. """ label = '' super_model = self while super_model is not None: if isinstance(super_model, Listed): label += super_model.type + '[0].' super_model = super_model.models[0] else: label += super_model.type + '.' super_model = super_model.model return label[:-1] @property def size(self): """ The number of parameters required by the model. """ return self._size @property def dx(self): """ A hint for an x-axis step size for a smooth display of the model. """ return 0.1 if self.model is None else self.model.dx @property def error(self): """ An error message if there is an issue with the model parameters (not implemented). """ return self._error @property def model(self): """ The submodel. """ return self._model @model.setter def model(self, value): self._model = value if self._model is None: self.names, self.vals, self.fixes, self.links = [], [], [], [] self.expressions = [] self.p = {} self._index = 0 self._size = 0 self._error = '' else: self.names, self.vals, self.fixes, self.links = \ self._model.names, self._model.vals, self._model.fixes, self._model.links self.expressions = self._model.expressions self.p = self._model.p self._index = len(self._model.names) self._size = len(self._model.names) self._error = self._model.error
[docs] def get_pars(self): """ :returns: A zip-iterator over (names, vals, fixes, links). """ return zip(self.names, self.vals, self.fixes, self.links)
[docs] def set_pars(self, pars, force=False): """ Set all vals, fixes and links with one nested list. :param pars: A nested list of shape (self.size, 3). :param force: The 'force' parameter of the 'set_val', 'set_fix' and 'set_link' functions. :returns: None. """ for i, p in enumerate(pars): self.set_val(i, p[0], force=force) self.set_fix(i, p[1], force=force) self.set_link(i, p[2], force=force)
[docs] def set_vals(self, vals, force=False): """ Set all vals with one list. :param vals: A list of shape (self.size, ). :param force: The 'force' parameter of the 'set_val' function. :returns: None. """ for i, val in enumerate(vals): self.set_val(i, val, force=force)
[docs] def set_fixes(self, fixes, force=False): """ Set all fixes with one list. :param fixes: A list of shape (self.size, ). :param force: The 'force' parameter of the 'set_fix' function. :returns: None. """ for i, fix in enumerate(fixes): self.set_fix(i, fix, force=force)
[docs] def set_val(self, i, val, force=False): """ Set a specific parameter value. :param i: The index of the parameter. :param val: The new parameter value. :param force: Force the parameter to take exactly the new value. If False, the parameter is converted to the correct format. :returns: None. """ if force or _is_scalar(val): if self.model is None: self.vals[i] = val if force else float(val) else: self.model.set_val(i, val, force=False) # Set val for all sub-models # to ensure set_val is called for a ListedModel if one is part of the sub-models. # This is only needed for the vals since these may be predicted by the specific model. # Everything else is handled by the top-model. else: raise ValueError(f'The parameter value {val} has the wrong format. Must be a floating.')
[docs] def set_fix(self, i, fix, force=False): """ Set a specific parameter fix state. :param i: The index of the parameter. :param fix: The new parameter value. :param force: Force the parameter to take exactly the new fix state. If False, the parameter is converted to the correct format. :returns: None. """ if force: self.fixes[i] = fix return if _is_scalar(fix): if isinstance(fix, bool): fix = bool(fix) elif fix <= 0 or np.isinf(fix): fix = fix <= 0 else: fix = float(fix) expr = 'args[{}]'.format(i) elif isinstance(fix, str): j, k = fix.find('('), fix.find(')') try: if j == -1 or k == -1: raise ValueError('Not a value with uncertainty.') fix = '{}({})'.format(float(fix[:j]), float(fix[j+1:k])) expr = 'args[{}]'.format(i) except ValueError: _fix = fix for j, name in enumerate(self.names): _fix = _fix.replace(name, 'eval(self.expressions[{}])'.format(j)) expr = _fix try: self._eval_zero_division(self.vals, expr) except (ValueError, TypeError, SyntaxError, NameError) as e: print('Invalid expression for parameter \'{}\': {}. Got a {}.'.format(self.names[i], fix, repr(e))) return elif isinstance(fix, (tuple, list, np.ndarray)): if len(fix) == 0: fix = [0, 1] elif len(fix) == 1: fix = [0, fix[0]] else: fix = list(fix[:2]) expr = 'args[{}]'.format(i) else: raise ValueError(f'The parameter fix state {fix} has the wrong format.' f' Must be a scalar, bool, str or iterable.') temp_expr = self.expressions[i] temp_fix = self.fixes[i] self.expressions[i] = compile(expr, '<string>', 'eval', optimize=2) # Compile beforehand to save time. self.fixes[i] = fix try: self.update_args(self.vals) except RecursionError as e: print('Expression for {} with fix {} form a loop. Got a {}.'.format(self.names[i], fix, repr(e))) self.expressions[i] = temp_expr self.fixes[i] = temp_fix
def _eval_zero_division(self, args, expr): """ Safely calculate parameter expressions including nans, infs and zero divisions. :param args: The parameters. :param expr: The parameter expression. :returns: The processed parameter or 0 in case of a nan, inf or zero division. """ try: with np.errstate(divide='ignore', invalid='ignore'): ret = eval(expr, {}, {'self': self, 'args': args}) return 0. if np.isnan(ret) or np.isinf(ret) else ret except ZeroDivisionError: return 0.
[docs] def update_args(self, args): """ :param args: The parameters. :returns: the parameters updated with the parameter expressions. """ return tuple(self._eval_zero_division(args, expr) for expr in self.expressions)
[docs] def update(self): """ Updates self.vals with updated parameters, see self.update_args. :returns: None. """ self.set_vals(self.update_args(self.vals), force=False)
[docs] def min(self): """ :returns: A hint for an x-axis minimum for a complete display of the model. """ return -1. if self.model is None else self.model.min()
[docs] def max(self): """ :returns: A hint for an x-axis maximum for a complete display of the model. """ return 1. if self.model is None else self.model.max()
[docs] def intervals(self): """ :returns: A list of x-axis intervals for a complete display of the model. """ return [[self.min(), self.max()]] if self.model is None else self.model.intervals()
[docs] def x(self): """ :returns: An array of x values for a complete and smooth display of the model. """ return np.concatenate([np.arange(i[0], i[1], self.dx, dtype=float) for i in self.intervals()], axis=0)
[docs] def fit_prepare(self): """ :returns: fixed, bounds. A list of bool values which parameters are not varied in a fit and a list of bounds for the fit parameters. See parameters 'p0_fixed' and 'bounds' of 'qspec.curve_fit'. """ bounds = (-np.inf, np.inf) fixed = [fix for fix in self.fixes] b_lower = [] b_upper = [] _bounds = False for i, fix in enumerate(self.fixes): if isinstance(fix, bool): b_lower.append(-np.inf) b_upper.append(np.inf) elif _is_unc(fix): b_lower.append(-np.inf) b_upper.append(np.inf) fixed[i] = False elif isinstance(fix, list): _bounds = True b_lower.append(fix[0]) b_upper.append(fix[1]) fixed[i] = False elif isinstance(fix, str): b_lower.append(-np.inf) b_upper.append(np.inf) fixed[i] = True _fix = fix else: raise TypeError('The type {} of the element {} in self.fixes is not supported.' .format(type(fix), fix)) if _bounds: bounds = (b_lower, b_upper) return fixed, bounds
[docs] class Empty(Model): """ An empty model, returning zeros with the same shape as x. """ def __init__(self): super().__init__(model=None) self.type = 'Empty'
[docs] def evaluate(self, x, *args, **kwargs): return np.zeros_like(x)
[docs] class NPeak(Model): """ Evaluates the given 'model' at the positions x\ :sub:`i` with scalings p\ :sub:`i` where i < 'n_peaks'. """ def __init__(self, model, n_peaks=1): super().__init__(model=model) self.type = 'NPeak' self.n_peaks = int(n_peaks) for n in range(self.n_peaks): self._add_arg('x{}'.format(n), 0., n == 0, False) self._add_arg('p{}'.format(n), 1., n == 0, False)
[docs] def evaluate(self, x, *args, **kwargs): return np.sum([args[self.model.size + 2 * n + 1] * self.model.evaluate(x - args[self.model.size + 2 * n], *args[:self.model.size]) for n in range(self.n_peaks)], axis=0)
[docs] def min(self): min_center = min(self.vals[self.p['x{}'.format(n)]] for n in range(self.n_peaks)) return min_center + self.model.min()
[docs] def max(self): max_center = max(self.vals[self.p['x{}'.format(n)]] for n in range(self.n_peaks)) return max_center + self.model.max()
[docs] def intervals(self): return merge_intervals([[i[0] + self.vals[self.model.size + 2 * n], i[1] + self.vals[self.model.size + 2 * n]] for i in self.model.intervals() for n in range(self.n_peaks)])
[docs] class Offset(Model): """ Cuts the x-axis and adds y-axis offsets to every segment. """ def __init__(self, model=None, x_cuts=None, offsets=None): """ :param model: The model the offset will be added to. If None, the offset will be added to zero. :param x_cuts: x values where to cut the x-axis. :param offsets: A list of maximally considered polynomial orders for each slice. The list must have length len(x_cuts) + 1. """ super().__init__(model=model) self.type = 'Offset' if x_cuts is None: x_cuts = [] self.x_cuts = sorted(list(x_cuts)) self.offsets = offsets if self.offsets is None: self.offsets = [0] if len(self.offsets) != len(self.x_cuts) + 1: raise ValueError('The parameter offset must be a list of size \'len(x_cuts) + 1\'' ' and contain the maximally considered polynomial order for each slice.') self.offset_map = [] self.offset_masks = [] self.update_on_call = True self.gen_offset_map()
[docs] def evaluate(self, x, *args, **kwargs): if self._model is None: return self._offset(x, *args) return self.model.evaluate(x, *args[:self.model.size]) + self._offset(x, *args)
[docs] def set_x_cuts(self, x_cuts): """ Set the values where to cut the x-axis into intervals with individual offset parameters. :param x_cuts: A list of x values. :returns: None. """ x_cuts = list(x_cuts) if len(x_cuts) != len(self.x_cuts): raise ValueError('\'x_cuts\' must not change its size.') self.x_cuts = sorted(list(x_cuts))
def _offset(self, x, *args): """ :param x: The input values. :param args: The function parameters. :returns: The offset polynomial. """ if self.update_on_call: self.gen_offset_masks(x) ret = np.zeros_like(x) for i, mask in enumerate(self.offset_masks): ret[mask] = _poly(x[mask], *_args_ordered(args, self.offset_map[i])) return ret
[docs] def gen_offset_map(self): """ Generate the offset parameters and a map of the interval and polynomial order to the parameter index space. :returns: None. """ self.offset_map = [] for i, n in enumerate(self.offsets): self.offset_map.append([]) for k in range(n + 1): self.offset_map[-1].append(self._index) self._add_arg('off{}e{}'.format(i, k), 0., False, False)
""" Preprocessing """
[docs] def gen_offset_masks(self, x): """ Generate the array masks corresponding to the 'x_cuts'. :param x: The input values. :returns: None. """ self.offset_masks = [] for x0, x1 in zip([np.min(x) - 1., ] + self.x_cuts, self.x_cuts + [np.max(x) + 1., ]): x_mean = 0.5 * (x0 + x1) self.offset_masks.append(np.abs(x - x_mean) < x1 - x_mean)
[docs] def guess_offset(self, x, y): """ Guess the first two polynomial orders for a given data set. :param x: The input values. :param y: The y data. :returns: None. """ for i, mask in enumerate(self.offset_masks): self.vals[self.p['off{}e0'.format(i)]] = 0.5 * (y[mask][0] + y[mask][-1]) try: self.vals[self.p['off{}e1'.format(i)]] = (y[mask][-1] - y[mask][0]) / (x[mask][-1] - x[mask][0]) except KeyError: return
[docs] class Amplifier(Model): """ A polynomial of order 'order'. """ def __init__(self, order=None): super().__init__(model=None) self.type = 'Amplifier' if order is None: order = 1 self.order = order for n in range(order + 1): self._add_arg('a{}'.format(n), 1. if n == 1 else 0., False, False) self._min = -10 self._max = 10
[docs] def evaluate(self, x, *args, **kwargs): self._min = np.min(x) self._max = np.max(x) return _poly(x, *args)
@property def dx(self): return 1e-2
[docs] def min(self): return self._min
[docs] def max(self): return self._max
[docs] class Custom(Model): """ A model with custom parameters. Without a submodel, Custom returns the user-specified parameters as an array regardless of the input x. Otherwise, simply the submodel is called and the custom parameters can be connected to other parameters by the user. """ def __init__(self, model=None, parameters=None): super().__init__(model=model) self.type = 'Custom' if parameters is None: parameters = [] self.parameters = parameters for p in self.parameters: self._add_arg(p, 0., False, False)
[docs] def evaluate(self, x, *args, **kwargs): if self.model is None: return np.array(args, dtype=float) return self.model.evaluate(x, *args[:self.model.size], **kwargs)
[docs] class YPars(Model): """ Concatenates the parameters of the submodel with uncertainties as fix states with the y-axis array resulting from calling the submodel. This is used internally in 'qspec.models.fit'. """ def __init__(self, model): super().__init__(model=model) self.type = 'YPars' self.p_y = [i for i, fix in enumerate(self.model.fixes) if _is_unc(fix)]
[docs] def evaluate(self, x, *args, **kwargs): return np.concatenate([self.model.evaluate(x, *args, **kwargs), np.array([args[p_y] for p_y in self.p_y], dtype=float)], axis=0)
[docs] class Listed(Model): """ An abstract class for models with multiple submodels. """ def __init__(self, models, labels=None): super().__init__(model=None) self.type = 'Listed' self.models = models # models is just a reference to a list defined somewhere else. self.labels = labels if self.labels is None: if len(self.models) == 1: self.labels = ['', ] else: self.labels = ['__{}'.format(i) for i in range(len(self.models))] self.slices = [] self.model_map = [] self.index_map = [] for i, (model, label) in enumerate(zip(self.models, self.labels)): self.slices.append(slice(self._index, self._index + model.size, 1)) for j, (name, val, fix, link) in enumerate(model.get_pars()): self.model_map.append(i) self.index_map.append(j) if isinstance(fix, str) and not _is_unc(fix): for _name in model.names: fix = fix.replace(_name, '{}{}'.format(_name, label)) self._add_arg('{}{}'.format(name, label), val, fix, link) self.set_fixes(list(self.fixes))
[docs] def set_val(self, i, val, force=False): super().set_val(i, val, force=force) if i < len(self.model_map): self.models[self.model_map[i]].set_val(self.index_map[i], self.vals[i], force=True)
[docs] def set_fix(self, i, fix, force=False): super().set_fix(i, fix, force=force) if i < len(self.model_map): self.models[self.model_map[i]].set_fix(self.index_map[i], self.fixes[i], force=True)
[docs] def inherit_vals(self, force=False): """ Inherit the parameter values of the submodels. :param force: The 'force' parameter of 'self.set_val'. :returns: None """ self.set_vals([val for model in self.models for val in model.vals], force=force)
[docs] def inherit_fixes(self, force=False): """ Inherit the parameter fixes of the submodels. :param force: The 'force' parameter of 'self.set_fix'. :returns: None """ self.set_fixes([fix for model in self.models for fix in model.fixes], force=force)
[docs] class Summed(Listed): """ A model summing over all submodels. """ def __init__(self, models, labels=None): super().__init__(models, labels=labels) self.type = 'Summed' self.indices_add = [] for n, (model, label) in enumerate(zip(self.models, self.labels)): self.indices_add.append([self._index, self._index + 1]) self._add_arg('center{}'.format(label), 0., False, False) self._add_arg('int{}'.format(label), 1., False, False)
[docs] def evaluate(self, x, *args, **kwargs): return np.sum([args[i[1]] * model.evaluate(x - args[i[0]], *args[_slice], **kwargs) for model, _slice, i in zip(self.models, self.slices, self.indices_add)], axis=0)
@property def dx(self): return min(model.dx for model in self.models)
[docs] def min(self): return min(model.min() for model in self.models)
[docs] def max(self): return max(model.max() for model in self.models)
[docs] def intervals(self): return merge_intervals([[i[0] + self.vals[j[0]], i[1] + self.vals[j[0]]] for model, j in zip(self.models, self.indices_add) for i in model.intervals()])
[docs] class Linked(Listed): """ A model linking all "link=True" parameters of the submodels. """ def __init__(self, models): super().__init__(models, labels=None) self.type = 'Linked' for i, (name, val, fix, link) in enumerate(self.get_pars()): if link and (not fix or isinstance(fix, list) or _is_unc(fix)): _name = name[:name.rfind('__')] for j, model in enumerate(self.models): if j < self.model_map[i] and _name in model.names: self.set_fix(i, '{}__{}'.format(_name, j)) break
[docs] def evaluate(self, x, *args, **kwargs): return np.concatenate(tuple(model.evaluate(_x, *args[_slice], **kwargs) for model, _slice, _x in zip(self.models, self.slices, x)), axis=0)
[docs] def set_fix(self, i, fix, force=False): super(Listed, self).set_fix(i, fix, force=force)