import os
import sys
from ctypes import cdll, c_double, c_char_p, c_bool, c_int
import ctypes
import scipy.io as sio
import numpy as np
from circadapt.matlab import _check_keys
from circadapt.components import AutoComponent
from circadapt.components.patch import Patch, Patch2022
from circadapt.components.connector import ArtVen, Connector, Diode, \
Resistance, Valve, Valve2022
from circadapt.components.wall import Wall, Wall2022, LoadExperiment
from circadapt.components.cavity import Bag, Cavity, Capacitor, Tube0D, \
Chamber, Chamber2022, TriSeg2022, TriSeg
from circadapt.components.node import Node
from circadapt.components.global_functions import Timings, PressureFlowControl
from circadapt.components import General
from circadapt.components.solver import Solver
from circadapt.error import raise_error_on_non_existance_set, except_OSerror
from circadapt.error import catch_general_error
from circadapt.error import ModelCrashed, TriggerNotFound, CorruptBuild, \
ModelNotStable
from .settings import __version__
from circadapt import get_default_path_to_circadapt
# %% Plugins
_loaded_plugins = []
[docs]
def load_plugin_components(filename: str):
    """
    Register a plugin library for every model created afterwards.

    Only the path is recorded here, in the module-level plugin list.

    Parameters
    ----------
    filename : str
        Path to the plugin library.

    Raises
    ------
    FileNotFoundError
        If ``filename`` is not an existing file.
    """
    if os.path.isfile(filename):
        _loaded_plugins.append(filename)
        return
    raise FileNotFoundError(
        f"Plugin could not be loaded. File '{filename}' does not exist.")
# %% Decorator
[docs]
def check_build_decorator(func):
    """
    Decorate a method so the model build is verified before it runs.

    The wrapper calls ``self.check_build()`` whenever ``self._is_checked`` is
    falsy and then forwards all arguments to ``func``.

    Parameters
    ----------
    func : callable
        Method taking the instance as first argument.

    Returns
    -------
    callable
        Wrapper that keeps the name and docstring of ``func``.
    """
    import functools

    # wraps() preserves __name__/__doc__ so help() and generated docs show
    # the decorated method instead of 'inner'
    @functools.wraps(func)
    def inner(self, *arg, **kwarg):
        if not self._is_checked:
            self.check_build()
        return func(self, *arg, **kwarg)
    return inner
# %% Class
[docs]
class CircAdapt:
"""
Wrapper to communicate with c++ code.
Parameters
----------
solver: str (optional)
Name of solver to be build. If not given, the default models solver is
used.
model: str
Name of the model to be build.
path_to_circadapt: str (optional)
Path to CircAdapt library. If not given, the default_path_to_circadapt
is used.
model_state: (multi)
Model state to be loaded.
(default) -> load model reference from package
False -> do nothing
str -> load filename
dict -> load given model state
"""
# Init CircAdapt by telling where to find the dll file
_local_save_reference = False
def __init__(self,
             solver: str = None,
             model: str = 'Custom',
             path_to_circadapt: str = None,
             model_state: dict = None,
             ):
    """
    Create the wrapper, build the c++ model and import a model state.

    Parameters
    ----------
    solver : str, optional
        Solver name handed to the build step.
    model : str, optional
        Model name handed to the build step. The default is 'Custom'.
    path_to_circadapt : str, optional
        Path to the CircAdapt library. When omitted, the package default
        is joined onto the package directory.
    model_state : optional
        Resolved through ``get_model_reference`` before building.
    """
    # package directory, used to locate bundled files
    self.path = os.path.dirname(sys.modules["circadapt"].__file__)
    self._model = None
    self._idx_model = None
    self.version = __version__

    # safety bookkeeping
    self._count_run_commands = 0
    self._is_checked = False  # becomes True once check_build has passed

    # registry of component wrappers for easy i/o
    self._components = {}
    self._componentlocs = {}
    self.components = []
    self._module_rename = {}

    # solver and general wrappers are always present
    self._components['Solver'] = Solver(self)
    self._components['General'] = General(self)
    self.components = list(self._components)

    library_path = path_to_circadapt
    if library_path is None:
        library_path = os.path.join(
            self.path, get_default_path_to_circadapt())
    self._path_to_circadapt = library_path

    self._update_settings()

    model_state = self.get_model_reference(model_state)
    self._build(model, solver, model_state=model_state)

    # load model_state into the c++ object, unless there is none
    if not (model_state is None or model_state is False):
        self.model_import(model_state)
def _update_settings(self):
    """Hook for derived classes to overrule settings; does nothing here."""
[docs]
def __del__(self):
    """
    Clean up when the python wrapper is deleted or overwritten.

    The wrapper only holds a handle to the c++ object, so the c++ side is
    released here by delegating to ``_destroy_model``.
    """
    self._destroy_model()
def _destroy_model(self):
    """
    Delete the c++ core belonging to this wrapper, if there is one.

    Safe to call more than once and on instances whose ``__init__`` failed
    before ``_model`` / ``_idx_model`` were assigned.
    """
    # __del__ also runs for partially constructed objects, so the attributes
    # may not exist yet
    model = getattr(self, '_model', None)
    idx_model = getattr(self, '_idx_model', None)
    if model is not None and idx_model is not None:
        model.delete_core(idx_model)
        self._model = None
