"""
Components for easy input/output of the CircAdapt model.
The component is a general class. It holds parameters that will be called
using the Parameter object and signals that are called using the Signals
object.
"""
import numpy as np
from circadapt.settings import __version__
from typing import Union
[docs]
class Component:
"""
General functions to communicate with c++ objects.
Parameters
----------
model: ctypes object
C++ model
locs: string
Locations of objects in the c++ model that can be altered along this
component
"""
parameters = []
signals = []
parameter_on_set = {}
def __init__(self, component_type, model, objects=None):
self._component_type = component_type
self._model = model
self.objects = [] if objects is None else objects
self.build()
[docs]
def build(self):
self.objects_short = np.array([
o.split('.')[-1] for o in self.objects
])
[docs]
def add_object(self, o):
self.objects.append(o)
self.build()
[docs]
def __repr__(self):
"""Object representation in string format."""
return '<' + str(self.__class__.__name__) + '> \n' + \
'parameters: \n ' + self.parameters.__repr__() + \
'\n\n signals: \n ' + self.signals.__repr__() + \
'\n\n objects: \n ' + self.objects.__repr__() + \
'\n<\\' + str(self.__class__.__name__) + '>'
[docs]
def __getitem__(self, arg: any) -> any:
"""
Get data.
This function is called when self[arg].
Parameters
----------
arg: slice or string
If slice, return a components object with locs obtained from slice.
If str, return the parameter or signal for all locs
"""
if isinstance(arg, slice) or \
isinstance(arg, int):
return self.__class__(
self._model,
objects=self.objects[arg],
)
if arg in self.signals:
return Signal(self._model, arg, self.objects)
if arg in self.parameters:
return Parameter(
self._model,
arg,
self.objects,
self.parameter_on_set.get(arg, None),
)
raise ValueError('Argument unknown')
[docs]
def __setitem__(self, arg: str, val: any) -> any:
"""
Set data.
Set data of the parameter given in arg. Only parameter names can be
used.
Parameters
----------
arg: str
Parameter name
val: float/int/bool
Value will automatically be translated to type to
"""
if arg in self.parameters:
par = Parameter(self._model, arg, self.objects)
par[:] = val
# trigger function
if arg in self.parameter_on_set:
self.parameter_on_set[arg](self._model)
return
if arg in self.signals:
raise ValueError('Can not set signal')
raise ValueError(f'Parameter "{arg}" unknown')
[docs]
def __iter__(self):
"""Iterate over object, used for dict(self)."""
yield 'objects', self.objects
for key in self.parameters + self.signals:
value = self.__getitem__(key)._get()
yield key, value
[docs]
def add(self, name):
split_name = name.split('.')
return self._model.add_component(self._component_type, split_name[-1], name[:-len(split_name[-1])-1])
[docs]
class ParSig:
"""Basic functions used for Parameter and Signal classes."""
def __init__(self, model, par, locs):
self._model = model
self._par = par
self.objects = locs
self._dtype = self._find_dtype()
# Store number of run commmands of circadapt model.
# If is changed, values do not belong to model anymore and set function
# must be disabled.
self._model_i_run = model._count_run_commands
self.build()
def _find_dtype(self):
return self._model._find_dtype(self.objects[0]+'.'+self._par)
[docs]
def build(self):
self.objects_short = np.array([
o.split('.')[-1] for o in self.objects
])
self._values = self._get()
[docs]
def __repr__(self):
"""Object representation in string format."""
return self._values.__repr__()
[docs]
def __add__(self, val):
"""Handle the + operator."""
if isinstance(val, Signal):
val = val[:]
return self._values + val
[docs]
def __radd__(self, val):
"""Handle the + operator as right hand element."""
return val + self._values
[docs]
def __sub__(self, val):
"""Handle the - operator."""
if isinstance(val, Signal):
val = val[:]
return self._values - val
[docs]
def __rsub__(self, val):
"""Handle the - operator as right hand element."""
return val - self._values
[docs]
def __mul__(self, val):
"""Handle the * operator."""
if isinstance(val, Signal):
val = val[:]
return self._values * val
[docs]
def __rmul__(self, val):
"""Handle the * operator as right hand element."""
return val * self._values
[docs]
def __truediv__(self, val):
"""Handle the / operator."""
return self._values / val
[docs]
def __rtruediv__(self, val):
"""Handle the / operator as right hand element."""
return val / self._values
[docs]
def __floordiv__(self, val):
"""Handle the // operator."""
return self._values // val
[docs]
def __rfloordiv__(self, val):
"""Handle the // operator."""
return val // self._values
[docs]
def __mod__(self, val):
"""Handle the % operator."""
return self._values % val
[docs]
def __rmod__(self, val):
"""Handle the % operator."""
return val % self._values
[docs]
def __pow__(self, val):
"""Handle the ** operator."""
return self._values ** val
[docs]
def __rpow__(self, val):
"""Handle the ** operator as right hand element."""
return val ** self._values
[docs]
def __neg__(self):
"""Handle -self."""
return -self._values
[docs]
def __lt__(self, val):
"""Handle the < operator."""
return self._values < np.array(val)
[docs]
def __le__(self, val):
"""Handle the <= operator."""
return self._values <= np.array(val)
[docs]
def __gt__(self, val):
"""Handle the > operator."""
return self._values > np.array(val)
[docs]
def __ge__(self, val):
"""Handle the >= operator."""
return self._values >= np.array(val)
[docs]
def __eq__(self, val):
"""Handle the == operator."""
return self._values == np.array(val)
[docs]
def __ne__(self, val):
"""Handle the != operator."""
return self._values != np.array(val)
[docs]
def __getitem__(self, arg: any) -> any:
"""
Get data.
This function is called when self[arg].
"""
if self._dtype == 'vectorXd' and isinstance(arg, int):
# model, par, locs, parameter_on_set=None
return ParameterVectorXd(
self._model,
self._par,
[self.objects[arg]],
self.parameter_on_set,
)
if isinstance(arg, slice) or isinstance(arg, int):
return self._values[arg]
if isinstance(arg, str):
if np.sum(self.objects_short == arg) == 1:
return self._values[self.objects_short == arg][0]
if np.any(self.objects_short == arg): # too many
raise ValueError('Something went wrong in building the model.')
raise ValueError('Object not found. Check the spelling.')
if isinstance(arg, list) and isinstance(arg[0], str):
if np.any(np.min(
self.objects_short != np.array(arg).reshape((-1, 1)),
axis=1)):
raise ValueError('Not all ojects are found.')
return self._values[
np.argmax(self.objects_short == np.array(arg).reshape((-1, 1)),
axis=1)
]
if (isinstance(arg, list) and isinstance(arg[0], int)):
return self._values[arg]
if (isinstance(arg, np.ndarray)):
return self._values[arg]
raise ValueError('Unknown key ', arg)
[docs]
class Parameter(ParSig):
"""
General functions to retreive signals from the c++ object.
Parameters
----------
model: ctypes object
C++ model
par: str
Parameter name that will be obtained from each loc.
locs: string
Locations of objects in the c++ model that can be altered along this
component.
"""
def __init__(self, model, par, locs, parameter_on_set=None):
self.parameter_on_set = parameter_on_set
self.shape = len(locs),
super().__init__(model, par, locs)
def _get(self, arg=slice(None, None, None)):
"""
Get data.
This function is called when self[arg].
Parameters
----------
arg: slice or string
If slice, return a components object with locs obtained from slice.
If str, return the parameter or signal for all locs
"""
if isinstance(arg, slice) and self._dtype == 'vectorXd':
return [self._model.get(loc+'.'+self._par)
for loc in self.objects[arg]]
if isinstance(arg, slice):
return np.array([self._model.get(loc+'.'+self._par)
for loc in self.objects[arg]])
return self._model.get(self.objects[arg]+'.'+self._par)
[docs]
def __len__(self):
"""Handle len(self)."""
return self.shape[0]
[docs]
def __setitem__(self, arg, value):
"""
Set data.
Set data of the parameter for locs given in arg. Only parameter names
can be used.
Parameters
----------
arg: slice
Locations to be changed.
val: float/int/bool
Value will automatically be translated to type of the parameter.
"""
if self._model_i_run != self._model._count_run_commands:
raise ReferenceError('Parameters stored seperatly can not be '
'changed.')
if not hasattr(value, '__len__') and isinstance(arg, slice):
value = np.ones(len(self.objects[arg]))*value
# check for length
if isinstance(arg, slice) and (len(self.objects[arg]) != len(value)):
raise ValueError('Dimensions do not match.')
if self._dtype == 'vectorXd':
return self.__getitem__(arg).__setitem__(0, value)
if isinstance(arg, slice):
for i_loc, loc in enumerate(self.objects[arg]):
self._model.set(loc+'.'+self._par, value[i_loc])
elif isinstance(arg, list) and isinstance(arg[0], str):
value = np.ones(len(arg))*value
for i_loc, loc in enumerate(arg):
fullloc = self.objects[
np.argwhere(self.objects_short == loc)[0, 0]]
self._model.set(fullloc+'.'+self._par, value[i_loc])
elif isinstance(arg, np.ndarray) and arg.dtype == 'bool':
# if value is scalar, map to same length
# if value is array, it needs to have same length as arg
set_values = np.ones(np.sum(arg)) * value
set_idx = np.argwhere(arg).reshape(-1)
for i, j in enumerate(set_idx):
fullloc = self.objects[j]
self._model.set(fullloc+'.'+self._par, set_values[i])
elif isinstance(arg, str):
if np.sum(self.objects_short == arg) == 1:
fullloc = self.objects[np.argwhere(
self.objects_short == arg)[0, 0]]
self._model.set(fullloc+'.'+self._par, value)
elif np.any(self.objects_short == arg): # too many
raise ValueError('Something went wrong in building the model.')
else:
raise ValueError('Object not found. Check the spelling.')
elif hasattr(arg, '__len__') and hasattr(value, '__len__'):
for a, v in zip(arg, value):
self.__setitem__(a, v)
elif hasattr(arg, '__len__'):
for a in arg:
self.__setitem__(a, value)
else:
self._model.set(self.objects[arg]+'.'+self._par, value)
if self.parameter_on_set is not None:
self.parameter_on_set(self._model)
[docs]
def __getitem__(self, *arg):
return super().__getitem__(*arg)
[docs]
class ParameterVectorXd(Parameter):
def __init__(self, *arg, **kwarg):
super().__init__(*arg, **kwarg)
self._values = self._values[0]
[docs]
def __setitem__(self, arg, value):
value *= np.ones(len(self._values))
return self._model.set(self.objects[arg]+'.'+self._par, value)
[docs]
def __getitem__(self, *arg):
# return super().__getitem__(*arg)
return self._values[arg]
[docs]
def __len__(self):
return len(self._values)
[docs]
class Signal(ParSig):
"""
General functions to retreive signals from the c++ object.
Parameters
----------
model: ctypes object
C++ model
par: str
Parameter name that will be obtained from each loc.
locs: string
Locations of objects in the c++ model that can be altered along this
component
"""
def __init__(self, model, par, locs):
super().__init__(model, par, locs)
val0 = self._model.get(locs[0]+'.' + par)
# shape in line with numpy arrays
if hasattr(val0, '__len__'):
self.shape = len(val0), len(locs)
else:
self.shape = len(locs),
# ndim in line with numpy arrays
self.ndim = len(self.shape)
[docs]
def __len__(self):
"""Handle len(self)."""
return self.shape[-1]
def _get(self, arg=slice(None, None, None)):
"""Get the signal data for all locs."""
if self._dtype == "vector_vectorXd":
ret = [
self._model.get(loc+'.'+self._par)
for loc in self.objects[arg]
]
return ret
ret = np.concatenate((
[[self._model.get(loc+'.'+self._par)]
for loc in self.objects]
), axis=0)[arg].T
return ret
[docs]
def __setitem__(self, arg: any, value) -> any:
"""
Set data.
Function is called when use self[arg].
Raises
------
Always raise error, signals can not be changed.
"""
raise ValueError('Can not change signals')
[docs]
def __getitem__(self, arg: any) -> any:
"""
Get data.
This function is called when self[arg].
Parameters
----------
arg: slice or string
If slice, return a components object with locs obtained from slice.
If str, return the parameter or signal for all locs
"""
if (isinstance(arg, tuple) and
isinstance(arg[1], list) and
isinstance(arg[1][0], str)):
idx_objects = np.argmax(
self.objects_short == np.array(arg[1]).reshape((-1, 1)),
axis=1,
)
return self._values[:, idx_objects][arg[0], :]
if isinstance(arg, tuple) and isinstance(arg[1], str):
idx_obj = self.objects_short == arg[1]
if not np.any(idx_obj):
raise ValueError('Object not found.')
value = self._values[arg[0], idx_obj]
if np.issubdtype(type(arg[0]), np.integer):
return value[0]
return value.reshape(-1)
if isinstance(arg, tuple) and self._dtype == 'vector_vectorXd':
arg2 = arg[2] if len(arg)>2 else slice(None)
return self._values[arg[1]][arg[0], arg2]
if isinstance(arg, tuple):
return self._values[arg]
if (isinstance(arg, slice) or
isinstance(arg, int)):
return self._values[arg]
return super().__getitem__(arg)
[docs]
def store(self, _export: Union[bool, np.ndarray]):
"""
Configure storage for export signals for all objects in this container.
This method sets whether each object in the container should have its
corresponding export signal stored in the model. The input `_export`
determines which objects are marked for storage.
Parameters
----------
_export : bool or numpy.ndarray of bool
- If a single bool is provided, the same value applies to all objects.
- If a numpy array of booleans is provided, its length must match
the number of objects in this container. Each element indicates
whether to store the export signal for the corresponding object.
Raises
------
ValueError
If `_export` is a numpy array with a length that does not match
the number of objects, or if the signal type is not supported.
TypeError
If `_export` is neither a bool nor a numpy array of booleans.
"""
if self._dtype == 'vector_vectorXd':
n_objects = len(self.objects)
_export *= np.ones(n_objects, dtype=bool)
else:
raise ValueError('Signal type not supported for export: ' + self._dtype)
# Set storage true or false for each object
for obj, _store in zip(self.objects, _export):
self._model.set(f'{obj}.{self._par}', _store)
[docs]
class General(Component):
"""Component object with no locations."""
parameters = {
't_cycle': 'Model.t_cycle',
'version_library': 'Version',
'version_pyCircAdapt': lambda: __version__,
}
signals = {
}
def __init__(self, model):
self._model = model
[docs]
def __repr__(self):
"""Object representation in string format."""
return '<' + str(self.__class__.__name__) + '> \n' + \
'parameters: \n ' + self.parameters.keys().__repr__() + \
'\n\n signals: \n ' + self.signals.keys().__repr__() + \
'\n<\\' + str(self.__class__.__name__) + '>'
[docs]
def __getitem__(self, arg: any) -> any:
"""
Get data.
This function is called when self[arg].
Parameters
----------
arg: slice or string
If slice, return a components object with locs obtained from slice.
If str, return the parameter or signal for all locs
"""
if arg in self.parameters and isinstance(self.parameters[arg], str):
return self._model.get(self.parameters[arg])
if arg in self.parameters and callable(self.parameters[arg]):
return self.parameters[arg]()
if arg in self.signals:
return self._model.get(self.signals[arg])
raise ValueError('unknown')
[docs]
def __setitem__(self, arg, value):
"""
Set data.
Set data of the parameter for locs given in arg. Only parameter names
can be used.
Parameters
----------
arg: slice
Locations to be changed.
val: float/int/bool
Value will automatically be translated to type of the parameter.
"""
if arg in self.parameters:
return self._model.set(self.parameters[arg], value)
if arg in self.signals:
raise ValueError('Signals can not be altered')
raise ValueError('Argument not known')
[docs]
def __iter__(self):
"""Iterate over object, used for dict(self)."""
for key in self.parameters | self.signals:
value = self.__getitem__(key)
yield key, value
[docs]
class AutoComponent(Component):
"""
Any component that automatically detects the parameters from the c++ class.
This type of component is especially for plugins. No special functions are
given to this type of component.
"""
def __init__(self, component_type, model, objects=None):
super().__init__(component_type, model, objects)
self._is_initialized = False
[docs]
def add_object(self, *arg, **kwarg):
super().add_object(*arg, **kwarg)
if not self._is_initialized:
self.initialize()
[docs]
def initialize(self):
self._is_initialized = True
self.parameters = []
self._detect_parameters_type('parameters_double')
self._detect_parameters_type('parameters_int')
self._detect_parameters_type('parameters_bool')
self._detect_parameters_type('registered_components')
self._detect_parameters_type('parameters_vectorXd')
self.signals = []
self._detect_signals_type('export_double_vectors')
self._detect_signals_type('vector_vectorXd')
def _detect_parameters_type(self, par_type='parameters_double'):
n_parameters = self._model._get_int(f'{self.objects[0]}.n_{par_type}')
self.parameters += [
self._model._get_str(f'{self.objects[0]}.{par_type}:{i}')
for i in range(n_parameters)]
def _detect_signals_type(self, par_type='export_double_vectors'):
n_signals = self._model._get_int(f'{self.objects[0]}.n_{par_type}')
self.signals += [
self._model._get_str(f'{self.objects[0]}.{par_type}:{i}')
for i in range(n_signals)]