Source code for circadapt.adapt


import numpy as np
import copy
from circadapt.error import ModelNotStable, ModelCrashed


[docs] class ModelAdapt(): """Model class to be inherited to give model adaptation protocol.""" def __init__(self): self._wall_loc = [ 'Model.Peri.La.wLa.', 'Model.Peri.Ra.wRa.', ] self._triseg_loc = [ 'Model.Peri.TriSeg.', ] self._patch_loc = [ 'Model.Peri.La.wLa.pLa0.', 'Model.Peri.Ra.wRa.pRa0.', 'Model.Peri.TriSeg.wLv.pLv0.', 'Model.Peri.TriSeg.wSv.pSv0.', 'Model.Peri.TriSeg.wRv.pRv0.', ] self._all_senses = ['SfEcmMax', 'SfActMax', 'SfPasAct', 'LsPasAct', ] self._all_senses_norm = ['SfPasMaxT', 'FacSfActT', 'SfPasActT', 'LsPasActT', ] self._all_actors = ['V_wall', 'Am_ref', 'Sf_pas', ] self._artven_cavities = [ 'Model.SyArt.', 'Model.SyVen.', 'Model.PuArt.', 'Model.PuVen.', ] self._artven_names = [ 'SyArt', 'SyVen', 'PuArt', 'PuVen', ] self._artven_actors = [ 'A_wall', 'A0', 'p0', ] self._valve_names = [ 'Model.Peri.SyVenRa.', 'Model.Peri.RaRv.', 'Model.Peri.RvPuArt.', 'Model.Peri.PuVenLa.', 'Model.Peri.LaLv.', 'Model.Peri.LvSyArt.', ] self._valve_actors = [ 'A_open', ] self._bags = ['Model.Peri.'] self._adapt_options = { 'rest': { 't_cycle': 0.85, 'q0': 8.5e-5, 'ventricles': { }, 'General': {}, }, 'exercise': { 't_cycle': 0.85 / 2, 'q0': 8.5e-5 * 3, 'ventricles': { }, 'General': {}, }, 'protocol': { 'n_beats': 50, 'n_cycles': 3, 'adapt_Wall': True, 'adapt_Bag': True, 'adapt_ArtVen': True, 'adapt_Valve': True, 'adapt_Valve_l': True, 'adapt_TriSeg': True, }, } _adapt_patch_object = 'Patch2022'
[docs] def get_adapt_options(self): """Get default adapt options.""" options = copy.deepcopy(self._adapt_options) return options
def _update_options(self, _options): # first update options options = self.get_adapt_options() options.update(_options) # now check for exercise ventricular parameters # set key in rest if not exist to current value assuming it is in rest for key in options['exercise']['ventricles']: if key not in options['rest']['ventricles']: options['rest']['ventricles'][key] = \ self[self._adapt_patch_object][key][2:] for key in options['exercise']['General']: if key not in options['rest']['General']: options['rest']['General'][key] = \ self['General'][key] return options
[docs] def adapt(self, options: dict = {}, output: bool = False, verbose: bool = False, ) -> None: """ Run adaptation protocol. The adaptation protocol runs n_cycles cycles. Each cycle has two phases, namely rest adaptation and exercise adaptation. First, in exercise, the Patches and vessel wall volumes are adapted to load. In rest, vessels are adapted to flow. Parameters ---------- options : dictionary, optional Options for the protocol. To set options, use the function get_adapt_options(), which is used by default. The default is {}. output : bool, optional DESCRIPTION. The default is False. Returns ------- senses_results : TYPE Only when output=True. senses_norm : TYPE Only when output=True. actor_results : TYPE Only when output=True. actor_vwall : TYPE Only when output=True. """ if verbose: print('Start Adaptation protocol') options = self._update_options(options) # get options n_beats = options['protocol']['n_beats'] n_cycles = options['protocol']['n_cycles'] # allocate if output: (senses_results, senses_norm, actor_results, actor_vwall, actor_valve) = \ self._allocate_output(n_beats * n_cycles * 2) # run adaptation if verbose: print('Run Beats') for i in range(n_beats * n_cycles * 2): if verbose: print('\rBeat '+str(i)+'/'+str(n_beats * n_cycles * 2), end="") if (i // n_beats) % 2 == 0: self._run_simulation(options['exercise']) if self.get('Model.PFC.total_flow_error') < 0.05: self.adapt_exercise(options) else: self.run(stable=True) else: self._run_simulation(options['rest']) if self.get('Model.PFC.total_flow_error') < 0.05: self.adapt_rest(options) else: self.run(stable=True) if output: self._adapt_store_data( i, senses_results, senses_norm, actor_results, actor_vwall, actor_valve, ) # Bring back to rest self._run_simulation(options['rest']) if output: return (senses_results, senses_norm, actor_results, actor_vwall, actor_valve)
def _allocate_output(self, nBeats): senses_results = np.ndarray((nBeats, len(self._patch_loc), len(self._all_senses))) senses_norm = np.ndarray((nBeats, len(self._patch_loc), len(self._all_senses))) actor_results = np.ndarray((nBeats, len(self._patch_loc), len(self._all_actors))) actor_vwall = np.ndarray((nBeats, len(self._artven_names), len(self._artven_actors))) actor_valve = np.ndarray((nBeats, len(self._valve_names), len(self._valve_actors))) return (senses_results, senses_norm, actor_results, actor_vwall, actor_valve)
[docs] def adapt_exercise(self, options): """Trigger all excercise adaptation functions.""" if options['protocol']['adapt_Wall']: for wall in self._wall_loc: self.trigger(wall+'adapt_exercise') if options['protocol']['adapt_TriSeg']: for triseg in self._triseg_loc: self.trigger(triseg+'adapt_exercise') if options['protocol']['adapt_ArtVen']: for avc in self._artven_cavities: self.trigger(avc+'adapt_exercise') if options['protocol']['adapt_Bag']: for bag in self._bags: self.trigger(bag+'adapt_exercise')
[docs] def adapt_rest(self, options): """Trigger all rest adaptation functions.""" if options['protocol']['adapt_ArtVen']: for avc in self._artven_cavities: self.trigger(avc+'adapt_rest') if options['protocol']['adapt_Valve']: for valve in self._valve_names: self.trigger(valve+'adapt_rest') # TODO: move to c++ if options['protocol']['adapt_Valve_l']: A_open = self['Valve']['A_open'] self['Valve']['l'] = np.sqrt(A_open / np.pi)
def _run_simulation(self, sub_options): self._set_parameters(sub_options) # self.run(stable=True) self.run(1) # stability essential for adaptation # if not self.get('Model.is_stable'): # raise ModelNotStable() def _set_parameters(self, sub_options): run_sims = self.get('Model.t_cycle') != sub_options['t_cycle'] run_sims = run_sims | (self.get('Model.PFC.q0') != sub_options['q0']) if run_sims: t_cycle = self.get('Model.t_cycle') q0 = self.get('Model.PFC.q0') self.set('Model.t_cycle', 0.5*(sub_options['t_cycle'] + t_cycle)) self.set('Model.PFC.q0', 0.5*(sub_options['q0'] + q0)) self.run(5) self.set('Model.t_cycle', sub_options['t_cycle']) self.set('Model.PFC.q0', sub_options['q0']) # ventricles for key, value in sub_options['ventricles'].items(): self[self._adapt_patch_object][key][2:] = value for key, value in sub_options['General'].items(): self['General'][key] = value if run_sims: self.run(stable=True) def _adapt_store_data( self, i, senses_results, senses_norm, actor_results, actor_vwall, actor_valve, ): # patch for i_wall, wall in enumerate(self._patch_loc): for i_sens, sens in enumerate(self._all_senses): senses_results[i, i_wall, i_sens] = self.get(wall+sens, dtype=float) s = wall+self._all_senses_norm[i_sens] senses_norm[i, i_wall, i_sens] = self.get(s, dtype=float) if self._all_senses_norm[i_sens] == 'FacSfActT': senses_norm[i, i_wall, i_sens] *= self.get(wall+'Sf_act', dtype=float) for i_act, act in enumerate(self._all_actors): actor_results[i, i_wall, i_act] = self.get(wall+act, dtype=float) # artven for i_wall, loc in enumerate(self._artven_cavities): for i_act, act in enumerate(self._artven_actors): actor_vwall[i, i_wall, i_act] = self.get(loc+act, dtype=float) # valve for i_valve, loc in enumerate(self._valve_names): for i_act, act in enumerate(self._valve_actors): actor_valve[i, i_valve, i_act] = self.get(loc+act, dtype=float)
[docs] def calculate_and_set_matrix(self, verbose=False): mean_dlogpar_dlogsens = self.get_matrix(verbose) for patch in [ 'Model.Peri.La.wLa.pLa0', 'Model.Peri.Ra.wRa.pRa0', 'Model.Peri.TriSeg.wLv.pLv0', 'Model.Peri.TriSeg.wSv.pSv0', 'Model.Peri.TriSeg.wRv.pRv0', ]: for i in range(3): for j in range(4): self.set(patch+f'.transmat{i}{j}', mean_dlogpar_dlogsens[i, j])
[docs] def get_matrix(self, verbose=False): # state = self.model_export() # self.run(stable=True) # reference_y = np.array([ # self['Patch2022']['SfEcmMax'], # self['Patch2022']['SfActMax'], # self['Patch2022']['SfPasAct'], # self['Patch2022']['LsPasAct'], # ]).T # n_patch = 5 # n_par = 3 # par = ['V_wall', 'AmRef', 'SfPas'] # model_parameters = np.ndarray((n_patch, n_par, 2)) # sensing_values = np.ndarray((n_patch, n_par, 2, 4)) # for i_patch in range(n_patch): # for i_par in range(n_par): # for i_sign, sign in enumerate([-1, 1]): # self.model_import(state) # self['Patch2022'][par[i_par]][i_patch] *= 1+0.05*sign # model_parameters[i_patch, i_par, i_sign] = self['Patch2022'][par[i_par]][i_patch] # self.run(stable=True) # sensing_values[i_patch, i_par, i_sign, :] = np.array([ # self['Patch2022']['SfEcmMax'][i_par], # self['Patch2022']['SfActMax'][i_par]/self['Patch2022']['SfAct'][i_par], # self['Patch2022']['SfPasAct'][i_par], # self['Patch2022']['LsPasAct'][i_par], # ]) # sens = reference_y.reshape(5, 1, 1, 4) / sensing_values # log_sens = np.log(sens) # # log_sens = np.log(1/sensing_values) # # diff_log_sens = np.diff(log_sens, axis=2)[:, :, 0, :] # diff_log_sens = log_sens[:, :, 1, :] - log_sens[:, :, 0, :] # log_parameters = np.log(model_parameters) # diff_parameters = log_parameters[:, :, 1] - log_parameters[:, :, 0] # diff_log_sens_Lv = diff_parameters[2].reshape((-1, 1)) / diff_log_sens[2, :, :] # mean_dlogpar_dlogsens = np.mean( # diff_parameters.reshape((*diff_parameters.shape, 1)) / diff_log_sens, # axis=0) # self.model_import(state) # return mean_dlogpar_dlogsens # def get_derivative(self): # if model is None: # model = VanOsta2023() model = self include_for_average = np.ones(5, dtype=bool) reference_parameters = get_parameters(model) reference_senses = get_reference_senses(model) parameters, senses = run_simulations(model, verbose=verbose) delta_logpar = np.log(parameters/reference_parameters) delta_logsens = np.log(senses/reference_senses) dlogpar_dlogsens = calculate_derivative(self, parameters, senses, include_for_average=include_for_average, verbose=verbose) if np.any(np.isnan(dlogpar_dlogsens)) and verbose: print('derivative is nan') return dlogpar_dlogsens.T
[docs] def get_reference_senses(model): options = model._adapt_options model['PFC']['q0'] = options['exercise']['q0'] model['General']['t_cycle'] = options['exercise']['t_cycle'] model.run(stable=True) return get_sens(model)
[docs] def get_parameters(model): return np.array([ model['Patch']['V_wall'], model['Patch']['Am_ref'], model['Patch']['Sf_pas'], ]).T
[docs] def get_sens(model): sens = np.ndarray((5,4)) Sf = model['Patch']['Sf'] SfPasT = model['Patch']['Sf_pas_total'] SfEcm = model['Patch']['Sf_ecm'] SfActT = Sf - SfPasT SfTit = SfPasT - SfEcm Ls = model['Patch']['l_s'] Lsact = model['Patch']['l_si0'] Ef = model['Patch']['Ef'] dEf = np.diff(Ef, axis=0, append=[Ef[0, :]]) sens[:, 0] = np.max(SfEcm, axis=0) sens[:, 1] = np.max(SfActT, axis=0) sens[:, 2] = np.max((SfActT + SfTit) * SfEcm / (SfActT + SfTit + SfEcm), axis=0) sens[:, 3] = np.sum((SfActT + SfTit) * SfEcm * Ls, axis=0) / np.sum( (SfActT + SfTit) * SfEcm, axis=0) return sens
rel_std_amref = 0.02 rel_std_vwall = 0.05 rel_std_sfpas = 0.05
[docs] def run_simulations(model, n_sims=1000, verbose=False): state = model.model_export() options = model._adapt_options parameters = [] senses = [] parameters_reference = get_parameters(model) while len(parameters) < n_sims: if verbose: print(f'\r Calculate simulations for derivative {len(parameters)}/{n_sims}', end='') # sys.stdout.flush() model['Patch']['V_wall'] = np.exp(np.log(parameters_reference[:, 0]) + np.random.normal(0, rel_std_vwall, size=5)) model['Patch']['Am_ref'] = np.exp(np.log(parameters_reference[:, 1]) + np.random.normal(0, rel_std_amref, size=5)) model['Patch']['Sf_pas'] = np.exp(np.log(parameters_reference[:, 2]) + np.random.normal(0, rel_std_sfpas, size=5)) model['PFC']['q0'] = options['exercise']['q0'] model['General']['t_cycle'] = options['exercise']['t_cycle'] try: model.run(stable=True) # model.run(50) sens = get_sens(model) if np.any(np.isnan(sens)): print(senses[-1]) continue parameters.append(get_parameters(model)) senses.append(sens) except ModelCrashed: model.model_import(state) # raise ValueError('Crashed') if verbose: print(f'\r Calculate simulations for derivative - Done') return np.array(parameters), np.array(senses)
[docs] def calculate_derivative(model, parameters, senses, include_for_average=None, verbose=False): if include_for_average is None: include_for_average = np.ones(5, dtype=bool) reference_parameters = get_parameters(model) rererence_senses = get_reference_senses(model) n_senses = rererence_senses.shape[1] delta_logpar = np.log(parameters/reference_parameters) delta_logsens = np.log(senses/rererence_senses) # dlogpar_dlogsens = np.ndarray((n_senses, 3)) # for i_par in range(3): # for i_sens in range(n_senses): # delta_logsens1 = delta_logsens[:, include_for_average, i_sens].reshape(-1) # delta_logpar1 = delta_logpar[:, include_for_average, i_par].reshape(-1) # include_idx = np.invert(np.isnan(delta_logsens1)) # slope, _, _, _, _ = stats.linregress( # delta_logsens1[include_idx], # delta_logpar1[include_idx], # ) # dlogpar_dlogsens[i_sens, i_par] = slope idx_include = np.invert(np.any(np.isnan(delta_logsens.reshape((delta_logsens.shape[0], -1))), axis=1)) dlogpar5 = delta_logpar.reshape((delta_logpar.shape[0], -1)) dlogsens5 = delta_logsens.reshape((delta_logsens.shape[0], -1)) M5 = np.dot( np.dot(dlogpar5.T, dlogsens5), np.linalg.inv(np.dot(dlogsens5.T, dlogsens5)), ) # off-diagonal is zero n_wall = 5 n_sens = 4 dlogpar_dlogsens = np.zeros((3, n_sens)) if np.any(np.isnan(M5)) and verbose: print('M5 is nan') for i_wall in range(5): M5[3*i_wall:(3*(i_wall+1)), ((1+i_wall)*n_sens):] = 0 M5[3*i_wall:(3*(i_wall+1)), :((i_wall)*n_sens)] = 0 dlogpar_dlogsens += 1/5 * M5[ 3*i_wall:(3*(i_wall+1)), ((i_wall)*n_sens):((1+i_wall)*n_sens) ] if np.any(np.isnan(dlogpar_dlogsens)) and verbose: print('M5 is nan') return dlogpar_dlogsens.T