######
# Build functions
######
def _build(self,
           model,
           solver=None,
           model_state=None,
           ):
    """
    Create the c++ model and run the python-side build hooks.

    Parameters
    ----------
    model : str
        Name of the model to build.
    solver : str, optional
        Name of the solver. When omitted it is read from
        ``model_state['Solver']``.
    model_state : dict, optional
        Model state; only consulted when ``solver`` is None.

    Raises
    ------
    ValueError
        If no solver is given and none can be read from ``model_state``.
    Exception
        If the library reports a negative model index.
    """
    self._model_name = model
    self._solver_name = solver

    # get solver from model state
    if solver is None:
        if not isinstance(model_state, dict) or 'Solver' not in model_state:
            raise ValueError(
                'No solver given and no model state available to read the '
                'solver type from.')
        solver_state = model_state['Solver']
        # model_export writes the key as 'type'; keep accepting 'Type' too
        solver = (solver_state['Type'] if 'Type' in solver_state
                  else solver_state['type'])

    # init the c++ object
    self._init_c_object()

    # the library expects byte strings
    model = model.encode('ASCII')
    solver = solver.encode('ASCII')

    # build model
    self._idx_model = self._model.build(model, solver)
    if self._idx_model < 0:
        raise Exception("Error while building the model.")

    self.build()
    self.check_build()
[docs]
def build(self):
    """
    Add and link the components of the model.

    The base implementation leaves the model empty; derived models
    override this method to add their components.
    """
[docs]
def check_build(self):
    """
    Verify the build of the c++ model and mark the wrapper as checked.

    Raises
    ------
    CorruptBuild
        If the model reports ``check_build`` as false. The ``check_build``
        trigger is fired first.
    """
    build_is_valid = self._get_bool('check_build')
    if build_is_valid:
        self._is_checked = True
        return
    self.trigger('check_build')
    raise CorruptBuild()
[docs]
def get_model_reference(self,
                        model_state: any = None,
                        ) -> dict:
    """
    Return the model state to load as a dictionary.

    A dictionary is returned unchanged. A string is treated as a file
    location and loaded when the file exists; otherwise None is returned.
    None means: use the reference shipped with the package if present,
    else fall back to ``_load_reference_from_folder``. False is passed
    through to signal that nothing should be loaded.

    Parameters
    ----------
    model_state : dict, str, None or False, optional
        Model state or its location. The default is None.

    Raises
    ------
    ValueError
        If ``model_state`` is of any other type.

    Returns
    -------
    dict, None or False
        Model state, or None/False when there is nothing to load.
    """
    # a ready-made model state needs no lookup
    if isinstance(model_state, dict):
        return model_state

    if model_state is None:
        # try to load reference from package
        filename = self._get_reference_filename()
        if os.path.isfile(filename):
            model_state = filename

    if model_state is None:
        model_state = self._load_reference_from_folder()

    if isinstance(model_state, str):
        if os.path.isfile(model_state):
            # NOTE(review): allow_pickle=True executes pickled content; only
            # load model state files from trusted sources.
            return np.load(model_state, allow_pickle=True).item()
        # a file name that does not exist yields no model state
        return None

    if model_state is None or model_state is False:
        return model_state

    raise ValueError('Parameter model_state not recognized.')
def _get_reference_filename(self):
    """Return the path of the packaged reference state for this class."""
    relative = f"reference_model_states/ref_{self.__class__.__name__}.npy"
    return os.path.join(self.path, relative)
def _load_reference_from_folder(self):
    """
    Return the name of the reference file in the working directory.

    If the file does not exist and a model is loaded, ``set_reference`` is
    called; when ``_local_save_reference`` is set, the exported state is
    also written to that file for quicker loading next time.
    """
    filename = f"P_ref_{self.__class__.__name__}.npy"
    if os.path.isfile(filename):
        return filename

    if self._model is not None:
        self.set_reference()

    # save to folder for quicker loading reference next time
    if self._local_save_reference and self._model is not None:
        exported = self.model_export(style='General')
        np.save(filename, exported, allow_pickle=True)
    return filename
[docs]
def load_reference(self):
    """Import the reference model state of the current model."""
    self.model_import(self.get_model_reference())
######
# General initialization
######
def _init_c_object(self):
    """
    Load the CircAdapt library and declare the C signatures used here.

    The signatures are declared before the registered plugins are loaded,
    so the plugin call already has its return type set. Argument lists
    mirror what the call sites in this class pass.
    """
    self._model = cdll.LoadLibrary(self._path_to_circadapt)
    lib = self._model
    double_ptr = ctypes.POINTER(c_double)

    # Build
    lib.build.argtypes = [c_char_p, c_char_p]
    lib.build.restype = c_int

    # Get and set: doubles
    lib.get_double.restype = c_bool
    lib.get_double.argtypes = [c_int, c_char_p, double_ptr]
    lib.set_double.restype = c_bool
    lib.set_double.argtypes = [c_int, c_char_p, c_double]

    # Get and set: double vectors (model index, name, data[, length])
    lib.get_double_vector.restype = c_bool
    lib.get_double_vector.argtypes = [c_int, c_char_p, double_ptr]
    lib.get_double_vector_length.restype = c_int
    lib.get_double_vector_length.argtypes = [c_int, c_char_p]
    lib.set_double_vector.restype = c_bool
    lib.set_double_vector.argtypes = [c_int, c_char_p, double_ptr, c_int]

    # Strings are fetched in two steps through a buffer
    lib.load_string_to_buffer.restype = c_int
    lib.load_string_to_buffer.argtypes = [c_int, c_char_p]
    lib.get_string_from_buffer.restype = c_bool
    lib.get_string_from_buffer.argtypes = [c_int, c_char_p]

    # Get and set: booleans
    lib.get_bool.restype = c_bool
    lib.get_bool.argtypes = [c_int, c_char_p, ctypes.POINTER(c_bool)]
    lib.set_bool.restype = c_bool
    lib.set_bool.argtypes = [c_int, c_char_p, c_bool]

    # Get and set: integers
    lib.get_int.restype = c_bool
    lib.get_int.argtypes = [c_int, c_char_p, ctypes.POINTER(c_int)]
    lib.set_int.restype = c_bool
    lib.set_int.argtypes = [c_int, c_char_p, c_int]

    lib.trigger.restype = c_bool
    lib.trigger.argtypes = [c_int, c_char_p]

    lib.get_version.restype = c_char_p

    lib.run_n_beats.argtypes = [c_int, c_int, c_bool]

    # Add components to empty model and set Component to link components.
    # add_component is called with (model index, base, type, name).
    lib.add_component.restype = c_bool
    lib.add_component.argtypes = [c_int, c_char_p, c_char_p, c_char_p]
    lib.set_component.restype = c_bool
    lib.set_component.argtypes = [c_int, c_char_p, c_char_p]

    lib.load_plugin_component.restype = c_bool
    lib.load_plugin_component.argtypes = [c_char_p]
    lib.component_exists.restype = c_bool
    lib.component_exists.argtypes = [c_char_p]

    # Load plugins last: the result of load_plugin_component is only
    # interpreted correctly once its restype is declared.
    for plugin in _loaded_plugins:
        self.load_plugin_components(plugin)
