Source code for circadapt.circadapt

import os
import sys
from ctypes import cdll, c_double, c_char_p, c_bool, c_int
import ctypes

import scipy.io as sio
import numpy as np

from circadapt.matlab import _check_keys

from circadapt.components import AutoComponent
from circadapt.components.patch import Patch, Patch2022
from circadapt.components.connector import ArtVen, Connector, Diode, \
    Resistance, Valve, Valve2022
from circadapt.components.wall import Wall, Wall2022, LoadExperiment
from circadapt.components.cavity import Bag, Cavity, Capacitor, Tube0D, \
    Chamber, Chamber2022, TriSeg2022, TriSeg
from circadapt.components.node import Node
from circadapt.components.global_functions import Timings, PressureFlowControl
from circadapt.components import General
from circadapt.components.solver import Solver

from circadapt.error import raise_error_on_non_existance_set, except_OSerror
from circadapt.error import catch_general_error
from circadapt.error import ModelCrashed, TriggerNotFound, CorruptBuild, \
    ModelNotStable

from .settings import __version__

from circadapt import get_default_path_to_circadapt

# %% Plugins
_loaded_plugins = []


[docs] def load_plugin_components(filename: str): """ Load a plugin from a library. These plugins will be loaded every time a new model is loaded. Parameters ---------- filename : str path to library. """ if not os.path.isfile(filename): raise FileNotFoundError( f"Plugin could not be loaded. File '{filename}' does not exist.") _loaded_plugins.append(filename)
# %% Decorator
[docs] def check_build_decorator(func): """Decorator to check build when it is not checked.""" def inner(self, *arg, **kwarg): if not self._is_checked: self.check_build() return func(self, *arg, **kwarg) return inner
# %% Class
[docs] class CircAdapt: """ Wrapper to communicate with c++ code. Parameters ---------- solver: str (optional) Name of solver to be build. If not given, the default models solver is used. model: str Name of the model to be build. path_to_circadapt: str (optional) Path to CircAdapt library. If not given, the default_path_to_circadapt is used. model_state: (multi) Model state to be loaded. (default) -> load model reference from package False -> do nothing str -> load filename dict -> load given model state """ # Init CircAdapt by telling where to find the dll file _local_save_reference = False def __init__(self, solver: str = None, model: str = 'Custom', path_to_circadapt: str = None, model_state: dict = None, ): # set default path self.path = os.path.dirname(sys.modules["circadapt"].__file__) self._model = None self._idx_model = None self.version = __version__ # safty measures self._count_run_commands = 0 self._is_checked = False # true if check_build is triggered # list of components for easy i/o self._components = {} self._componentlocs = {} self.components = [] self._module_rename = {} # init solver and general elements self._components['Solver'] = Solver(self) self._components['General'] = General(self) self.components = list(self._components.keys()) if path_to_circadapt is None: path_to_circadapt = os.path.join( self.path, get_default_path_to_circadapt()) self._path_to_circadapt = path_to_circadapt self._update_settings() model_state = self.get_model_reference(model_state) self._build(model, solver, model_state=model_state, ) # load model_state into c++ object if model_state is not None and model_state is not False: self.model_import(model_state) def _update_settings(self): """Function to overrule settings in derived classes."""
[docs] def __del__(self): """ Destroy object. This function is triggered when the python wrapper object is deleted or overwritten. As this wrapper only has pointers to the c++ object, the destructor of the c++ object will be triggered. """ self._destroy_model()
def _destroy_model(self): if self._model is not None and self._idx_model is not None: self._model.delete_core(self._idx_model) self._model = None ###### # Build functions ###### def _build(self, model, solver=None, model_state=None, ): self._model_name = model self._solver_name = solver # get solver from model state if solver is None: solver = model_state['Solver']['Type'] # init the c++ object self._init_c_object() # build the c++ object model = model.encode('ASCII') solver = solver.encode('ASCII') # build model self._idx_model = self._model.build(model, solver) if self._idx_model < 0: raise Exception("Error while building the model.") self.build() self.check_build()
[docs] def build(self): """ Build model. Add and link components in the model. By default, the model is empty. This function will be used in derived model builts. """
[docs] def check_build(self): if not self._get_bool('check_build'): self.trigger('check_build') raise CorruptBuild() self._is_checked = True
[docs] def get_model_reference(self, model_state: any = None, ) -> dict: """ Return reference model state as a dictionary. If model_state is string, it is assumed to be a file location. This file will be loaded and returned. If is none, the default reference will be loaded. If no default reference available, it will be created and stored in the current active directory. Parameters ---------- model_state : str or None, optional Location of model state. The default is None. Raises ------ ValueError DESCRIPTION. Returns ------- dict Dictonary with model state. """ if model_state is None: # try to load reference from package filename = self._get_reference_filename() if os.path.isfile(filename): model_state = filename if model_state is None: model_state = self._load_reference_from_folder() if isinstance(model_state, str) and os.path.isfile(model_state): return np.load(model_state, allow_pickle=True).item() if isinstance(model_state, str): return None if model_state is None or model_state is False: return model_state raise ValueError('Parameter model_state not recognized.')
def _get_reference_filename(self): return os.path.join( self.path, "reference_model_states/ref_" + self.__class__.__name__ + ".npy", ) def _load_reference_from_folder(self): filename = "P_ref_" + self.__class__.__name__ + ".npy" if os.path.isfile(filename): return filename elif self._model is not None: self.set_reference() # save to folder for quicker loading reference next time if self._local_save_reference and self._model is not None: model_state = self.model_export(style='General') np.save(filename, model_state, allow_pickle=True) return filename
[docs] def load_reference(self): """Load reference of current model from the package.""" model_state = self.get_model_reference() self.model_import(model_state)
###### # General initalization ###### def _init_c_object(self): self._model = cdll.LoadLibrary(self._path_to_circadapt) for plugin in _loaded_plugins: self.load_plugin_components(plugin) # Build self._model.build.argtypes = [c_char_p, c_char_p] self._model.build.restype = c_int # Get and set self._model.get_double.restype = c_bool self._model.get_double.argtypes = [ c_int, c_char_p, ctypes.POINTER(ctypes.c_double), ] self._model.set_double.restype = c_bool self._model.set_double.argtypes = [c_int, c_char_p, c_double] self._model.get_double_vector.restype = c_bool self._model.get_double_vector.argtypes = [c_int, c_char_p] self._model.get_double_vector_length.restype = c_int self._model.get_double_vector_length.argtypes = [c_int, c_char_p] self._model.set_double_vector.restype = c_bool # self._model.set_double_vector.argtypes = [c_int, c_char_p, c_double, c_int] self._model.load_string_to_buffer.restype = c_int self._model.load_string_to_buffer.argtypes = [c_int, c_char_p] self._model.get_string_from_buffer.restype = c_bool self._model.get_string_from_buffer.argtypes = [c_int, c_char_p] self._model.get_bool.restype = c_bool self._model.get_bool.argtypes = [ c_int, c_char_p, ctypes.POINTER(ctypes.c_bool), ] self._model.set_bool.restype = c_bool self._model.set_bool.argtypes = [c_int, c_char_p, c_bool] self._model.get_int.restype = c_bool self._model.get_int.argtypes = [ c_int, c_char_p, ctypes.POINTER(ctypes.c_int), ] self._model.set_int.restype = c_bool self._model.set_int.argtypes = [c_int, c_char_p, c_int] self._model.trigger.restype = c_bool self._model.trigger.argtypes = [c_int, c_char_p] self._model.get_version.restype = c_char_p self._model.run_n_beats.argtypes = [c_int, c_int, c_bool] # Add components to empty model and set Component to link components self._model.add_component.restype = c_bool self._model.add_component.argtypes = [c_int, c_char_p, c_char_p] self._model.set_component.restype = c_bool self._model.set_component.argtypes = [c_int, c_char_p, c_char_p] self._model.load_plugin_component.restype = c_bool self._model.load_plugin_component.argtypes = [c_char_p] self._model.component_exists.restype = c_bool self._model.component_exists.argtypes = [c_char_p]
[docs] def load_plugin_components(self, path_to_library: str): """ Load a plugin Parameters ---------- path_to_library : str Path to library. """ _path = path_to_library.encode('ASCII') if self._model.load_plugin_component(_path): return True raise ValueError('Could not import plugin file "' + path_to_library + '"')
###### # Functions to run beats in CircAdapt ###### @except_OSerror @check_build_decorator def run(self, n_beats: int = 1, stable: bool = False, export: bool = True) -> None: """ Run the model by solving the ODEs for n_beats. The currently loaded model, model state, and parameterization will be used to run n_beats. Optional, more beats are performed until the model is hemodynamically stable (stable=True). By default, all data will be exported (export=True). By disabling this (export=False), the model runs faster by only solving the state variables in case a backward ODE solver is used. Parameters ---------- n_beats: int, optional (default: True) Number of beats stable: bool, optional (default: False) Run beats until simulation is hemodynamically stable export: bool, optional (default: False) Future """ self._count_run_commands += 1 if stable: self._model.run_stable(self._idx_model, export) else: self._model.run_n_beats(self._idx_model, n_beats, export) if self.get('Model.is_crashed'): raise ModelCrashed() if stable and not self.is_stable(): raise ModelNotStable()
[docs] def is_stable(self) -> bool: """Check if simulation is hemodynamically stable.""" return self.get('Model.is_stable', dtype=bool)
[docs] @check_build_decorator def get(self, par: str, dtype: any = None, ) -> any: """ Get parameter from the model. Parameters ---------- par: str Parameter name to get. This string value contains the parameter that will be obtained and the component from which it will be obtained. Components, its subcomponents, and the parameter name at the and are separated with a dot. dtype: string or type, optional (default = None) The type of parameter that will be obtained. If not specified, it will be detairmined automatically. If computational cost is important, it is better to specify the dtype. Returns ------- Get value: type depending on dtype. Returns the value stored in the c++ object """ if dtype is None: dtype = self._find_dtype(par) if dtype in ['double', float, np.float64]: return self._get_double(par) if dtype in [list, "double_vector"]: return self._get_double_vector(par) if dtype in ["bool", bool]: return self._get_bool(par) if dtype in ["int", int]: return self._get_int(par) if dtype in ['str', 'none'] or dtype == str: return self._get_str(par) if dtype in ['component']: return self._get_str(par) # state_variables are not always initialized as vectors. if dtype == 'state_variable': try: return self._get_double_vector(par) except ValueError: return self._get_double(par) raise ValueError(f'Parameter {par} type not identified. ')
def _find_dtype(self, full_pararameter_path: str) -> str: reverse_split = full_pararameter_path[::-1].split('.', 1) reverse_par, reverse_base = reverse_split if len(reverse_split) == 2 \ else (reverse_split[0], '') if len(reverse_base) == 0: return self._get_str('parameter_type:'+reverse_par[::-1]) return self._get_str(reverse_base[::-1]+'.parameter_type:'+reverse_par[::-1])
[docs] @check_build_decorator def set(self, par: str, val: any, dtype=None) -> bool: """ Set parameter from dtype with value val. Parameters ---------- par: str Parameter name to get. This string value contains the parameter that will be obtained and the component from which it will be obtained. Components, its subcomponents, and the parameter name at the and are separated with a dot. val: any Value to be set. The value must be the same as the parameter used. dtype: string or type, optional (default = None) The type of parameter that will be obtained. If not specified, it will be detairmined automatically. If computational cost is important, it is better to specify the dtype. Returns ------- bool: success or not """ if dtype is None: dtype = self._find_dtype(par) if (dtype in ['double', 'state_variable', float, np.float64] or (dtype == 'double_vector' and isinstance(val, float)) or (dtype == 'double_vector' and isinstance(val, int))): return self._set_double(par, val) if dtype == 'double_vector': return self._set_double_vector(par, val) if dtype == 'component': return self.set_component(par, val) if dtype in ['bool', bool]: return self._set_bool(par, val) if dtype in ['int', int]: return self._set_int(par, val) raise ValueError('Parameter ' + par + ' not found.')
[docs] def trigger(self, par) -> bool: """ Trigger a function. Objects in the model may have a function that can be triggered. This can be done by triggering the function using the 'par' parameter similar to the set function, only without a parameter. Parameters ---------- par : str Function in object that will be triggered. It has a similar form to the set/get par, i.e. 'Component.subcomponent.function_name' Returns ------- is_success (bool) """ _par = par.encode('ASCII') is_success = self._model.trigger(self._idx_model, _par) if not is_success: raise TriggerNotFound(par) return True
@raise_error_on_non_existance_set @catch_general_error def _set_bool(self, par, val): _par = par.encode('ASCII') is_success = self._model.set_bool(self._idx_model, _par, bool(val)) return is_success def _get_bool(self, par): _par = par.encode('ASCII') result = ctypes.c_bool() result_ptr = ctypes.pointer(result) is_success = self._model.get_bool(self._idx_model, _par, result_ptr) # Raise error if bool parameter does not exist if not is_success: raise NameError('Parameter "'+par+'" could not be found.') return result.value @raise_error_on_non_existance_set def _set_int(self, par, val): _par = par.encode('ASCII') is_success = self._model.set_int(self._idx_model, _par, int(val)) return is_success def _get_int(self, par): _par = par.encode('ASCII') result = ctypes.c_int() result_ptr = ctypes.pointer(result) is_success = self._model.get_int(self._idx_model, _par, result_ptr) # Raise error if int parameter does not exist if not is_success: raise NameError('Integer parameter "'+par+'" could not be found.') return result.value def _get_double(self, par): """Get a double value from the model.""" _par = par.encode('ASCII') result = ctypes.c_double() result_ptr = ctypes.pointer(result) success = self._model.get_double(self._idx_model, _par, result_ptr) if not success: raise NameError('Double parameter "'+par+'" could not be found.') return result.value @raise_error_on_non_existance_set def _set_double(self, par, val): """Set a parameter par to double value val.""" _par = par.encode('ASCII') is_success = self._model.set_double( self._idx_model, _par, c_double(float(val))) return is_success def _get_str(self, _par): """Get a char value from the model.""" par = _par.encode('ASCII') buffer_size = self._model.load_string_to_buffer(self._idx_model, par) if buffer_size < 0: raise ValueError('Parameter "' + _par + '" not found. ') output_buffer = ctypes.create_string_buffer(buffer_size) is_success = self._model.get_string_from_buffer(buffer_size, output_buffer) value = str(output_buffer, 'utf-8') return value def _get_double_vector(self, par): """Get Double Vector.""" # Get vector get_vector_length # length_vector = self._model.get_vector_length(self._idx_model) _par = par.encode('ASCII') length_vector = self._model.get_double_vector_length( self._idx_model, _par) if length_vector < 0: raise ValueError(f'{par} not found.') # init empty data array data = (c_double * length_vector)() # equivalent to C++ double[vecLen] # Fill data is_success = self._model.get_double_vector(self._idx_model, _par, data) # raise data if vector does not exist if not is_success: raise NameError('Parameter "'+par+'" could not be found.') return np.array(data) def _set_double_vector(self, par, value): """Set Double Vector.""" # Get vector get_vector_length # length_vector = self._model.get_vector_length(self._idx_model) _par = par.encode('ASCII') length_vector = len(value) if length_vector < 0: raise ValueError(f'{par} not found.') # init empty data array data = (c_double * length_vector)() # equivalent to C++ double[vecLen] for i in range(length_vector): data[i] = value[i] # Fill data is_success = self._model.set_double_vector( self._idx_model, _par, data, length_vector) # raise data if vector does not exist if not is_success: raise NameError('Parameter "'+par+'" could not be found.') return True
[docs] def add(self, comp_type: str) -> bool: return self._add_component_to_wrapper(comp_type)
[docs] def add_component(self, comp_type: str, comp_name: str, base: str = '') -> bool: """ Add component to CircAdapt object. Parameters ---------- comp_type: str Type of object to create in the ComponentFactory comp_name: str Name of the new object to create base: char, optional (default='') Parent object of new component Returns ------- is_success (bool) """ self._is_checked = False # check model before run _base = base.encode('ASCII') _comp_type = comp_type.encode('ASCII') _comp_name = comp_name.encode('ASCII') if self._model.add_component(self._idx_model, _base, _comp_type, _comp_name): return self._add_component_to_wrapper(comp_type, base+'.'+comp_name) # Component not loaded, raise error if not self._model.component_exists(_comp_type): raise Exception(f'The CircAdapt model does not have a component ' f'named {comp_type}. Check the spelling of the ' 'component type or make sure the correct plugin ' 'is loaded.') raise Exception('Component not added for unknown reason.')
def _add_component_to_wrapper(self, comp_type, cpp_location=None): # add Model. to cpp location if cpp_location is None: s = None else: s = 'Model.' s = (s[:-1] if cpp_location[0]=='.' else s) + cpp_location return self._check_component_in_export_list_and_add_object( comp_type, s, ) # print('TODO: add ', cpp_location, ' to ', comp_type) return False # map strings to objects. If item is list, then first item is object # and following are strings of dependend objects that have to be # initialized _str2comp = { 'ArtVen': [ArtVen, 'Connector'], 'Bag': [Bag, 'Cavity'], 'Capacitor': [Capacitor, 'Cavity'], 'Cavity': [Cavity, 'Node'], 'Chamber': [Chamber, 'Cavity'], 'Chamber2022': [Chamber2022, 'Cavity'], 'Connector': Connector, 'Diode': [Diode, 'Connector'], 'Node': Node, 'Patch': Patch, 'Patch2022': Patch2022, 'PressureFlowControl': PressureFlowControl, 'LoadExperiment': [LoadExperiment, 'Wall'], 'Resistance': [Resistance, 'Connector'], 'Timings': Timings, 'TriSeg2022': TriSeg2022, 'TriSeg': TriSeg, 'Tube0D': [Tube0D, 'Cavity'], 'Valve': [Valve, 'Connector'], 'Valve2022': [Valve2022, 'Connector'], 'Wall': Wall, 'Wall2022': Wall2022, } def _check_component_in_export_list_and_add_object(self, comp_type, loc=None, ): """Add comp type if is not in list. Then, add object to all relatebles.""" if comp_type not in self._str2comp.keys(): comp_type_to_use = self._module_rename.get(comp_type, comp_type) return self._create_auto_component(comp_type_to_use, loc) obj = self._str2comp[comp_type] if isinstance(obj, list): for o in obj[1:]: self._check_component_in_export_list_and_add_object(o, loc) obj = obj[0] comp_type_to_use = self._module_rename.get(comp_type, comp_type) if comp_type_to_use not in self._components: self._components[comp_type_to_use] = obj(comp_type, self) if loc is not None: self._components[comp_type_to_use].add_object(loc) self.components = list(self._components.keys()) self.components.sort() return True def _create_auto_component(self, comp_type, loc=None): if comp_type not in self._components: self._components[comp_type] = AutoComponent(comp_type, self) if loc is not None: self._components[comp_type].add_object(loc) self.components = list(self._components.keys()) self.components.sort() return True @raise_error_on_non_existance_set def set_component(self, par, obj) -> bool: """ Set component to the parameter of a CircAdapt object. Parameters ---------- par: str Parameter of object to link object obj to obj: str Object to set """ # model structure is changing, so recheck self._is_checked = False _par = par.encode('ASCII') _obj = obj.encode('ASCII') if not self._model.set_component(self._idx_model, _par, _obj): raise Exception('Component not set') return True ############ # Set model_state ############
[docs] def model_import(self, model_state, check_model_state=False) -> None: """ Load model_state into CircAdapt object. Style and model_state version is automatically recognized. Parameters ---------- model_state: dict model_state with data to set obj: str Object to set """ # automatically recognize style of object model_state if len(model_state.keys()) == 3 \ and 'model' in model_state.keys() and 'Model' in model_state.keys() \ and 'Solver' in model_state.keys(): style = 'Custom' else: style = self._get_default_model_state_style() # here, style must be Custom or Empty, otherwise, raise error if style not in ["Custom", "General"]: raise ValueError(f'Style "{style}" unknown.') # Object is general style, so load according to general style if check_model_state: self._check_model_state(model_state) # TODO: build model according to object description self._set_model_state_model_parameters('Model', model_state['Model']) self._set_model_state_model_parameters('Model', model_state['Model'], field='state_variables') # self._set_model_state_model_parameters('Model', model_state['Model'], # field='export_double_vectors') self._set_model_state_model_parameters('Solver', model_state['Solver'])
def _check_model_state(self, model_state): """ Check if model_state (param) is correct. Give warnings if versions do not match. """ # check version if self.get('Version', str) < model_state['model']['Version']: print('----------') print('- Warning: The stored P-dict origins from an other version' ' than the used model.') print('- Check the documentation whether this P-dict' 'is compatible.') # check model type if self.get('Model.type', str) != model_state['Model']['Type']: print('----------') print('- Warning: Model type is not the same. ') print('- This might result in incomplete results.') print('- Change the model name when building the model.') # check model name if self.get('Model.name', str) != model_state['Model']['Name']: print('----------') print('- Warning: Model type is not the same.') print('- This might result in incomplete results.') print('- Change the model name when building the model.') return True # Set Parameters def _set_model_state_model_parameters1(self, level, components, field='parameters', field1='parameters_double', dtype=float, ): if field == "export_double_vectors": dtype = bool n_parameters = self.get(level + '.n_' + field1, int) if n_parameters > 0: # Set parameters for i in range(n_parameters): char_name = level + '.' + field1 + ':' + str(i) char_name = self.get(char_name, str) if char_name not in components[field]: print('- Warning: Parameter ', char_name, ' not in P-dict. Reference value is used. ') else: if dtype == float: succes = self._set_double(level+'.'+char_name, components[field][char_name]) elif dtype == int: succes = self._set_int(level+'.'+char_name, components[field][char_name]) elif dtype == bool: succes = self._set_bool(level+'.'+char_name, components[field][char_name]) else: succes = False if not succes: print('- Warning: parameter not set: ', level + '.' + char_name) def _set_model_state_model_parameters(self, level, components, field='parameters', ): if field == "parameters": self._set_model_state_model_parameters1( level, components, field, field.lower()+"_double", dtype=float) self._set_model_state_model_parameters1( level, components, field, field.lower()+"_bool", dtype=bool) self._set_model_state_model_parameters1( level, components, field, field.lower()+"_int", dtype=int) else: self._set_model_state_model_parameters1( level, components, field, field.lower()) if field == 'state_variables': self._set_state_variables(level, components) # solver does not have subcomponents, so return if level == 'Solver': return # Set subcomponents #n_subcomponents = int(self.get(level+'.Subcomponents', float)) n_subcomponents = self.get(level+'.n_subcomponents', int) subcomponents_of_component = np.array( [sub_component['name'] for sub_component in components['subcomponents']]) if n_subcomponents > 0: for i in range(n_subcomponents): char_name = level + '.subcomponents:' + str(i) char_name = self.get(char_name, str) # warn if subcomponent is not in the model sub_component = subcomponents_of_component == char_name if np.sum(sub_component) == 0: print('----------') print('- Warning: Model subcomponent ', char_name, ' not in P-dict. ') print('- Reference values are used. ') else: self._set_model_state_model_parameters( level + '.' + components['subcomponents'][ np.argmax(sub_component)]['name'], components['subcomponents'][np.argmax(sub_component)], field=field, ) def _set_state_variables(self, level, components): n_state_variables = self.get(level + '.n_state_variables', int) n_state_variable_vectors = self.get(level + '.n_state_variable_vectors', int) names = ([self.get(level + '.state_variables:' + str(i), str) for i in range(n_state_variables)] + [self.get(level + '.state_variable_vectors:' + str(i), str) for i in range(n_state_variable_vectors)]) if len(names) > 0: # Set parameters for name in names: if name not in components['state_variables']: print('- Warning: Parameter ', name, ' not in P-dict. Reference value is used. ') else: value = components['state_variables'][name] dtype = 'double_vector' if hasattr(value, '__len__') else 'double' succes = self.set( level + '.' + name, value, dtype = dtype ) if not succes: print('- Warning: parameter not set: ', level + '.' + name) ############ # Get model_state ############ def _get_default_model_state_style(self): """Get default style of the stored model state based on the name.""" return 'General'
[docs] def model_export(self, style: str = None) -> dict: """ Return the stored model state. Parameters ---------- style: str (optional) Style to follow. If not given, the default style from the model is used. Returns ------- dict """ if style is None: style = self._get_default_model_state_style() # if style is given, apply model_state style if style not in ["Custom", "General"]: raise ValueError(f'Style "{style}" unknown.') model = self._fill_model_state_model('Model') return { 'model': self._fill_model_state(), 'Model': model, 'Solver': self._fill_model_state_solver(), }
# model def _fill_model_state(self): return { 'Version': self.get('Version', str) } # Fill Solver def _fill_model_state_solver(self): # Basics solver_type = self.get('Solver.type', str) # Parameters solver_parameters = self._fill_model_state_model_parameters('Solver') # State variables at last / next-first iteration # StateVariables = self.fill_model_state_model_parameters( # 'Solver', field='StateVariables') # Exported Features export_vectors = self._fill_model_state_model_double_vectors('Solver') return {'type': solver_type, 'parameters': solver_parameters, 'export_double_vectors': export_vectors} # Fill model def _fill_model_state_model(self, level): # Basics level_name = self.get(level+'.name', str) level_type = self.get(level+'.type', str) # Subcomponents subcomponents = self._fill_model_state_model_subcomponents(level) # Parameters parameters = self._fill_model_state_model_parameters(level) # State variables at last / next-first iteration state_variables \ = self._fill_model_state_model_parameters(level, field='state_variables', ) state_variables \ = self._fill_model_state_model_parameters_state_variable_vectors( level, state_variables) # Exported Features export_vectors = self._fill_model_state_model_double_vectors(level) return {'name': level_name, 'type': level_type, 'subcomponents': subcomponents, 'parameters': parameters, 'state_variables': state_variables, 'export_double_vectors': export_vectors, } # Get subcomponents def _fill_model_state_model_subcomponents(self, level): n_subcomponents = self.get(level+'.n_subcomponents', int) if n_subcomponents == 0: return [] subcomponents = [] for i in range(n_subcomponents): char_name = level+'.subcomponents:'+str(i) char_name = self.get(char_name, str) subcomponents.append( self._fill_model_state_model(level+'.'+char_name)) return subcomponents # Get Parameters def _fill_model_state_model_parameters(self, level, field='parameters', dtype=float, ): parameters = {} if field == "parameters": parameters = parameters | self._fill_model_state_model_parameters( level, field=field+'_double', dtype=float) parameters = parameters | self._fill_model_state_model_parameters( level, field=field+'_bool', dtype=bool) parameters = parameters | self._fill_model_state_model_parameters( level, field=field+'_int', dtype=int) else: n_parameters = self._get_int(level + '.n_' + field) if n_parameters == 0: return parameters for i in range(n_parameters): char_name = level+'.' + field + ':' + str(i) char_name = self.get(char_name, str) par = level + '.' + char_name parameters[char_name] = self.get(par, dtype) return parameters # Get Parameters def _fill_model_state_model_parameters_state_variable_vectors( self, level, state_variables): field = 'state_variable_vectors' n_parameters = self.get(level + '.n_' + field, int) if n_parameters == 0: return state_variables for i in range(n_parameters): char_name = level + '.' + field + ':' + str(i) par_name = self._get_str(char_name) vec = self.get(level + '.' + par_name, 'double_vector') state_variables[par_name] = np.array(vec) return state_variables # Get DoubleVectors def _fill_model_state_model_double_vectors(self, level, field='export_double_vectors'): n_parameters = self._get_int(level + '.n_' + field) if n_parameters == 0: return [] parameters = {} # for i in range(n_parameters): # char_name = level + '.' + field + ':' + str(i) # char_name = self._get_str(char_name) # if self._get_bool(level + '.' + char_name): # parameters[char_name] = self._get_double_vector(level + '.' # + char_name) return parameters
[docs] def save(self, filename: str, ext: str = None) -> None: """ Save model to filename with extention. Parameters ---------- filename: str Filename to save file to. Filename must have extention .npy or .mat ext: str (optional) Extention of the file. If not given, the last 3 letters of the filename is used to determine the save method. """ if ext is None: ext = filename[-3:] if ext == 'npy': dataset = self.model_export(style='General') np.save(filename, dataset, allow_pickle=True) return if ext == 'mat': dataset = self.model_export() sio.savemat(filename, {'P': dataset}) return raise ValueError('Extenstion not known')
[docs] def load(self, filename: str) -> None: """ Load a model dataset from a filename. Parameters ---------- filename: str Path to file that will be loaded. The extension must be .npy or .mat. """ if filename[-4:] == '.npy': dataset = np.load(filename, allow_pickle=True).item() elif filename[-4:] == '.mat': dataset = _check_keys(sio.loadmat(filename, struct_as_record=False, squeeze_me=True))['P'] self.model_import(dataset)
[docs] def is_success(self) -> bool: """ Return false if vectors contain nan. Returns ------- bool """ return (self.get('Model.isCrashed', bool) and self.is_stable())
############ # Smart Components ############ from .smart_components import add_smart_component from .smart_components_heart import build_heart, build_timings, \ build_pfc from .smart_components_circulation import build_artven
[docs] def __iter__(self): """ Iterate over export dictionary. Designed to create a dictionary of the object using dict(self). Yields ------ key: str key of dictionary data[key]: dict subdictionary of Pdict """ data = self.model_export() for key, value in data.items(): yield (key, value)
[docs] def __setitem__(self, arg, val) -> bool: """ Make self subscriptable. Parameters ---------- arg: str or slice(None, None, None) If self[:], arg = slice(None, None, None) and set model import dictionary. If self['...'], return self.set(arg, val) val: dict or any Dictionary of the model or single value to set. """ if arg == slice(None, None, None): return self.model_import(val) if isinstance(arg, str) and '.' not in arg: # TODO: implement partial dictionary raise ValueError('Currently, it is not possible to set partial ' 'dictionaries. Please set the full dictionary ' 'using self[:]. ') if isinstance(arg, str): return self.set(arg, val) raise ValueError('arg not known')
[docs] def __getitem__(self, arg: any) -> any: """ Make self object subscriptable. Parameters ---------- arg: slice or str If self[:], arg = slice(None, None, None) and return model export dictionary. If self['...'], return self.get(arg) Returns ------- model_export dictionary or self.get() type """ if arg in self._components: return self._components[arg] if arg == slice(None, None, None): return self.model_export() if isinstance(arg, str) and '.' not in arg: # TODO: implement partial dictionary return self.model_export()[arg] if isinstance(arg, str): return self.get(arg) raise KeyError('Unknown key "' + str(arg) + '"')
[docs] def __getstate__(self): """Get state manually, because ctypes can not be pickled.""" state = self.__dict__.copy() state['data'] = self.model_export() del state['_model'] return state
[docs] def __setstate__(self, state): """Set state manually, because ctypes can not be pickled.""" self.__init__( solver=state['_solver_name'], path_to_circadapt=state['_path_to_circadapt'], ) self.model_import(state['data'])
[docs] def __repr__(self): """Object representation in string format.""" return ( '<' + self.__class__.__name__ + '>\n' + 'CircAdapt object with keys:\n' + self.components.__repr__() + '\n' + '<\\' + self.__class__.__name__ + '>' )
[docs] def copy(self): """Return a copy of itself.""" raise NotImplementedError('Copy function not yet implemented for ' 'this instance.')