Source code for circadapt.components

"""
Components for easy input/output of the CircAdapt model.

The component is a general class. It holds parameters, which are accessed
through the Parameter object, and signals, which are accessed through the
Signal object.
"""

import numpy as np
from circadapt.settings import __version__
from typing import Union

class Component:
    """
    General functions to communicate with c++ objects.

    Parameters
    ----------
    component_type: str
        Type name passed to ``model.add_component`` by :meth:`add`.
    model: ctypes object
        C++ model
    objects: list of str, optional
        Locations of objects in the c++ model that can be altered along this
        component
    """

    parameters = []
    signals = []
    parameter_on_set = {}

    def __init__(self, component_type, model, objects=None):
        self._component_type = component_type
        self._model = model
        self.objects = [] if objects is None else objects
        self.build()

    def build(self):
        """Rebuild ``objects_short``: the text after the last dot per object."""
        self.objects_short = np.array([
            o.split('.')[-1] for o in self.objects
        ])

    def add_object(self, o):
        """Append the location ``o`` to this component and rebuild."""
        self.objects.append(o)
        self.build()

    def __repr__(self):
        """Object representation in string format."""
        cls_name = str(self.__class__.__name__)
        return ''.join([
            '<', cls_name, '> \n',
            'parameters: \n ', repr(self.parameters),
            '\n\n signals: \n ', repr(self.signals),
            '\n\n objects: \n ', repr(self.objects),
            '\n<\\', cls_name, '>',
        ])

    def _subset(self, objects):
        """Return a new component of the same class holding only ``objects``."""
        import inspect

        cls = type(self)
        # Subclasses may hard-code their component type and only accept
        # (model, objects); the base signature needs the type as well.
        init_params = inspect.signature(cls.__init__).parameters
        if 'component_type' in init_params:
            return cls(self._component_type, self._model, objects=objects)
        return cls(self._model, objects=objects)

    def __getitem__(self, arg):
        """
        Get data.

        This function is called when self[arg].

        Parameters
        ----------
        arg: slice, int or str
            If slice or int, return a component of the same class restricted
            to the selected objects.
            If str, return the Signal or Parameter with that name for all
            objects.

        Raises
        ------
        ValueError
            If ``arg`` is neither an index nor a known signal/parameter name.
        """
        if isinstance(arg, slice):
            return self._subset(self.objects[arg])
        if isinstance(arg, (int, np.integer)):
            # Keep ``objects`` a list; a bare string would be iterated
            # character by character in build().
            return self._subset([self.objects[arg]])

        if arg in self.signals:
            return Signal(self._model, arg, self.objects)
        if arg in self.parameters:
            return Parameter(
                self._model,
                arg,
                self.objects,
                self.parameter_on_set.get(arg, None),
            )
        raise ValueError('Argument unknown')

    def __setitem__(self, arg, val):
        """
        Set data.

        Set data of the parameter given in arg for all objects. Only
        parameter names can be used.

        Parameters
        ----------
        arg: str
            Parameter name
        val: float/int/bool
            Value assigned to every object of this component.

        Raises
        ------
        ValueError
            If ``arg`` names a signal or is unknown.
        """
        if arg in self.parameters:
            par = Parameter(self._model, arg, self.objects)
            par[:] = val

            # trigger function
            if arg in self.parameter_on_set:
                self.parameter_on_set[arg](self._model)
            return
        if arg in self.signals:
            raise ValueError('Can not set signal')
        raise ValueError(f'Parameter "{arg}" unknown')

    def __iter__(self):
        """Iterate over object, used for dict(self)."""
        yield 'objects', self.objects
        for key in self.parameters + self.signals:
            value = self.__getitem__(key)._get()
            yield key, value

    def add(self, name):
        """
        Add a new object of this component type to the model.

        ``name`` is split at its last dot: the trailing part and the leading
        part are passed separately to ``model.add_component``, whose result
        is returned.
        """
        parent, _, short_name = name.rpartition('.')
        return self._model.add_component(
            self._component_type, short_name, parent)