[docs]
def load_plugin_components(self, path_to_library: str):
    """
    Load a plugin library into the loaded CircAdapt library.

    Parameters
    ----------
    path_to_library : str
        Path to library.

    Returns
    -------
    bool
        True when the plugin is loaded.

    Raises
    ------
    ValueError
        If the library reports that the plugin could not be loaded.
    """
    encoded_path = path_to_library.encode('ASCII')
    loaded = self._model.load_plugin_component(encoded_path)
    if not loaded:
        raise ValueError('Could not import plugin file "' + path_to_library +
                         '"')
    return True
######
# Functions to run beats in CircAdapt
######
@except_OSerror
@check_build_decorator
def run(self,
        n_beats: int = 1,
        stable: bool = False,
        export: bool = True) -> None:
    """
    Run the model for a number of beats or until it is stable.

    The currently loaded model, model state, and parameterization are
    used. With ``stable=True`` the library's stable run is used instead of
    a fixed number of beats.

    Parameters
    ----------
    n_beats: int, optional (default: 1)
        Number of beats; only used when ``stable`` is False.
    stable: bool, optional (default: False)
        Run beats until simulation is hemodynamically stable.
    export: bool, optional (default: True)
        Export flag forwarded to the library.

    Raises
    ------
    ModelCrashed
        If the model reports ``Model.is_crashed`` after the run.
    ModelNotStable
        If ``stable`` is True and the model is not stable afterwards.
    """
    self._count_run_commands += 1

    if not stable:
        self._model.run_n_beats(self._idx_model, n_beats, export)
    else:
        self._model.run_stable(self._idx_model, export)

    crashed = self.get('Model.is_crashed')
    if crashed:
        raise ModelCrashed()

    if stable and not self.is_stable():
        raise ModelNotStable()
[docs]
def is_stable(self) -> bool:
    """Return the boolean model parameter ``Model.is_stable``."""
    stable = self.get('Model.is_stable', dtype=bool)
    return stable
[docs]
@check_build_decorator
def get(self,
        par: str,
        dtype: any = None,
        ) -> any:
    """
    Read a parameter from the model.

    Parameters
    ----------
    par: str
        Full parameter path: components, subcomponents and the parameter
        name at the end, separated with dots.
    dtype: string or type, optional (default = None)
        Type of the parameter. If not specified it is looked up in the
        model first, which costs an extra call; specify it when
        computational cost is important.

    Returns
    -------
    Value stored in the c++ object; its type depends on dtype.

    Raises
    ------
    ValueError
        If the type is not one of the known kinds.
    """
    kind = self._find_dtype(par) if dtype is None else dtype

    if kind in ('double', float, np.float64):
        return self._get_double(par)
    if kind in (list, "double_vector"):
        return self._get_double_vector(par)
    if kind in ("bool", bool):
        return self._get_bool(par)
    if kind in ("int", int):
        return self._get_int(par)
    # components are read as the string naming them
    if kind in ('str', 'none', str, 'component'):
        return self._get_str(par)

    # state_variables are not always initialized as vectors.
    if kind == 'state_variable':
        try:
            return self._get_double_vector(par)
        except ValueError:
            return self._get_double(par)

    raise ValueError(f'Parameter {par} type not identified. ')
def _find_dtype(self, full_pararameter_path: str) -> str:
    """
    Ask the model for the type of a parameter.

    The path is split at its last dot; the query is
    ``<base>.parameter_type:<name>``, or ``parameter_type:<name>`` when
    there is no base.
    """
    base, _, name = full_pararameter_path.rpartition('.')
    query = 'parameter_type:' + name
    if base:
        query = base + '.' + query
    return self._get_str(query)
[docs]
@check_build_decorator
def set(self, par: str, val: any, dtype=None) -> bool:
    """
    Set a parameter of the model to value val.

    Parameters
    ----------
    par: str
        Full parameter path: components, subcomponents and the parameter
        name at the end, separated with dots.
    val: any
        Value to be set; it must fit the type of the parameter.
    dtype: string or type, optional (default = None)
        Type of the parameter. If not specified it is looked up in the
        model first; specify it when computational cost is important.

    Returns
    -------
    bool: success or not

    Raises
    ------
    ValueError
        If the type is not one of the known kinds.
    """
    kind = self._find_dtype(par) if dtype is None else dtype

    # a scalar given for a vector parameter is set through the double setter
    scalar_for_vector = (kind == 'double_vector'
                         and isinstance(val, (float, int)))
    if (kind in ('double', 'state_variable', float, np.float64)
            or scalar_for_vector):
        return self._set_double(par, val)
    if kind == 'double_vector':
        return self._set_double_vector(par, val)
    if kind == 'component':
        return self.set_component(par, val)
    if kind in ('bool', bool):
        return self._set_bool(par, val)
    if kind in ('int', int):
        return self._set_int(par, val)
    raise ValueError('Parameter ' + par + ' not found.')
