import numpy as np
import copy
from circadapt.error import ModelNotStable, ModelCrashed
[docs]
class ModelAdapt():
"""Model class to be inherited to give model adaptation protocol."""
def __init__(self):
self._wall_loc = [
'Model.Peri.La.wLa.',
'Model.Peri.Ra.wRa.',
]
self._triseg_loc = [
'Model.Peri.TriSeg.',
]
self._patch_loc = [
'Model.Peri.La.wLa.pLa0.',
'Model.Peri.Ra.wRa.pRa0.',
'Model.Peri.TriSeg.wLv.pLv0.',
'Model.Peri.TriSeg.wSv.pSv0.',
'Model.Peri.TriSeg.wRv.pRv0.',
]
self._all_senses = ['SfEcmMax',
'SfActMax',
'SfPasAct',
'LsPasAct',
]
self._all_senses_norm = ['SfPasMaxT',
'FacSfActT',
'SfPasActT',
'LsPasActT',
]
self._all_actors = ['V_wall',
'Am_ref',
'Sf_pas',
]
self._artven_cavities = [
'Model.SyArt.',
'Model.SyVen.',
'Model.PuArt.',
'Model.PuVen.',
]
self._artven_names = [
'SyArt',
'SyVen',
'PuArt',
'PuVen',
]
self._artven_actors = [
'A_wall',
'A0',
'p0',
]
self._valve_names = [
'Model.Peri.SyVenRa.',
'Model.Peri.RaRv.',
'Model.Peri.RvPuArt.',
'Model.Peri.PuVenLa.',
'Model.Peri.LaLv.',
'Model.Peri.LvSyArt.',
]
self._valve_actors = [
'A_open',
]
self._bags = ['Model.Peri.']
self._adapt_options = {
'rest': {
't_cycle': 0.85,
'q0': 8.5e-5,
'ventricles': {
},
'General': {},
},
'exercise': {
't_cycle': 0.85 / 2,
'q0': 8.5e-5 * 3,
'ventricles': {
},
'General': {},
},
'protocol': {
'n_beats': 50,
'n_cycles': 3,
'adapt_Wall': True,
'adapt_Bag': True,
'adapt_ArtVen': True,
'adapt_Valve': True,
'adapt_Valve_l': True,
'adapt_TriSeg': True,
},
}
_adapt_patch_object = 'Patch2022'
[docs]
def get_adapt_options(self):
"""Get default adapt options."""
options = copy.deepcopy(self._adapt_options)
return options
def _update_options(self, _options):
# first update options
options = self.get_adapt_options()
options.update(_options)
# now check for exercise ventricular parameters
# set key in rest if not exist to current value assuming it is in rest
for key in options['exercise']['ventricles']:
if key not in options['rest']['ventricles']:
options['rest']['ventricles'][key] = \
self[self._adapt_patch_object][key][2:]
for key in options['exercise']['General']:
if key not in options['rest']['General']:
options['rest']['General'][key] = \
self['General'][key]
return options
[docs]
def adapt(self,
options: dict = {},
output: bool = False,
verbose: bool = False,
) -> None:
"""
Run adaptation protocol.
The adaptation protocol runs n_cycles cycles. Each cycle has two
phases, namely rest adaptation and exercise adaptation. First, in
exercise, the Patches and vessel wall volumes are adapted to load. In
rest, vessels are adapted to flow.
Parameters
----------
options : dictionary, optional
Options for the protocol. To set options, use the function
get_adapt_options(), which is used by default. The default is {}.
output : bool, optional
DESCRIPTION. The default is False.
Returns
-------
senses_results : TYPE
Only when output=True.
senses_norm : TYPE
Only when output=True.
actor_results : TYPE
Only when output=True.
actor_vwall : TYPE
Only when output=True.
"""
if verbose:
print('Start Adaptation protocol')
options = self._update_options(options)
# get options
n_beats = options['protocol']['n_beats']
n_cycles = options['protocol']['n_cycles']
# allocate
if output:
(senses_results, senses_norm,
actor_results, actor_vwall, actor_valve) = \
self._allocate_output(n_beats * n_cycles * 2)
# run adaptation
if verbose:
print('Run Beats')
for i in range(n_beats * n_cycles * 2):
if verbose:
print('\rBeat '+str(i)+'/'+str(n_beats * n_cycles * 2), end="")
if (i // n_beats) % 2 == 0:
self._run_simulation(options['exercise'])
if self.get('Model.PFC.total_flow_error') < 0.05:
self.adapt_exercise(options)
else:
self.run(stable=True)
else:
self._run_simulation(options['rest'])
if self.get('Model.PFC.total_flow_error') < 0.05:
self.adapt_rest(options)
else:
self.run(stable=True)
if output:
self._adapt_store_data(
i,
senses_results,
senses_norm,
actor_results,
actor_vwall,
actor_valve,
)
# Bring back to rest
self._run_simulation(options['rest'])
if output:
return (senses_results, senses_norm,
actor_results, actor_vwall, actor_valve)
def _allocate_output(self, nBeats):
senses_results = np.ndarray((nBeats,
len(self._patch_loc),
len(self._all_senses)))
senses_norm = np.ndarray((nBeats,
len(self._patch_loc),
len(self._all_senses)))
actor_results = np.ndarray((nBeats,
len(self._patch_loc),
len(self._all_actors)))
actor_vwall = np.ndarray((nBeats,
len(self._artven_names),
len(self._artven_actors)))
actor_valve = np.ndarray((nBeats,
len(self._valve_names),
len(self._valve_actors)))
return (senses_results, senses_norm,
actor_results, actor_vwall, actor_valve)
[docs]
def adapt_exercise(self, options):
"""Trigger all excercise adaptation functions."""
if options['protocol']['adapt_Wall']:
for wall in self._wall_loc:
self.trigger(wall+'adapt_exercise')
if options['protocol']['adapt_TriSeg']:
for triseg in self._triseg_loc:
self.trigger(triseg+'adapt_exercise')
if options['protocol']['adapt_ArtVen']:
for avc in self._artven_cavities:
self.trigger(avc+'adapt_exercise')
if options['protocol']['adapt_Bag']:
for bag in self._bags:
self.trigger(bag+'adapt_exercise')
[docs]
def adapt_rest(self, options):
"""Trigger all rest adaptation functions."""
if options['protocol']['adapt_ArtVen']:
for avc in self._artven_cavities:
self.trigger(avc+'adapt_rest')
if options['protocol']['adapt_Valve']:
for valve in self._valve_names:
self.trigger(valve+'adapt_rest')
# TODO: move to c++
if options['protocol']['adapt_Valve_l']:
A_open = self['Valve']['A_open']
self['Valve']['l'] = np.sqrt(A_open / np.pi)
def _run_simulation(self, sub_options):
self._set_parameters(sub_options)
# self.run(stable=True)
self.run(1)
# stability essential for adaptation
# if not self.get('Model.is_stable'):
# raise ModelNotStable()
def _set_parameters(self, sub_options):
run_sims = self.get('Model.t_cycle') != sub_options['t_cycle']
run_sims = run_sims | (self.get('Model.PFC.q0') != sub_options['q0'])
if run_sims:
t_cycle = self.get('Model.t_cycle')
q0 = self.get('Model.PFC.q0')
self.set('Model.t_cycle', 0.5*(sub_options['t_cycle'] + t_cycle))
self.set('Model.PFC.q0', 0.5*(sub_options['q0'] + q0))
self.run(5)
self.set('Model.t_cycle', sub_options['t_cycle'])
self.set('Model.PFC.q0', sub_options['q0'])
# ventricles
for key, value in sub_options['ventricles'].items():
self[self._adapt_patch_object][key][2:] = value
for key, value in sub_options['General'].items():
self['General'][key] = value
if run_sims:
self.run(stable=True)
def _adapt_store_data(
self,
i,
senses_results,
senses_norm,
actor_results,
actor_vwall,
actor_valve,
):
# patch
for i_wall, wall in enumerate(self._patch_loc):
for i_sens, sens in enumerate(self._all_senses):
senses_results[i, i_wall, i_sens] = self.get(wall+sens,
dtype=float)
s = wall+self._all_senses_norm[i_sens]
senses_norm[i, i_wall, i_sens] = self.get(s, dtype=float)
if self._all_senses_norm[i_sens] == 'FacSfActT':
senses_norm[i, i_wall, i_sens] *= self.get(wall+'Sf_act',
dtype=float)
for i_act, act in enumerate(self._all_actors):
actor_results[i, i_wall, i_act] = self.get(wall+act,
dtype=float)
# artven
for i_wall, loc in enumerate(self._artven_cavities):
for i_act, act in enumerate(self._artven_actors):
actor_vwall[i, i_wall, i_act] = self.get(loc+act, dtype=float)
# valve
for i_valve, loc in enumerate(self._valve_names):
for i_act, act in enumerate(self._valve_actors):
actor_valve[i, i_valve, i_act] = self.get(loc+act, dtype=float)
[docs]
def calculate_and_set_matrix(self, verbose=False):
mean_dlogpar_dlogsens = self.get_matrix(verbose)
for patch in [
'Model.Peri.La.wLa.pLa0',
'Model.Peri.Ra.wRa.pRa0',
'Model.Peri.TriSeg.wLv.pLv0',
'Model.Peri.TriSeg.wSv.pSv0',
'Model.Peri.TriSeg.wRv.pRv0',
]:
for i in range(3):
for j in range(4):
self.set(patch+f'.transmat{i}{j}',
mean_dlogpar_dlogsens[i, j])
[docs]
def get_matrix(self, verbose=False):
# state = self.model_export()
# self.run(stable=True)
# reference_y = np.array([
# self['Patch2022']['SfEcmMax'],
# self['Patch2022']['SfActMax'],
# self['Patch2022']['SfPasAct'],
# self['Patch2022']['LsPasAct'],
# ]).T
# n_patch = 5
# n_par = 3
# par = ['V_wall', 'AmRef', 'SfPas']
# model_parameters = np.ndarray((n_patch, n_par, 2))
# sensing_values = np.ndarray((n_patch, n_par, 2, 4))
# for i_patch in range(n_patch):
# for i_par in range(n_par):
# for i_sign, sign in enumerate([-1, 1]):
# self.model_import(state)
# self['Patch2022'][par[i_par]][i_patch] *= 1+0.05*sign
# model_parameters[i_patch, i_par, i_sign] = self['Patch2022'][par[i_par]][i_patch]
# self.run(stable=True)
# sensing_values[i_patch, i_par, i_sign, :] = np.array([
# self['Patch2022']['SfEcmMax'][i_par],
# self['Patch2022']['SfActMax'][i_par]/self['Patch2022']['SfAct'][i_par],
# self['Patch2022']['SfPasAct'][i_par],
# self['Patch2022']['LsPasAct'][i_par],
# ])
# sens = reference_y.reshape(5, 1, 1, 4) / sensing_values
# log_sens = np.log(sens)
# # log_sens = np.log(1/sensing_values)
# # diff_log_sens = np.diff(log_sens, axis=2)[:, :, 0, :]
# diff_log_sens = log_sens[:, :, 1, :] - log_sens[:, :, 0, :]
# log_parameters = np.log(model_parameters)
# diff_parameters = log_parameters[:, :, 1] - log_parameters[:, :, 0]
# diff_log_sens_Lv = diff_parameters[2].reshape((-1, 1)) / diff_log_sens[2, :, :]
# mean_dlogpar_dlogsens = np.mean(
# diff_parameters.reshape((*diff_parameters.shape, 1)) / diff_log_sens,
# axis=0)
# self.model_import(state)
# return mean_dlogpar_dlogsens
# def get_derivative(self):
# if model is None:
# model = VanOsta2023()
model = self
include_for_average = np.ones(5, dtype=bool)
reference_parameters = get_parameters(model)
reference_senses = get_reference_senses(model)
parameters, senses = run_simulations(model, verbose=verbose)
delta_logpar = np.log(parameters/reference_parameters)
delta_logsens = np.log(senses/reference_senses)
dlogpar_dlogsens = calculate_derivative(self, parameters, senses, include_for_average=include_for_average, verbose=verbose)
if np.any(np.isnan(dlogpar_dlogsens)) and verbose:
print('derivative is nan')
return dlogpar_dlogsens.T
[docs]
def get_reference_senses(model):
options = model._adapt_options
model['PFC']['q0'] = options['exercise']['q0']
model['General']['t_cycle'] = options['exercise']['t_cycle']
model.run(stable=True)
return get_sens(model)
[docs]
def get_parameters(model):
return np.array([
model['Patch']['V_wall'],
model['Patch']['Am_ref'],
model['Patch']['Sf_pas'],
]).T
[docs]
def get_sens(model):
sens = np.ndarray((5,4))
Sf = model['Patch']['Sf']
SfPasT = model['Patch']['Sf_pas_total']
SfEcm = model['Patch']['Sf_ecm']
SfActT = Sf - SfPasT
SfTit = SfPasT - SfEcm
Ls = model['Patch']['l_s']
Lsact = model['Patch']['l_si0']
Ef = model['Patch']['Ef']
dEf = np.diff(Ef, axis=0, append=[Ef[0, :]])
sens[:, 0] = np.max(SfEcm, axis=0)
sens[:, 1] = np.max(SfActT, axis=0)
sens[:, 2] = np.max((SfActT + SfTit) * SfEcm / (SfActT + SfTit + SfEcm), axis=0)
sens[:, 3] = np.sum((SfActT + SfTit) * SfEcm * Ls, axis=0) / np.sum( (SfActT + SfTit) * SfEcm, axis=0)
return sens
rel_std_amref = 0.02
rel_std_vwall = 0.05
rel_std_sfpas = 0.05
[docs]
def run_simulations(model, n_sims=1000, verbose=False):
state = model.model_export()
options = model._adapt_options
parameters = []
senses = []
parameters_reference = get_parameters(model)
while len(parameters) < n_sims:
if verbose:
print(f'\r Calculate simulations for derivative {len(parameters)}/{n_sims}', end='')
# sys.stdout.flush()
model['Patch']['V_wall'] = np.exp(np.log(parameters_reference[:, 0]) + np.random.normal(0, rel_std_vwall, size=5))
model['Patch']['Am_ref'] = np.exp(np.log(parameters_reference[:, 1]) + np.random.normal(0, rel_std_amref, size=5))
model['Patch']['Sf_pas'] = np.exp(np.log(parameters_reference[:, 2]) + np.random.normal(0, rel_std_sfpas, size=5))
model['PFC']['q0'] = options['exercise']['q0']
model['General']['t_cycle'] = options['exercise']['t_cycle']
try:
model.run(stable=True)
# model.run(50)
sens = get_sens(model)
if np.any(np.isnan(sens)):
print(senses[-1])
continue
parameters.append(get_parameters(model))
senses.append(sens)
except ModelCrashed:
model.model_import(state)
# raise ValueError('Crashed')
if verbose:
print(f'\r Calculate simulations for derivative - Done')
return np.array(parameters), np.array(senses)
[docs]
def calculate_derivative(model, parameters, senses, include_for_average=None, verbose=False):
if include_for_average is None:
include_for_average = np.ones(5, dtype=bool)
reference_parameters = get_parameters(model)
rererence_senses = get_reference_senses(model)
n_senses = rererence_senses.shape[1]
delta_logpar = np.log(parameters/reference_parameters)
delta_logsens = np.log(senses/rererence_senses)
# dlogpar_dlogsens = np.ndarray((n_senses, 3))
# for i_par in range(3):
# for i_sens in range(n_senses):
# delta_logsens1 = delta_logsens[:, include_for_average, i_sens].reshape(-1)
# delta_logpar1 = delta_logpar[:, include_for_average, i_par].reshape(-1)
# include_idx = np.invert(np.isnan(delta_logsens1))
# slope, _, _, _, _ = stats.linregress(
# delta_logsens1[include_idx],
# delta_logpar1[include_idx],
# )
# dlogpar_dlogsens[i_sens, i_par] = slope
idx_include = np.invert(np.any(np.isnan(delta_logsens.reshape((delta_logsens.shape[0], -1))), axis=1))
dlogpar5 = delta_logpar.reshape((delta_logpar.shape[0], -1))
dlogsens5 = delta_logsens.reshape((delta_logsens.shape[0], -1))
M5 = np.dot(
np.dot(dlogpar5.T, dlogsens5),
np.linalg.inv(np.dot(dlogsens5.T, dlogsens5)),
)
# off-diagonal is zero
n_wall = 5
n_sens = 4
dlogpar_dlogsens = np.zeros((3, n_sens))
if np.any(np.isnan(M5)) and verbose:
print('M5 is nan')
for i_wall in range(5):
M5[3*i_wall:(3*(i_wall+1)), ((1+i_wall)*n_sens):] = 0
M5[3*i_wall:(3*(i_wall+1)), :((i_wall)*n_sens)] = 0
dlogpar_dlogsens += 1/5 * M5[
3*i_wall:(3*(i_wall+1)),
((i_wall)*n_sens):((1+i_wall)*n_sens)
]
if np.any(np.isnan(dlogpar_dlogsens)) and verbose:
print('M5 is nan')
return dlogpar_dlogsens.T