class ParSig:
    """
    Basic functions used for Parameter and Signal classes.

    The values are fetched once through ``self._get()`` (provided by the
    subclasses) and cached in ``self._values``; the arithmetic and comparison
    operators act on that cache and return plain values, not ParSig objects.
    """

    def __init__(self, model, par, locs):
        self._model = model
        self._par = par
        self.objects = locs
        self._dtype = self._find_dtype()

        # Store number of run commands of circadapt model.
        # If it has changed, the values do not belong to the model anymore and
        # the set function must be disabled.
        self._model_i_run = model._count_run_commands

        self.build()

    def _find_dtype(self):
        """Ask the model for the data type, using the first object."""
        return self._model._find_dtype(self.objects[0]+'.'+self._par)

    @staticmethod
    def _operand(val):
        """Return the cached values of a ParSig operand, else ``val`` itself."""
        return val._values if isinstance(val, ParSig) else val

    def build(self):
        """Rebuild the short object names and (re)load the cached values."""
        self.objects_short = np.array([
            o.split('.')[-1] for o in self.objects
        ])
        self._values = self._get()

    def __repr__(self):
        """Object representation in string format."""
        return self._values.__repr__()

    def __add__(self, val):
        """Handle the + operator."""
        return self._values + self._operand(val)

    def __radd__(self, val):
        """Handle the + operator as right hand element."""
        return self._operand(val) + self._values

    def __sub__(self, val):
        """Handle the - operator."""
        return self._values - self._operand(val)

    def __rsub__(self, val):
        """Handle the - operator as right hand element."""
        return self._operand(val) - self._values

    def __mul__(self, val):
        """Handle the * operator."""
        return self._values * self._operand(val)

    def __rmul__(self, val):
        """Handle the * operator as right hand element."""
        return self._operand(val) * self._values

    def __truediv__(self, val):
        """Handle the / operator."""
        return self._values / self._operand(val)

    def __rtruediv__(self, val):
        """Handle the / operator as right hand element."""
        return self._operand(val) / self._values

    def __floordiv__(self, val):
        """Handle the // operator."""
        return self._values // self._operand(val)

    def __rfloordiv__(self, val):
        """Handle the // operator as right hand element."""
        return self._operand(val) // self._values

    def __mod__(self, val):
        """Handle the % operator."""
        return self._values % self._operand(val)

    def __rmod__(self, val):
        """Handle the % operator as right hand element."""
        return self._operand(val) % self._values

    def __pow__(self, val):
        """Handle the ** operator."""
        return self._values ** self._operand(val)

    def __rpow__(self, val):
        """Handle the ** operator as right hand element."""
        return self._operand(val) ** self._values

    def __neg__(self):
        """Handle -self."""
        return -self._values

    def __lt__(self, val):
        """Handle the < operator."""
        return self._values < np.array(self._operand(val))

    def __le__(self, val):
        """Handle the <= operator."""
        return self._values <= np.array(self._operand(val))

    def __gt__(self, val):
        """Handle the > operator."""
        return self._values > np.array(self._operand(val))

    def __ge__(self, val):
        """Handle the >= operator."""
        return self._values >= np.array(self._operand(val))

    def __eq__(self, val):
        """Handle the == operator."""
        return self._values == np.array(self._operand(val))

    def __ne__(self, val):
        """Handle the != operator."""
        return self._values != np.array(self._operand(val))

    def __getitem__(self, arg):
        """
        Get data.

        This function is called when self[arg].

        Parameters
        ----------
        arg: int, slice, str, list or numpy.ndarray
            int/slice/ndarray/list of int index the cached values directly.
            A str selects the object with that short name; a list of str
            selects several objects by short name, in the given order.
            For dtype 'vectorXd' an int returns a ParameterVectorXd for that
            single object.

        Raises
        ------
        ValueError
            If a name is not found, is ambiguous, or the key type is unknown.
        """
        is_int = isinstance(arg, (int, np.integer))

        if self._dtype == 'vectorXd' and is_int:
            # model, par, locs, parameter_on_set=None
            return ParameterVectorXd(
                self._model,
                self._par,
                [self.objects[arg]],
                self.parameter_on_set,
            )

        if isinstance(arg, slice) or is_int:
            return self._values[arg]

        if isinstance(arg, str):
            hits = self.objects_short == arg
            if np.sum(hits) == 1:
                return self._values[hits][0]
            if np.any(hits):
                # too many
                raise ValueError('Something went wrong in building the model.')
            raise ValueError('Object not found. Check the spelling.')

        if isinstance(arg, list):
            if arg and isinstance(arg[0], str):
                # One row per requested name, one column per object.
                requested = np.array(arg).reshape((-1, 1))
                if np.any(np.min(self.objects_short != requested, axis=1)):
                    raise ValueError('Not all ojects are found.')
                return self._values[
                    np.argmax(self.objects_short == requested, axis=1)
                ]
            # An empty list selects nothing instead of failing on arg[0].
            if not arg or isinstance(arg[0], (int, np.integer)):
                return self._values[arg]

        if isinstance(arg, np.ndarray):
            return self._values[arg]

        raise ValueError('Unknown key ', arg)