[docs]
def trigger(self, par) -> bool:
    """
    Trigger a function of an object in the model.

    Parameters
    ----------
    par : str
        Function to trigger, written like a set/get parameter path, i.e.
        'Component.subcomponent.function_name'.

    Returns
    -------
    bool
        True when the trigger is reported as successful.

    Raises
    ------
    TriggerNotFound
        If the library reports failure.
    """
    triggered = self._model.trigger(self._idx_model, par.encode('ASCII'))
    if triggered:
        return True
    raise TriggerNotFound(par)
@raise_error_on_non_existance_set
@catch_general_error
def _set_bool(self, par, val):
    """Set parameter par to bool(val); return the library's success flag."""
    return self._model.set_bool(
        self._idx_model, par.encode('ASCII'), bool(val))
def _get_bool(self, par):
    """
    Read a boolean parameter from the model.

    Raises
    ------
    NameError
        If the library reports that the parameter does not exist.
    """
    out = ctypes.c_bool()
    found = self._model.get_bool(
        self._idx_model, par.encode('ASCII'), ctypes.pointer(out))
    if found:
        return out.value
    raise NameError('Parameter "'+par+'" could not be found.')
@raise_error_on_non_existance_set
def _set_int(self, par, val):
    """Set parameter par to int(val); return the library's success flag."""
    return self._model.set_int(
        self._idx_model, par.encode('ASCII'), int(val))
def _get_int(self, par):
    """
    Read an integer parameter from the model.

    Raises
    ------
    NameError
        If the library reports that the parameter does not exist.
    """
    out = ctypes.c_int()
    found = self._model.get_int(
        self._idx_model, par.encode('ASCII'), ctypes.pointer(out))
    if found:
        return out.value
    raise NameError('Integer parameter "'+par+'" could not be found.')
def _get_double(self, par):
    """
    Read a double parameter from the model.

    Raises
    ------
    NameError
        If the library reports that the parameter does not exist.
    """
    out = ctypes.c_double()
    found = self._model.get_double(
        self._idx_model, par.encode('ASCII'), ctypes.pointer(out))
    if found:
        return out.value
    raise NameError('Double parameter "'+par+'" could not be found.')
@raise_error_on_non_existance_set
def _set_double(self, par, val):
    """Set parameter par to float(val); return the library's success flag."""
    as_double = c_double(float(val))
    return self._model.set_double(
        self._idx_model, par.encode('ASCII'), as_double)
def _get_str(self, _par):
    """
    Read a string value from the model.

    The library first reports the size of the string, then copies it into
    a buffer of that size.

    Raises
    ------
    ValueError
        If the reported size is negative.
    """
    encoded = _par.encode('ASCII')
    size = self._model.load_string_to_buffer(self._idx_model, encoded)
    if size < 0:
        raise ValueError('Parameter "' + _par + '" not found. ')

    buffer = ctypes.create_string_buffer(size)
    # NOTE(review): the success flag returned by this call is not inspected
    self._model.get_string_from_buffer(size, buffer)
    return str(buffer, 'utf-8')
def _get_double_vector(self, par):
    """
    Read a vector of doubles from the model as a numpy array.

    Raises
    ------
    ValueError
        If the reported vector length is negative.
    NameError
        If the library reports failure while filling the data.
    """
    encoded = par.encode('ASCII')
    n_values = self._model.get_double_vector_length(self._idx_model, encoded)
    if n_values < 0:
        raise ValueError(f'{par} not found.')

    # equivalent to C++ double[n_values]; filled in place by the library
    buffer = (c_double * n_values)()
    filled = self._model.get_double_vector(self._idx_model, encoded, buffer)
    if not filled:
        raise NameError('Parameter "'+par+'" could not be found.')
    return np.array(buffer)
def _set_double_vector(self, par, value):
    """
    Write a vector of doubles to the model.

    Parameters
    ----------
    par : str
        Full parameter path.
    value : sequence of float
        Values to set; must support ``len`` and iteration.

    Returns
    -------
    bool
        True when the vector is set.

    Raises
    ------
    NameError
        If the library reports failure.
    """
    _par = par.encode('ASCII')
    length_vector = len(value)

    # equivalent to C++ double[length_vector], initialised from value
    data = (c_double * length_vector)(*value)

    is_success = self._model.set_double_vector(
        self._idx_model, _par, data, length_vector)
    if not is_success:
        raise NameError('Parameter "'+par+'" could not be found.')
    return True
[docs]
def add(self, comp_type: str) -> bool:
    """Register a component type in the wrapper without a model location."""
    registered = self._add_component_to_wrapper(comp_type)
    return registered
[docs]
def add_component(self,
                  comp_type: str,
                  comp_name: str,
                  base: str = '') -> bool:
    """
    Add a component to the model and register it in the wrapper.

    Parameters
    ----------
    comp_type: str
        Type of the component to create.
    comp_name: str
        Name of the new component.
    base: str, optional (default='')
        Parent object of the new component.

    Returns
    -------
    is_success (bool)

    Raises
    ------
    Exception
        If the library does not add the component; the message tells
        whether the component type is unknown to the library.
    """
    # the model structure changes, so check the build before the next run
    self._is_checked = False

    encoded_base = base.encode('ASCII')
    encoded_type = comp_type.encode('ASCII')
    encoded_name = comp_name.encode('ASCII')

    added = self._model.add_component(
        self._idx_model, encoded_base, encoded_type, encoded_name)
    if added:
        return self._add_component_to_wrapper(comp_type, base+'.'+comp_name)

    # Component not added: find out whether the type exists at all
    if self._model.component_exists(encoded_type):
        raise Exception('Component not added for unknown reason.')
    raise Exception(f'The CircAdapt model does not have a component '
                    f'named {comp_type}. Check the spelling of the '
                    'component type or make sure the correct plugin '
                    'is loaded.')
def _add_component_to_wrapper(self, comp_type, cpp_location=None):
    """
    Register a component type, and optionally an object, in the wrapper.

    Parameters
    ----------
    comp_type : str
        Component type name.
    cpp_location : str, optional
        Location of the object relative to the model. It is prefixed with
        'Model'; a leading dot in ``cpp_location`` is reused as separator.

    Returns
    -------
    bool
        Result of the registration.
    """
    if cpp_location is None:
        location = None
    elif cpp_location.startswith('.'):
        location = 'Model' + cpp_location
    else:
        location = 'Model.' + cpp_location
    return self._check_component_in_export_list_and_add_object(
        comp_type,
        location,
    )
# Map component type names to their wrapper classes. A plain entry is the
# wrapper class itself. A list entry holds the wrapper class first, followed
# by the type names of the components it depends on; those dependencies are
# registered before the component itself.
_str2comp = {
'ArtVen': [ArtVen, 'Connector'],
'Bag': [Bag, 'Cavity'],
'Capacitor': [Capacitor, 'Cavity'],
'Cavity': [Cavity, 'Node'],
'Chamber': [Chamber, 'Cavity'],
'Chamber2022': [Chamber2022, 'Cavity'],
'Connector': Connector,
'Diode': [Diode, 'Connector'],
'Node': Node,
'Patch': Patch,
'Patch2022': Patch2022,
'PressureFlowControl': PressureFlowControl,
'LoadExperiment': [LoadExperiment, 'Wall'],
'Resistance': [Resistance, 'Connector'],
'Timings': Timings,
'TriSeg2022': TriSeg2022,
'TriSeg': TriSeg,
'Tube0D': [Tube0D, 'Cavity'],
'Valve': [Valve, 'Connector'],
'Valve2022': [Valve2022, 'Connector'],
'Wall': Wall,
'Wall2022': Wall2022,
}
def _check_component_in_export_list_and_add_object(self,
                                                   comp_type,
                                                   loc=None,
                                                   ):
    """
    Make sure a wrapper for comp_type exists and attach loc to it.

    Types missing from ``_str2comp`` get an automatic wrapper. For known
    types the dependencies listed in ``_str2comp`` are handled first, with
    the same location.

    Returns
    -------
    bool
        True for known types; otherwise the result of
        ``_create_auto_component``.
    """
    if comp_type not in self._str2comp:
        auto_name = self._module_rename.get(comp_type, comp_type)
        return self._create_auto_component(auto_name, loc)

    entry = self._str2comp[comp_type]
    if isinstance(entry, list):
        # first item is the wrapper class, the rest are dependencies
        for dependency in entry[1:]:
            self._check_component_in_export_list_and_add_object(
                dependency, loc)
        wrapper_class = entry[0]
    else:
        wrapper_class = entry

    registered_name = self._module_rename.get(comp_type, comp_type)
    if registered_name not in self._components:
        self._components[registered_name] = wrapper_class(comp_type, self)
    if loc is not None:
        self._components[registered_name].add_object(loc)

    self.components = sorted(self._components)
    return True
def _create_auto_component(self, comp_type, loc=None):
    """
    Register an automatic wrapper for a type without a dedicated class.

    An AutoComponent is created on first use of ``comp_type``; ``loc`` is
    attached to it when given. Always returns True.
    """
    registry = self._components
    if comp_type not in registry:
        registry[comp_type] = AutoComponent(comp_type, self)
    if loc is not None:
        registry[comp_type].add_object(loc)

    self.components = sorted(self._components)
    return True
@raise_error_on_non_existance_set
def set_component(self, par, obj) -> bool:
    """
    Link a component to a parameter of a CircAdapt object.

    Parameters
    ----------
    par: str
        Parameter of the object to link obj to.
    obj: str
        Object to set.

    Returns
    -------
    bool
        True when the component is set.

    Raises
    ------
    Exception
        If the library reports failure.
    """
    # model structure is changing, so recheck
    self._is_checked = False

    linked = self._model.set_component(
        self._idx_model, par.encode('ASCII'), obj.encode('ASCII'))
    if linked:
        return True
    raise Exception('Component not set')
############
# Set model_state
############
[docs]
def model_import(self, model_state, check_model_state=False) -> None:
    """
    Load model_state into the CircAdapt object.

    Parameters
    ----------
    model_state: dict
        Model state with the entries 'Model' and 'Solver'.
    check_model_state: bool, optional (default = False)
        Run ``_check_model_state`` on the state before loading it.

    Raises
    ------
    ValueError
        If the detected style is neither "Custom" nor "General".
    """
    # a state holding exactly 'model', 'Model' and 'Solver' is 'Custom'
    expected_keys = ('model', 'Model', 'Solver')
    is_custom = (len(model_state.keys()) == 3
                 and all(key in model_state.keys() for key in expected_keys))
    style = 'Custom' if is_custom else self._get_default_model_state_style()

    # here, style must be Custom or General, otherwise, raise error
    if style not in ("Custom", "General"):
        raise ValueError(f'Style "{style}" unknown.')

    if check_model_state:
        self._check_model_state(model_state)

    # TODO: build model according to object description
    model_part = model_state['Model']
    self._set_model_state_model_parameters('Model', model_part)
    self._set_model_state_model_parameters('Model', model_state['Model'],
                                           field='state_variables')
    self._set_model_state_model_parameters('Solver', model_state['Solver'])