class Parameter(ParSig):
    """
    General functions to retrieve and set parameters of the c++ object.

    Parameters
    ----------
    model: ctypes object
        C++ model
    par: str
        Parameter name that will be obtained from each loc.
    locs: list of str
        Locations of objects in the c++ model that can be altered along this
        component.
    parameter_on_set: callable, optional
        Called with the model after every successful ``__setitem__``.
    """

    def __init__(self, model, par, locs, parameter_on_set=None):
        self.parameter_on_set = parameter_on_set
        self.shape = len(locs),
        super().__init__(model, par, locs)

    def _get(self, arg=slice(None, None, None)):
        """
        Read the parameter from the model.

        Parameters
        ----------
        arg: slice or int
            If slice, read the selected objects and return a numpy array
            (a plain list for dtype 'vectorXd').
            Otherwise ``arg`` indexes a single object and its value is
            returned as given by the model.
        """
        if isinstance(arg, slice):
            values = [self._model.get(loc+'.'+self._par)
                      for loc in self.objects[arg]]
            if self._dtype == 'vectorXd':
                return values
            return np.array(values)
        return self._model.get(self.objects[arg]+'.'+self._par)

    def __len__(self):
        """Handle len(self)."""
        return self.shape[0]

    def __setitem__(self, arg, value):
        """
        Set data.

        Set data of the parameter for locs given in arg.

        Parameters
        ----------
        arg: slice, int, str, list or numpy.ndarray
            Objects to be changed: by position (int, slice, boolean mask or
            sequence of keys) or by short object name (str, list of str).
        value: float/int/bool or sequence
            Scalar values are applied to every selected object. For a slice,
            a sequence must hold one entry per selected object.

        Raises
        ------
        ReferenceError
            If the model has run since this object was created.
        ValueError
            If the number of values does not match a slice, or a named object
            is not found or ambiguous.
        """
        if self._model_i_run != self._model._count_run_commands:
            raise ReferenceError('Parameters stored seperatly can not be '
                                 'changed.')

        is_slice = isinstance(arg, slice)

        if not hasattr(value, '__len__') and is_slice:
            value = np.ones(len(self.objects[arg]))*value

        # check for length
        if is_slice and (len(self.objects[arg]) != len(value)):
            raise ValueError('Dimensions do not match.')

        if self._dtype == 'vectorXd':
            if is_slice:
                # Indexing with a slice yields a plain list, so writing to it
                # would never reach the model; write each object separately.
                result = None
                positions = range(len(self.objects))[arg]
                for i_val, i_obj in enumerate(positions):
                    result = self.__getitem__(i_obj).__setitem__(
                        0, value[i_val])
            else:
                result = self.__getitem__(arg).__setitem__(0, value)
            # Keep the cached values in line with the model.
            self._values = self._get()
            return result

        if is_slice:
            for i_loc, loc in enumerate(self.objects[arg]):
                self._model.set(loc+'.'+self._par, value[i_loc])
        elif isinstance(arg, list) and arg and isinstance(arg[0], str):
            value = np.ones(len(arg))*value
            for i_loc, loc in enumerate(arg):
                fullloc = self.objects[
                    np.argwhere(self.objects_short == loc)[0, 0]]
                self._model.set(fullloc+'.'+self._par, value[i_loc])
        elif isinstance(arg, np.ndarray) and arg.dtype == 'bool':
            # if value is scalar, map to same length
            # if value is array, it needs to have same length as arg
            set_values = np.ones(np.sum(arg)) * value
            set_idx = np.argwhere(arg).reshape(-1)
            for i, j in enumerate(set_idx):
                fullloc = self.objects[j]
                self._model.set(fullloc+'.'+self._par, set_values[i])
        elif isinstance(arg, str):
            hits = self.objects_short == arg
            if np.sum(hits) == 1:
                fullloc = self.objects[np.argwhere(hits)[0, 0]]
                self._model.set(fullloc+'.'+self._par, value)
            elif np.any(hits):
                # too many
                raise ValueError('Something went wrong in building the model.')
            else:
                raise ValueError('Object not found. Check the spelling.')
        elif hasattr(arg, '__len__') and hasattr(value, '__len__'):
            for a, v in zip(arg, value):
                self.__setitem__(a, v)
        elif hasattr(arg, '__len__'):
            for a in arg:
                self.__setitem__(a, value)
        else:
            self._model.set(self.objects[arg]+'.'+self._par, value)

        if self.parameter_on_set is not None:
            self.parameter_on_set(self._model)

        # Refresh the cache last, so reads made after the write (and after the
        # on-set trigger) do not return stale values.
        self._values = self._get()

    def __getitem__(self, *arg):
        """Get data; see ParSig.__getitem__ for the accepted keys."""
        return super().__getitem__(*arg)