def _check_model_state(self, model_state):
    """
    Compare model_state with the loaded model and print warnings.

    Warnings are printed when the state comes from a newer version than
    the model, or when the model type or name differs. Nothing is raised
    for a mismatch.

    Parameters
    ----------
    model_state : dict
        Model state with the entries 'model' and 'Model'.

    Returns
    -------
    bool
        Always True.
    """
    def _version_key(version):
        # numeric comparison, so that '10.0' is newer than '9.0'
        try:
            return tuple(int(part) for part in str(version).split('.'))
        except ValueError:
            return None

    def _lookup(component, key):
        # model_export writes lower-case keys ('type', 'name'); keep
        # accepting the capitalised spelling as well
        if key in component:
            return component[key]
        return component[key.lower()]

    # check version
    model_version = self.get('Version', str)
    state_version = model_state['model']['Version']
    model_key = _version_key(model_version)
    state_key = _version_key(state_version)
    if model_key is not None and state_key is not None:
        model_is_older = model_key < state_key
    else:
        # fall back to plain string comparison for non-numeric versions
        model_is_older = model_version < state_version
    if model_is_older:
        print('----------')
        print('- Warning: The stored P-dict origins from an other version'
              ' than the used model.')
        print('- Check the documentation whether this P-dict '
              'is compatible.')

    # check model type
    if self.get('Model.type', str) != _lookup(model_state['Model'], 'Type'):
        print('----------')
        print('- Warning: Model type is not the same. ')
        print('- This might result in incomplete results.')
        print('- Change the model name when building the model.')

    # check model name
    if self.get('Model.name', str) != _lookup(model_state['Model'], 'Name'):
        print('----------')
        print('- Warning: Model name is not the same.')
        print('- This might result in incomplete results.')
        print('- Change the model name when building the model.')
    return True
# Set Parameters
def _set_model_state_model_parameters1(self,
                                       level,
                                       components,
                                       field='parameters',
                                       field1='parameters_double',
                                       dtype=float,
                                       ):
    """
    Set the parameters of one kind on one component.

    The model lists the parameter names under ``field1``; the values are
    taken from ``components[field]``. Names missing from the state and
    values that could not be set only produce a printed warning.

    Parameters
    ----------
    level : str
        Path of the component in the model.
    components : dict
        Model state of this component.
    field : str, optional
        Entry of ``components`` holding the values.
    field1 : str, optional
        Name of the parameter list in the model.
    dtype : type, optional
        float, int or bool; selects the setter that is used.
    """
    if field == "export_double_vectors":
        dtype = bool

    n_parameters = self.get(level + '.n_' + field1, int)
    for index in range(n_parameters):
        name = self.get(level + '.' + field1 + ':' + str(index), str)

        if name not in components[field]:
            print('- Warning: Parameter ', name,
                  ' not in P-dict. Reference value is used. ')
            continue

        if dtype == float:
            setter = self._set_double
        elif dtype == int:
            setter = self._set_int
        elif dtype == bool:
            setter = self._set_bool
        else:
            setter = None

        if setter is None:
            is_set = False
        else:
            is_set = setter(level+'.'+name, components[field][name])
        if not is_set:
            print('- Warning: parameter not set: ',
                  level + '.' + name)
def _set_model_state_model_parameters(self,
                                      level,
                                      components,
                                      field='parameters',
                                      ):
    """
    Set one field of a component and, recursively, of its subcomponents.

    For 'parameters' the double, bool and int parameters are set in that
    order. For 'state_variables' the state variables are set in addition.
    Subcomponents of the model that are missing from the state only
    produce a printed warning.

    Parameters
    ----------
    level : str
        Path of the component in the model.
    components : dict
        Model state of this component.
    field : str, optional
        Field to set. The default is 'parameters'.
    """
    if field == "parameters":
        typed_lists = (("_double", float), ("_bool", bool), ("_int", int))
        for suffix, kind in typed_lists:
            self._set_model_state_model_parameters1(
                level, components, field, field.lower()+suffix, dtype=kind)
    else:
        self._set_model_state_model_parameters1(
            level, components, field, field.lower())
        if field == 'state_variables':
            self._set_state_variables(level, components)

    # solver does not have subcomponents, so return
    if level == 'Solver':
        return

    # Set subcomponents
    n_subcomponents = self.get(level+'.n_subcomponents', int)
    names_in_state = np.array(
        [entry['name'] for entry in components['subcomponents']])

    for index in range(n_subcomponents):
        name = self.get(level + '.subcomponents:' + str(index), str)

        # warn if subcomponent is not in the model state
        matches = names_in_state == name
        if np.sum(matches) == 0:
            print('----------')
            print('- Warning: Model subcomponent ', name,
                  ' not in P-dict. ')
            print('- Reference values are used. ')
            continue

        # first entry of the state with this name
        entry = components['subcomponents'][np.argmax(matches)]
        self._set_model_state_model_parameters(
            level + '.' + entry['name'],
            entry,
            field=field,
        )
def _set_state_variables(self, level, components):
    """
    Set scalar and vector state variables of one component.

    The names are read from the model; the values come from
    ``components['state_variables']``. A value with a ``__len__`` is set
    as 'double_vector', anything else as 'double'. Missing names and
    failed sets only produce a printed warning.
    """
    n_scalars = self.get(level + '.n_state_variables', int)
    n_vectors = self.get(level + '.n_state_variable_vectors', int)

    names = [self.get(level + '.state_variables:' + str(i), str)
             for i in range(n_scalars)]
    names += [self.get(level + '.state_variable_vectors:' + str(i), str)
              for i in range(n_vectors)]

    for name in names:
        if name not in components['state_variables']:
            print('- Warning: Parameter ', name,
                  ' not in P-dict. Reference value is used. ')
            continue

        value = components['state_variables'][name]
        if hasattr(value, '__len__'):
            dtype = 'double_vector'
        else:
            dtype = 'double'

        is_set = self.set(level + '.' + name, value, dtype=dtype)
        if not is_set:
            print('- Warning: parameter not set: ',
                  level + '.' + name)
############
# Get model_state
############
def _get_default_model_state_style(self):
    """Return the default style of a stored model state: 'General'."""
    default_style = 'General'
    return default_style
[docs]
def model_export(self, style: str = None) -> dict:
    """
    Return the state of the model as a dictionary.

    Parameters
    ----------
    style: str (optional)
        "Custom" or "General". If not given, the default style of the
        model is used.

    Returns
    -------
    dict
        Dictionary with the entries 'model', 'Model' and 'Solver'.

    Raises
    ------
    ValueError
        If the style is unknown.
    """
    if style is None:
        style = self._get_default_model_state_style()
    if style not in ("Custom", "General"):
        raise ValueError(f'Style "{style}" unknown.')

    exported = {}
    model_part = self._fill_model_state_model('Model')
    exported['model'] = self._fill_model_state()
    exported['Model'] = model_part
    exported['Solver'] = self._fill_model_state_solver()
    return exported
# model
def _fill_model_state(self):
    """Return the general part of the model state: the model version."""
    version = self.get('Version', str)
    return {'Version': version}
# Fill Solver
def _fill_model_state_solver(self):
    """
    Return the solver part of the model state.

    Returns
    -------
    dict
        Solver type, its parameters and its exported double vectors.
    """
    solver_state = {}
    solver_state['type'] = self.get('Solver.type', str)
    solver_state['parameters'] = \
        self._fill_model_state_model_parameters('Solver')
    solver_state['export_double_vectors'] = \
        self._fill_model_state_model_double_vectors('Solver')
    return solver_state
# Fill model
def _fill_model_state_model(self, level):
    """
    Return the model state of one component, including subcomponents.

    Parameters
    ----------
    level : str
        Path of the component in the model.

    Returns
    -------
    dict
        Name, type, subcomponents, parameters, state variables (scalars
        and vectors) and exported double vectors of the component.
    """
    state = {
        'name': self.get(level+'.name', str),
        'type': self.get(level+'.type', str),
        'subcomponents': self._fill_model_state_model_subcomponents(level),
        'parameters': self._fill_model_state_model_parameters(level),
    }

    # scalar state variables first, vectors are added to the same dict
    scalars = self._fill_model_state_model_parameters(
        level, field='state_variables')
    state['state_variables'] = \
        self._fill_model_state_model_parameters_state_variable_vectors(
            level, scalars)

    state['export_double_vectors'] = \
        self._fill_model_state_model_double_vectors(level)
    return state
# Get subcomponents
def _fill_model_state_model_subcomponents(self, level):
    """Return the model states of all subcomponents of level as a list."""
    n_subcomponents = self.get(level+'.n_subcomponents', int)
    return [
        self._fill_model_state_model(
            level + '.' + self.get(level+'.subcomponents:'+str(i), str))
        for i in range(n_subcomponents)
    ]
# Get Parameters
def _fill_model_state_model_parameters(self,
                                       level,
                                       field='parameters',
                                       dtype=float,
                                       ):
    """
    Return the values of one field of a component as a dictionary.

    For 'parameters' the double, bool and int parameters are collected
    into one dictionary. For any other field the names are read from the
    model list called ``field`` and the values are read with ``dtype``.
    """
    if field == "parameters":
        merged = {}
        typed_lists = (('_double', float), ('_bool', bool), ('_int', int))
        for suffix, kind in typed_lists:
            merged.update(self._fill_model_state_model_parameters(
                level, field=field+suffix, dtype=kind))
        return merged

    values = {}
    n_parameters = self._get_int(level + '.n_' + field)
    for index in range(n_parameters):
        name = self.get(level+'.' + field + ':' + str(index), str)
        values[name] = self.get(level + '.' + name, dtype)
    return values
# Get Parameters
def _fill_model_state_model_parameters_state_variable_vectors(
        self, level, state_variables):
    """
    Add the vector state variables of level to state_variables.

    The given dictionary is extended in place with numpy arrays and also
    returned.
    """
    field = 'state_variable_vectors'
    n_vectors = self.get(level + '.n_' + field, int)
    for index in range(n_vectors):
        name = self._get_str(level + '.' + field + ':' + str(index))
        values = self.get(level + '.' + name, 'double_vector')
        state_variables[name] = np.array(values)
    return state_variables
# Get DoubleVectors
def _fill_model_state_model_double_vectors(self, level,
                                           field='export_double_vectors'):
    """
    Return the exported double vectors of a component.

    The vectors themselves are currently not read: the result is an empty
    list when the model lists no vectors and an empty dict otherwise.
    """
    n_parameters = self._get_int(level + '.n_' + field)
    # NOTE(review): the two empty results differ in type (list vs dict);
    # reading the listed vectors with _get_double_vector is disabled.
    return [] if n_parameters == 0 else {}