class ParameterVectorXd(Parameter):
    """
    Parameter view on one vector-valued ('vectorXd') parameter.

    After construction ``self._values`` holds the vector of the first object
    in ``locs`` instead of a list with one vector per object.
    """

    def __init__(self, *arg, **kwarg):
        super().__init__(*arg, **kwarg)
        self._values = self._values[0]

    def __setitem__(self, arg, value):
        """
        Write ``value`` to the vector of object ``self.objects[arg]``.

        A scalar is broadcast to the current vector length. The product is
        taken out of place so the caller's array is never modified and
        integer arrays are accepted.

        Returns
        -------
        Whatever ``model.set`` returns.
        """
        vector = np.asarray(value) * np.ones(len(self._values))
        return self._model.set(self.objects[arg]+'.'+self._par, vector)

    def __getitem__(self, *arg):
        """Index the cached vector; ``arg`` arrives as a tuple of keys."""
        return self._values[arg]

    def __len__(self):
        """Handle len(self): the length of the cached vector."""
        return len(self._values)
class Signal(ParSig):
    """
    General functions to retrieve signals from the c++ object.

    Parameters
    ----------
    model: ctypes object
        C++ model
    par: str
        Signal name that will be obtained from each loc.
    locs: list of str
        Locations of objects in the c++ model that can be altered along this
        component
    """

    def __init__(self, model, par, locs):
        super().__init__(model, par, locs)

        val0 = self._model.get(locs[0]+'.' + par)

        # shape in line with numpy arrays
        if hasattr(val0, '__len__'):
            self.shape = len(val0), len(locs)
        else:
            self.shape = len(locs),

        # ndim in line with numpy arrays
        self.ndim = len(self.shape)

    def __len__(self):
        """Handle len(self)."""
        return self.shape[-1]

    def _get(self, arg=slice(None, None, None)):
        """Get the signal data for all locs."""
        if self._dtype == "vector_vectorXd":
            return [
                self._model.get(loc+'.'+self._par)
                for loc in self.objects[arg]
            ]
        # Stack one row per object, select objects, then transpose so that
        # objects end up on the last axis.
        stacked = np.concatenate(
            [[self._model.get(loc+'.'+self._par)] for loc in self.objects],
            axis=0,
        )
        return stacked[arg].T

    def __setitem__(self, arg, value):
        """
        Set data.

        Function is called when use self[arg] = value.

        Raises
        ------
        ValueError
            Always; signals can not be changed.
        """
        raise ValueError('Can not change signals')

    def __getitem__(self, arg):
        """
        Get data.

        This function is called when self[arg].

        Parameters
        ----------
        arg: tuple, slice, int, str, list or numpy.ndarray
            A tuple ``(rows, objects)`` indexes the cached values, where
            ``objects`` may be a short object name or a list of short names.
            Other keys are handled as in ParSig.__getitem__.

        Raises
        ------
        ValueError
            If a requested object name is not found.
        """
        is_pair = isinstance(arg, tuple) and len(arg) > 1

        if (is_pair and isinstance(arg[1], list) and arg[1]
                and isinstance(arg[1][0], str)):
            matches = (
                self.objects_short == np.array(arg[1]).reshape((-1, 1)))
            # argmax of an all-False row is 0, which would silently return
            # the first object for a misspelled name.
            if not np.all(np.any(matches, axis=1)):
                raise ValueError('Not all objects are found.')
            idx_objects = np.argmax(matches, axis=1)
            return self._values[:, idx_objects][arg[0], :]

        if is_pair and isinstance(arg[1], str):
            idx_obj = self.objects_short == arg[1]
            if not np.any(idx_obj):
                raise ValueError('Object not found.')
            value = self._values[arg[0], idx_obj]
            if np.issubdtype(type(arg[0]), np.integer):
                return value[0]
            return value.reshape(-1)

        if is_pair and self._dtype == 'vector_vectorXd':
            arg2 = arg[2] if len(arg) > 2 else slice(None)
            return self._values[arg[1]][arg[0], arg2]

        if isinstance(arg, tuple):
            return self._values[arg]

        if isinstance(arg, (slice, int)):
            return self._values[arg]

        return super().__getitem__(arg)

    def store(self, _export: Union[bool, np.ndarray]):
        """
        Configure storage for export signals for all objects in this container.

        For every object, the corresponding flag is written to the model
        under ``<object>.<signal name>``.

        Parameters
        ----------
        _export : bool or array-like of bool
            - A single value applies to all objects.
            - A sequence must hold one flag per object of this container.
            The input is converted to a new boolean array and never modified.

        Raises
        ------
        ValueError
            If the signal type is not 'vector_vectorXd', or if `_export` is a
            sequence whose length does not match the number of objects.
        """
        if self._dtype != 'vector_vectorXd':
            raise ValueError('Signal type not supported for export: '
                             + self._dtype)

        n_objects = len(self.objects)
        flags = np.asarray(_export, dtype=bool)
        if flags.ndim == 0:
            flags = np.full(n_objects, bool(flags))
        elif flags.shape != (n_objects,):
            raise ValueError(
                'Length of _export does not match the number of objects.')

        # Set storage true or false for each object
        for obj, _store in zip(self.objects, flags):
            self._model.set(f'{obj}.{self._par}', _store)