[docs]
def save(self, filename: str, ext: str = None) -> None:
    """
    Save the model state to a file.

    Parameters
    ----------
    filename: str
        File to save to.
    ext: str (optional)
        'npy' or 'mat'. If not given, the last 3 letters of the filename
        are used to determine the save method.

    Raises
    ------
    ValueError
        If the extension is neither 'npy' nor 'mat'.
    """
    extension = filename[-3:] if ext is None else ext

    if extension == 'npy':
        np.save(filename, self.model_export(style='General'),
                allow_pickle=True)
    elif extension == 'mat':
        # stored under the variable name 'P'
        sio.savemat(filename, {'P': self.model_export()})
    else:
        raise ValueError('Extenstion not known')
[docs]
def load(self, filename: str) -> None:
    """
    Load a model dataset from a file and import it into the model.

    Parameters
    ----------
    filename: str
        Path to file that will be loaded. The extension must be .npy or
        .mat.

    Raises
    ------
    ValueError
        If the extension is neither .npy nor .mat.
    """
    extension = filename[-4:]
    if extension == '.npy':
        # NOTE(review): allow_pickle=True executes pickled content; only
        # load files from trusted sources.
        dataset = np.load(filename, allow_pickle=True).item()
    elif extension == '.mat':
        # the state is stored under the variable name 'P'
        dataset = _check_keys(sio.loadmat(filename,
                                          struct_as_record=False,
                                          squeeze_me=True))['P']
    else:
        raise ValueError(
            f"Cannot load '{filename}': extension must be .npy or .mat.")
    self.model_import(dataset)
[docs]
def is_success(self) -> bool:
    """
    Return whether the last simulation succeeded.

    Returns
    -------
    bool
        True when the model has not crashed and is hemodynamically
        stable; False otherwise.
    """
    # same parameter name as the crash check in run()
    has_crashed = self.get('Model.is_crashed', bool)
    return (not has_crashed) and self.is_stable()
############
# Smart Components
############
# Imported inside the class body, so these names become attributes of the
# class. NOTE(review): presumably each function takes the model instance as
# its first argument — confirm in the smart_components modules.
from .smart_components import add_smart_component
from .smart_components_heart import build_heart, build_timings, \
build_pfc
from .smart_components_circulation import build_artven
[docs]
def __iter__(self):
    """
    Iterate over the exported model state.

    Designed to create a dictionary of the object using dict(self).

    Yields
    ------
    tuple
        (key, value) pairs of the dictionary returned by model_export.
    """
    yield from self.model_export().items()
[docs]
def __setitem__(self, arg, val) -> bool:
    """
    Make self subscriptable for assignment.

    Parameters
    ----------
    arg: str or slice(None, None, None)
        With self[:] the value is imported as a full model state.
        With self['a.b'] the value is passed to self.set(arg, val).
    val: dict or any
        Dictionary of the model or single value to set.

    Raises
    ------
    ValueError
        For a string without a dot (partial dictionaries are not
        supported) and for any other kind of key.
    """
    if arg == slice(None, None, None):
        return self.model_import(val)
    if not isinstance(arg, str):
        raise ValueError('arg not known')
    if '.' in arg:
        return self.set(arg, val)
    # TODO: implement partial dictionary
    raise ValueError('Currently, it is not possible to set partial '
                     'dictionaries. Please set the full dictionary '
                     'using self[:]. ')
[docs]
def __getitem__(self, arg: any) -> any:
    """
    Make self object subscriptable.

    Parameters
    ----------
    arg: slice or str
        With self[:] the full model export dictionary is returned.
        A registered component name returns its wrapper object.
        Any other string without a dot returns that entry of the model
        export dictionary; a string with a dot returns self.get(arg).

    Returns
    -------
    model_export dictionary, component wrapper or self.get() type

    Raises
    ------
    KeyError
        If the key is of an unsupported kind.
    """
    # Handle slices before the dictionary lookup: slices are not hashable
    # before Python 3.12, so `arg in self._components` would raise.
    if isinstance(arg, slice):
        if arg == slice(None, None, None):
            return self.model_export()
        raise KeyError('Unknown key "' + str(arg) + '"')

    if arg in self._components:
        return self._components[arg]

    if isinstance(arg, str) and '.' not in arg:
        # TODO: implement partial dictionary
        return self.model_export()[arg]

    if isinstance(arg, str):
        return self.get(arg)

    raise KeyError('Unknown key "' + str(arg) + '"')
[docs]
def __getstate__(self):
    """
    Return the picklable state of the wrapper.

    The ctypes library handle cannot be pickled, so it is left out and the
    exported model state is stored under 'data' instead.
    """
    state = dict(self.__dict__)
    state['data'] = self.model_export()
    state.pop('_model')
    return state
[docs]
def __setstate__(self, state):
    """
    Rebuild the wrapper from a pickled state.

    The ctypes library handle is not part of the state, so the object is
    initialised again and the stored model state is imported.
    """
    # NOTE(review): only the solver and library path are passed on; the
    # stored model name is not — confirm this is intended.
    init_arguments = {
        'solver': state['_solver_name'],
        'path_to_circadapt': state['_path_to_circadapt'],
    }
    self.__init__(**init_arguments)
    self.model_import(state['data'])
[docs]
def __repr__(self):
    """Return the class name and the list of registered component names."""
    class_name = self.__class__.__name__
    lines = [
        '<' + class_name + '>',
        'CircAdapt object with keys:',
        repr(self.components),
        '<\\' + class_name + '>',
    ]
    return '\n'.join(lines)
[docs]
def copy(self):
    """
    Return a copy of itself.

    Raises
    ------
    NotImplementedError
        Always; copying is not implemented for this class.
    """
    message = ('Copy function not yet implemented for '
               'this instance.')
    raise NotImplementedError(message)