class General(Component):
    """
    Component object with no locations.

    ``parameters`` maps each public name either to a location string in the
    model or to a callable that computes the value; callable entries are
    read-only.
    """

    parameters = {
        't_cycle': 'Model.t_cycle',
        'version_library': 'Version',
        'version_pyCircAdapt': lambda: __version__,
    }
    signals = {
    }

    def __init__(self, model):
        self._model = model

    def __repr__(self):
        """Object representation in string format."""
        cls_name = str(self.__class__.__name__)
        return ''.join([
            '<', cls_name, '> \n',
            'parameters: \n ', repr(self.parameters.keys()),
            '\n\n signals: \n ', repr(self.signals.keys()),
            '\n<\\', cls_name, '>',
        ])

    def __getitem__(self, arg):
        """
        Get data.

        This function is called when self[arg].

        Parameters
        ----------
        arg: str
            Name of a parameter or signal of this component.

        Raises
        ------
        ValueError
            If ``arg`` is not a known parameter or signal.
        """
        if arg in self.parameters:
            target = self.parameters[arg]
            if isinstance(target, str):
                return self._model.get(target)
            if callable(target):
                return target()
        if arg in self.signals:
            return self._model.get(self.signals[arg])
        raise ValueError('unknown')

    def __setitem__(self, arg, value):
        """
        Set data.

        Only parameters that map to a model location can be set.

        Parameters
        ----------
        arg: str
            Parameter name.
        value: float/int/bool
            Value passed on to ``model.set``.

        Raises
        ------
        ValueError
            If ``arg`` is a computed (read-only) parameter, a signal, or
            unknown.
        """
        if arg in self.parameters:
            target = self.parameters[arg]
            if callable(target):
                # A computed entry has no model location; passing the
                # callable to model.set would be meaningless.
                raise ValueError(f'Parameter "{arg}" is read-only')
            return self._model.set(target, value)
        if arg in self.signals:
            raise ValueError('Signals can not be altered')
        raise ValueError('Argument not known')

    def __iter__(self):
        """Iterate over object, used for dict(self)."""
        for key in {**self.parameters, **self.signals}:
            yield key, self.__getitem__(key)
class AutoComponent(Component):
    """
    Any component that automatically detects the parameters from the c++ class.

    This type of component is especially for plugins. No special functions are
    given to this type of component. Parameter and signal names are read from
    the first object of the component.
    """

    def __init__(self, component_type, model, objects=None):
        super().__init__(component_type, model, objects)
        self._is_initialized = False
        # Objects handed in at construction never pass through add_object,
        # so detect their parameters and signals right away.
        if self.objects:
            self.initialize()

    def add_object(self, *arg, **kwarg):
        """Add an object; detect parameters/signals on the first one."""
        super().add_object(*arg, **kwarg)
        if not self._is_initialized:
            self.initialize()

    def initialize(self):
        """Query the model for all parameter and signal names."""
        self._is_initialized = True

        self.parameters = []
        for par_type in (
                'parameters_double',
                'parameters_int',
                'parameters_bool',
                'registered_components',
                'parameters_vectorXd',
        ):
            self._detect_parameters_type(par_type)

        self.signals = []
        for sig_type in ('export_double_vectors', 'vector_vectorXd'):
            self._detect_signals_type(sig_type)

    def _detect_names(self, par_type):
        """Return the names the model lists under ``par_type`` for object 0."""
        first = self.objects[0]
        n_names = self._model._get_int(f'{first}.n_{par_type}')
        return [
            self._model._get_str(f'{first}.{par_type}:{i}')
            for i in range(n_names)
        ]

    def _detect_parameters_type(self, par_type='parameters_double'):
        """Append the names found under ``par_type`` to ``self.parameters``."""
        self.parameters += self._detect_names(par_type)

    def _detect_signals_type(self, par_type='export_double_vectors'):
        """Append the names found under ``par_type`` to ``self.signals``."""
        self.signals += self._detect_names(par_type)