"""esbmtk: A general purpose Earth Science box model toolkit.
Copyright (C), 2020 Ulrich G. Wortmann
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or (at
your option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from __future__ import annotations
import typing as tp
import numpy as np
import numpy.typing as npt
import logging
NDArrayFloat = npt.NDArray[np.float64]
if tp.TYPE_CHECKING:
from esbmtk import (
ExternalFunction,
Flux,
Model,
Species,
Species2Species,
)
[docs]
class SourceIsotopeError(Exception):
"""Custom Error Class."""
def __init__(self, message):
"""Initialize Error Instance with formatted message."""
message = f"\n\n{message}\n"
super().__init__(message)
[docs]
def build_eqs_matrix(M: Model) -> tuple[NDArrayFloat, NDArrayFloat]:
"""Build the coefficient matrix CM and Flux vector f.
So that we can solve dC_dt = C dot flux_values
"""
from esbmtk import GasReservoir, Species, Species2Species
fi = 0
F_names = []
for f in M.lof: # get flux index positions
F_names.append(f.full_name)
f.idx = fi # set starting index
fi = fi + 1
# get number of reservoirs
num_res = 0
# M.r_list = []
for r in M.lor:
# M.r_list.append(r.full_name)
num_res = num_res + 1
if r.isotopes:
# M.r_list.append(f"{r.full_name}_i")
num_res = num_res + 1
F = np.ones(fi) # initialize flux value vector
CM = np.zeros((num_res, fi), dtype=float) # Initialize Coefficient matrix:
"""We have n reservoirs, without isotopes, this corresponds to
n rows in the coefficinet matrix. However, if reservoirs have
isotopes, we need an additional row to calculate the respective
isotope concentration. Thus we have two counters, cr pointing to
the current reservoir, and ri pointing to the respective row index
in the coefficient matrix"""
cr = 0
ri = 0
while ri < num_res:
r = M.lor[cr] # get current reservoir
r.idx = ri # record index position
cr = cr + 1
if r.rtype == "computed":
# computed reservoirs are currently not set up by a Species2Species
# connection, so we need to add a flux expression manually.
if r.model.debug:
logging.info(f"bem1: {r.full_name} type = {r.rtype}")
CM[ri, r.lof[0].idx] = 1
if r.isotopes:
CM[ri + 1, r.lof[1].idx] = 1
else: # regular reservoirs may have multiple fluxes
if isinstance(r, Species):
density = (
r.parent.swc.density / 1000 if hasattr(r.parent, "swc") else 1.0
)
mass = r.volume.to(r.model.v_unit).magnitude * density
elif isinstance(r, GasReservoir):
mass = r.v[0]
else:
raise ValueError(f"no definition for {type(r)}")
""" Regular reservoirs, can have isotopes, If they do we need to treat them
like a new reservoir, so they require their own line in the coefficient
matrix. """
fi = 0
while fi < len(r.lof): # loop over reservoir fluxes
f = r.lof[fi]
if f in r.lif:
sign = 0 # we ignore those fluxes
else:
sign = -1 / mass if f.parent.source == r else 1 / mass
if r.isotopes: # add line for isotope calculations
CM[ri, r.lof[fi].idx] = sign
CM[ri + 1, r.lof[fi + 1].idx] = sign # 2
fi = fi + 2
else:
CM[ri, r.lof[fi].idx] = sign
fi = fi + 1
ri = ri + 1 # increase count by 1, or two if isotopes
if r.isotopes:
ri = ri + 1
return CM[:ri, :], F, F_names
[docs]
def write_equations_3(
M: Model,
R: list[float],
icl: dict,
cpl: list,
ipl: list,
eqs_fn: str,
) -> str:
"""Write file that contains the ode-equations for the Model.
:param Model: Model handle
:param R: tp.List of floats with the initial conditions for each
reservoir
:param icl: dict of reservoirs that have actual fluxes
:param cpl: tp.List of reservoirs that have no fluxes but are
computed based on other reservoirs
:param ipl: tp.List of reservoir that do not change in concentration
:param fn: str, filename
Returns: equationfile name
"""
# construct header and static code:
# add optional import statements
h1 = ""
h2 = '''# @njit(fastmath=True)
def eqs(t, R, M, gpt, toc, area_table, area_dz_table, Csat_table, CM, F):
"""Calculate dCdt for each reservoir.
t = time vector
R = initial conditions for each reservoir
M = model handle. This is currently required for signals
gpt = tuple of global constants (is this still used?)
toc = is a tuple of containing constants for each external function
area_table = lookuptable used by carbonate_system_2
area_dz_table = lookuptable used by carbonate_system_2
Csat_table = lookuptable used by carbonate_system_2
CM = Coefficient Matrix
F = flux vector
Returns: dCdt as numpy array
Reservoir name lookup: M.R_names[15]
Flux name lookup: M.F_names[2]
Matrix Coefficients: M.CM[reservoir index, flux index]
Table of constants: M.toc[idx]
"""
'''
ind2 = 4 * " " # indention
ind3 = 8 * " " # indention
h1 = '"""ESBMTk equationsfile."""'
hi = ""
# ensure there are no duplicates
M.lpc_i = set(M.lpc_i)
M.lpc_f = set(M.lpc_f)
if len(M.lpc_f) > 0:
hi += "from esbmtk import "
for f in M.lpc_f:
hi += f"{f}, "
hi = f"{hi[:-2]}\n" # strip comma and space
if len(M.lpc_i) > 0: # test if functions imports are required
hi += f"from esbmtk.bio_pump_functions{M.bio_pump_functions} import "
for f in M.lpc_i:
hi += f"{f}, "
hi = f"{hi[:-2]}\n" # strip comma and space
if M.luf: # test if user defined function imports are required
for function, args in M.luf.items():
source = args[0]
hi += f"from {source} import {function}\n"
header = f"{h1}\n\n{h2}" if hi == "" else f"{h1}\n{hi}\n\n{h2}"
rel = "" # list of return values
# write file
with open(eqs_fn, "w", encoding="utf-8") as eqs:
eqs.write(header)
sep = (
"# ---------------- write computed reservoir equations -------- #\n"
+ " # that do not depend on fluxes"
)
eqs.write(f"{sep}\n")
""" Computed reservoirs and computed fluxes are both stored in M.lpc_r
At this point, we only print the reservoirs.
TODO: This should really be cleaned up
"""
for r in M.lpc_r: # All virtual reservoirs need to be in this list
if r.ftype == "std":
rel = write_ef(eqs, r, icl, rel, ind2, ind3, M.gpt)
# write all fluxes in M.lof
flist = []
sep = " # ---------------- write all flux equations ------------------- #"
eqs.write(f"\n{sep}\n")
fi = 0
while fi < len(M.lof):
# for flux in M.lof: # loop over fluxes
"""This loop will only write regular flux equations, with the sole
exception of fluxes belonging to ExternalCode objects that need to be
in a given sequence" All other fluxes must be on the M.lpr_r list
below.
"""
# functions can return more than one flux, but we only need to
# call the function once, so we check against flist first
flux = M.lof[fi]
if flux not in flist:
flist.append(flux) # add to list of fluxes already computed
# include computed fluxes that need to be in sequence
if flux.ftype == "in_sequence":
rel = write_ef(eqs, r, icl, rel, ind2, ind3, M.gpt)
elif flux.ftype == "None": # get flux expressions
rhs, rhs_out, rhs_debug, prefix_code = get_flux(flux, M, R, icl)
# write regular equation
if rhs_debug[0] != "": # add debug if need be
eqs.write(f"{ind2}{rhs_debug[0]}")
if rhs_out[0]:
fn = f"F[{flux.idx}]"
eqs.write(f"{ind2}{fn} = {rhs[0]}\n")
# write isotope equations
if rhs_debug[1] != "": # add debug if need beg
eqs.write(f"{ind2}{rhs_debug[1]}")
if rhs_out[1]: # add line for isotopes
if prefix_code != "":
eqs.write(f"{ind2}{prefix_code}\n")
fn = f"F[{flux.idx + 1}]"
eqs.write(f"{ind2}{fn} = {rhs[1]}\n")
fi = fi + 1
# FIXME: This counter invcrement fails for chained scale with flux
# expressions.
fi = fi + 1
sep = (
" # ---------------- write computed reservoir equations -------- #\n"
+ "# that do depend on fluxes"
)
eqs.write(f"\n{sep}\n")
for r in M.lpc_r: # All virtual reservoirs need to be in this list
if r.ftype == "computed": #
rel = write_ef(eqs, r, icl, rel, ind2, ind3, M.gpt)
sep = " # ---------------- write input only reservoir equations -------- #"
eqs.write(f"\n{sep}\n")
for r in ipl:
rname = r.full_name.replace(".", "_")
eqs.write(f"{ind2}{rname} = 0.0")
sep = " # ---------------- calculate concentration change ------------ #"
eqs.write(f"\n{sep}\n")
eqs.write(f"{ind2}return CM.dot(F)\n")
return eqs_fn.stem
# ------------------------ define processes ------------------------- #
[docs]
def write_ef(
eqs,
ef: Species | ExternalFunction,
icl: dict,
rel: str,
ind2: str,
ind3: str,
gpt: tuple,
) -> str:
"""Write external function call code.
:param eqs: equation file handle
:param ef: external_function handle
:param icl: dict of reservoirs that have actual fluxes
:param rel: string with reservoir names returned by setup_ode
:param ind2: indent 2 times
:param ind3: indent 3 times
:param gpt: tuple with global paramaters
:returns: rel: modied string of reservoir names
"""
from esbmtk import Flux
""" get list of return values. This is a bit of hack, since we imply
anything that its not a flux, is a reservoir with no other inputs.
Although, legacy fluxes do not come with a separate flux for isotopes,
whereas newly created fluxes do.
"""
rv = ind2
for _i, o in enumerate(ef.lro):
if isinstance(o, Flux):
v = f"F[{o.idx}], "
elif isinstance(o, str):
if o != "None":
raise ValueError(f"type(o) = {type(o)}, o = {o}")
else:
raise ValueError(f"type(o) = {type(o)}, o = {o}")
rv += v
rv = rv[:-2] # strip the last comma
a = ""
# this can probably be simplified similar to the old parse_return_values()
for d in ef.function_input_data:
a += parse_esbmtk_input_data_types(d, ef, ind3, icl)
if ef.function_params == "None":
eqs.write(f"{rv} = {ef.fname}(\n{a}{ind2})\n\n")
else:
s = f"gpt[{ef.param_start}]"
eqs.write(f" # id: {ef.name}\n")
eqs.write(f"{rv} = {ef.fname}(\n{a}{ind3}{s},\n{ind2})\n\n")
rel += f"{ind3}{rv},\n"
return rel
[docs]
def get_flux(flux: Flux, M: Model, R: list[float], icl: dict) -> tuple[str, str, list]:
"""Create formula expressions that calculates the flux F.
Return the equation expression as string
:param flux: The flux object for which we create the equation
:param M: The current model object
:param R: The list of initial conditions for each reservoir
:param icl: dict of reservoirs that have actual fluxes
:returns: A tuple where the first string is the equation for the
total flux, and the second string is the equation for
the flux of the light isotope
"""
ind2 = 8 * " " # indentation
ind3 = 12 * " " # indentation
rhs = ["", ""]
rhs_debug = ["", ""]
rhs_out = [False, False]
c = flux.parent # shorthand for the connection object
cfn = flux.parent.full_name # shorthand for the connection object name
"""flux parent would typically be a connection object. However for computed
reservoirs, the flux is registed with a the computed reservoir.
"""
from esbmtk import Species, Species2Species
prefix_code = ""
if isinstance(c, Species2Species):
if c.ctype.casefold() == "regular" or c.ctype.casefold() == "fixed":
rhs, rhs_out, rhs_debug, prefix_code = get_regular_flux_eq(
flux, c, icl, ind2, ind3, M.CM, M.toc
)
elif c.ctype == "scale_with_concentration" or c.ctype == "scale_with_mass":
rhs, rhs_out, rhs_debug, prefix_code = get_scale_with_concentration_eq(
flux, c, cfn, icl, ind2, ind3, M.CM, M.toc
)
elif c.ctype == "scale_with_flux":
rhs, rhs_out, rhs_debug, prefix_code = get_scale_with_flux_eq(
flux, c, cfn, icl, ind2, ind3, M.CM, M.toc
)
elif c.ctype == "ignore" or c.ctype == "gasexchange" or c.ctype == "weathering":
pass
else:
raise ValueError(
f"Species2Species type {c.ctype} for {c.full_name} is not implemented"
)
elif isinstance(c, Species):
rhs = f"{0} # {c.full_name}" # The flux will simply be zero
else:
raise ValueError(f"No definition for \n{c.full_name} type = {type(c)}\n")
return rhs, rhs_out, rhs_debug, prefix_code
[docs]
def get_ic(r: Species, icl: dict, isotopes=False) -> str:
"""Get initial condition in a reservoir.
If the reservoir is icl,
return index expression into R.c. If the reservoir is not in the
index, return the Species concentration a t=0
In both cases return these a string
If isotopes == True, return the pointer to the light isotope
concentration
:param r: A reservoir handle
:param icl: icl = dict[Species, list[int, int]] where reservoir
indicates the reservoir handle, and the list contains the
index into the reservoir data. list[0] = concentration
list[1] concentration of the light isotope.
:raises ValueError: get_ic: can't find {r.full_name} in list of
initial conditions
:returns: the string s which is the full_name of the reservoir
concentration or isotope concentration
"""
from esbmtk import Sink, Source
if r in icl:
s1 = f"R[{icl[r][0]}]"
if isotopes:
s1 += f", R[{icl[r][1]}]"
elif isinstance(r, Source | Sink):
# FIXME: this should really point to a variable, maybe add to
# gpt?
s1 = f"{r.c}"
if isotopes:
# FIXME: this should really point to a variable
try:
s1 += f", {r.l}"
except SourceIsotopeError:
logging.critical(f"\n {r.full_name} needs to specify a delta value\n")
raise SourceIsotopeError(
f"\n {r.full_name} needs to specify a delta value\n"
) from None
else:
raise ValueError(
f"get_ic: can't find {r.full_name} in list of initial conditions"
)
return s1
[docs]
def get_regular_flux_eq(
flux: Flux, # flux instance
c: Species2Species, # connection instance
icl: dict, # list of initial conditions
ind2, # indentation
ind3, # indentation
CM: NDArrayFloat, # coefficient matrix
toc: tuple, # tuple of constants
) -> tuple[str, str, list]:
"""Create a string containing the equation for a regular connection.
Regular = fixed rate
:param flux: flux instance
:param c: connection object
:param icl: dict of reservoirs that have actual fluxes
:param ind2: indent 2 times
:param ind3: indent 3 times
:returns: two strings, where the first describes the equation for
the total flux, and the second describes the rate for
the light isotope
"""
"""FIXME: write 3 blocks
1) debug information: connection name, constants, RHS
2) constants
3) RHS
ditto for isotopes
"""
from esbmtk import Source
rhs, rhs_l = ["", ""]
debug_rhs = ["", ""]
rhs_out = [False, False]
prefix_code = ""
if flux.serves_as_input or c.signal != "None":
# Needs full expression with all constants, so that we can
# reference the expression elsewhere.
rhs = toc[c.r_index] * toc[c.s_index] # constant flux -> rhs = c
if flux.isotopes:
rhs_l, rhs_out[1], debug_rhs[1] = isotopes_regular_flux(
rhs, c, icl, ind3, ind2, CM, toc, flux
)
rhs, rhs_l, prefix_code = check_signal_2(
rhs, rhs_l, c, icl
) # check if we hav to add a signal
rhs_out[0] = True
else:
# get constants and place them on matrix
# FIXME: moving this to the matrix, creates problems in the
# isotope module. add second one?
# CM[:, flux.idx] = CM[:, flux.idx] * toc[c.r_index]
rhs = toc[c.r_index] * toc[c.s_index]
rhs_out[0] = True
if flux.isotopes:
rhs_l, rhs_out[1], debug_rhs[1] = isotopes_regular_flux(
rhs, c, icl, ind3, ind2, CM, toc, flux
)
if c.mo.debug_equations_file: # and output:
if isinstance(c.source, Source):
m1 = "constant flux from a source\n"
m2 = "\n"
else:
m1 = " constants = CM[c.source.idx, flux.idx] * toc[c.r_index]\n"
m2 = " constants = CM[{c.source.idx}:, {flux.idx}] * toc[{c.r_index}]\n"
debug_rhs[0] = (
f'"""\n'
f" {c.ctype}: {c.name}, id={c.id}\n"
f"{m1}"
f"{m2}"
f" rhs = None\n"
f' """\n'
)
return [rhs, rhs_l], rhs_out, debug_rhs, prefix_code
[docs]
def get_scale_with_concentration_eq(
flux: Flux, # flux instance
c: Species2Species, # connection instance
cfn: str, # full name of the connection instance
icl: dict, # list of initial conditions
ind2: str, # whitespace
ind3: str, # whitespace
CM: NDArrayFloat, # coefficient matrix
toc: tuple, # tuple of constants
) -> tuple[str, str, list]:
"""Create equation string for flux.
The flux will scale with the
concentration in the upstream reservoir
Example: M1_ConnGrp_D_b_to_L_b_TA_thc__F =
M1.ConnGrp_D_b_to_L_b.TA_thc.scale * R[5]
:param flux: Flux object
:param c: connection instance
:param cfn: full name of the connection instance
:param icl: dict[Species, list[int, int]] where reservoir
indicates the reservoir handle, and the list contains the
index into the reservoir data. list[0] = concentration
list[1] concentration of the light isotope.
:returns: two strings with the respective equations for the change
in the total reservoir concentration and the
concentration of the light isotope
"""
rhs, rhs_l = ["", ""]
debug_rhs = ["", ""]
rhs_out = [True, False] # we always have a right hand side
s_c = get_ic(c.ref_reservoirs, icl) # get index to concentration`
rhs = f"{s_c}" # source concentration
prefix_code = ""
if flux.serves_as_input or c.signal != "None":
# place all constants into the rhs so we can re-use the expression
rhs = f"toc[{c.s_index}] * {s_c}" # (scale * c)
if flux.isotopes:
rhs_l, rhs_out[1], debug_rhs[1] = isotopes_scale_concentration(
rhs, c, icl, ind3, ind2, CM, toc, flux
)
rhs, rhs_l, prefix_code = check_signal_2(rhs, rhs_l, c, icl)
else: # place all constants on matrix
# FIXME: moving this to the matrix, creates problems in the
# isotope module. Create a second version?
# CM[:, flux.idx] = CM[:, flux.idx] * toc[c.s_index]
# rhs = f"{s_c}" # flux changes with concentration in source
rhs = f"toc[{c.s_index}] * {s_c}" # (scale * c)
if flux.isotopes:
rhs_l, rhs_out[1], debug_rhs[1] = isotopes_scale_concentration(
rhs, c, icl, ind3, ind2, CM, toc, flux
)
if c.mo.debug_equations_file:
debug_rhs[0] = (
f'\n """\n'
f" {c.ctype}: {c.name}, id = {c.id}\n"
f" {flux.full_name} = CM[{c.source.idx}, {flux.idx}] * toc[{c.s_index}] * {c.ref_reservoirs.full_name}.\n"
f" {flux.full_name} = {CM[c.source.idx, flux.idx]:.2e} * {toc[c.s_index]:.2e} * {s_c}\n"
f' """\n'
)
return [rhs, rhs_l], rhs_out, debug_rhs, prefix_code
[docs]
def get_scale_with_flux_eq(
flux: Flux, # flux instance
c: Species2Species, # connection instance
cfn: str, # full name of the connection instance
icl: dict, # list of initial conditions
ind2: str, # indentation
ind3: str, # indentation
CM: NDArrayFloat, # coefficient matrix
toc: tuple, # tuple of constants
) -> tuple(str, str):
"""Equation defining a flux.
The flux will scale relative to the magnitude of another
flux. If isotopes are used, use the isotope ratio of the upstream reservoir.
:param flux: Flux object
:param c: connection instance
:param cfn: full name of the connection instance
:param icl: dict[Species, list[int, int]] where reservoir
indicates the reservoir handle, and the list contains the
index into the reservoir data. list[0] = concentration
list[1] concentration of the light isotope.
:returns: two strings with the respective equations for the change
in the total reservoir concentration and the
concentration of the light isotope
"""
from esbmtk import Sink, Source
rhs, rhs_l = ["", ""]
debug_rhs = ["", ""]
rhs_out = [True, False]
prefix_code = ""
if flux.serves_as_input or c.signal != "None":
"""The flux for the light isotope will computed as follows: We will use the mass
of the flux for scaling, but we isotope ratio of the source. Note the
scale_with_flux connection also supports an arbitrary scaling factor, which is
set with the __add_to_ode_constants__ methods in base_classes.py, and thus part
of the toc[i]
"""
rhs = f"toc[{c.s_index}] * F[{c.ref_flux.idx}]"
if c.source.isotopes and c.sink.isotopes:
if flux.isotopes:
rhs_l, rhs_out[1], debug_rhs[1] = isotopes_scale_flux(
rhs, c, icl, ind3, ind2, CM, toc, flux
)
else:
rhs, rhs_l, prefix_code = check_signal_2(rhs, rhs_l, c, icl)
else:
# FIXME: moving this to the matrix, creates problems in the
# isotope module
# CM[:, flux.idx] = CM[:, flux.idx] * toc[c.s_index]
rhs = f"F[{c.ref_flux.idx}] * toc[{c.s_index}]"
if flux.isotopes:
rhs_l, rhs_out[1], debug_rhs[1] = isotopes_scale_flux(
rhs, c, icl, ind3, ind2, CM, toc, flux
)
if c.mo.debug_equations_file:
if isinstance(c.source, Source | Sink):
debug_rhs[0] = (
f'\n """\n'
f" {c.ctype}: {c.name}, id = {c.id}\n"
f" {flux.full_name} = toc[{c.s_index}] * {c.ref_flux.full_name}\n"
f" {flux.full_name} = toc[{c.s_index:.2e}] * F[{c.ref_flux.idx}]\n"
f' """\n'
)
else:
debug_rhs[0] = (
f'\n """\n'
f" {c.ctype}: {c.name}, id = {c.id}\n"
f" {flux.full_name} = CM[{c.source.idx}, {flux.idx}] * toc[{c.s_index}] * {c.ref_flux.full_name}\n"
f" {flux.full_name} = {CM[c.source.idx, flux.idx]:.2e} * {toc[c.s_index]:.2e} * F[{c.ref_flux.idx}]\n"
f' """\n'
)
return [rhs, rhs_l], rhs_out, debug_rhs, prefix_code
[docs]
def isotopes_regular_flux(
f_m: str, # flux expression
c: Species2Species, # connection object
icl: dict, # initial conditions
ind3: str,
ind2: str,
CM: NDArrayFloat, # matrix
toc: NDArrayFloat, # table of constants
flux: Flux, #
) -> str:
"""Test if the connection involves any isotope effects.
:param f_m: string with the flux name
:param c: connection object
:param icl: dict of reservoirs that have actual fluxes
:param ind2: indent 2 times
:param ind3: indent 3 times
:returns equation string:
"""
"""Calculate the flux of the light isotope
Where f_m = flux mass/t, s_c = source concentration
s_l = source light isotope, a = fractionation factor alpha,
r = isotopic reference ratio
If delta is given: fl = f_m * 1000 / (r * (d + 1000) + 1000)
If epsilon is given: fl = f_m * s_l / (a * sc + sl - a * sl)
If neither: fl = f_m * s_l
If flux mass equals source concentration, this is a scale
with concentration connection, and f_l = s_l * scale where scale is
part of the constants on the equations matrix. Otherwise, the flux
isotope ratio is similar to the source isotope ratio:
fl = fm * sl/sc * scale. Note that scale is dropped in the below equations
since it is moved to the equations matrix.
FIXME: move constants in the matrix?
"""
r: float = c.source.species.r # isotope reference value
s: str = get_ic(c.source, icl, True) # R[0], R[1] reservoir concentrations
s_c, s_l = s.replace(" ", "").split(",") # total c, light isotope c
debug_str = ""
rhs_out = True
# light isotope flux with no effects
# CM[:, flux.idx + 1] = CM[:, flux.idx + 1] * toc[c.r_index]
if c.delta != "None":
equation_string = f"{f_m} * 1000 / ({r} * ({c.delta} + 1000) + 1000)"
ds1 = "{f_m} * 1000 / (r * (c.delta + 1000) + 1000)"
elif c.epsilon != "None":
a = c.epsilon / 1000 + 1 # convert to alpha notation
if f_m == "": # if f_m is not provided.
equation_string = f"{s_l} / ({a} * {s_c} + {s_l} - {a} * {s_l})"
ds1 = (
f"{c.source.full_name}.l"
f" / (a * {c.source.full_name}.c + {c.source.full_name}.l"
f" - a * {c.source.full_name}.l)"
)
else:
equation_string = f"{f_m} * {s_l} / ({a} * {s_c} + {s_l} - {a} * {s_l})"
ds1 = (
f"{c.source.full_name}.l * {flux.full_name}"
f" / (a * {c.source.full_name}.c + {c.source.full_name}.l"
f" - a * {c.source.full_name}.l)"
)
else:
equation_string = f"toc[{c.r_index}]"
ds1 = f"{c.source.full_name}.l * {flux.full_name}"
if c.mo.debug_equations_file:
debug_str = (
f'"""\n'
f" isotope equations for {c.full_name}:{c.ctype}\n"
f" constants = CM[c.source.idx:, flux.idx + 1] = CM[:, flux.idx + 1] * toc[c.r_index]\n"
f" constants = CM[c.source.idx:, {flux.idx + 1}] = CM[:, {flux.idx + 1}] * {toc[c.r_index]}\n"
f" rhs_l = {ds1}\n"
f" rhs_l = {equation_string}\n"
f' """\n'
)
return equation_string, rhs_out, debug_str
[docs]
def isotopes_scale_concentration(
f_m: str, # flux expression
c: Species2Species, # connection object
icl: dict, # initial conditions
ind3: str,
ind2: str,
CM: NDArrayFloat, # matrix
toc: NDArrayFloat, # table of constants
flux: Flux, #
) -> str:
"""Test if the connection involves any isotope effects.
:param f_m: string with the flux name
:param c: connection object
:param icl: dict of reservoirs that have actual fluxes
:param ind2: indent 2 times
:param ind3: indent 3 times
:returns equation string:
"""
"""Calculate the flux of the light isotope (f_l).
f_m = flux mass/t Note: this already has the scale factor!
s_c = source concentration
s_l = source concentration of light isotope
r_index references the rate keyword in a given connection instance
s_index references the scale keyword in a given connection instance
FIXME: unify rate & scale?
If delta is given: fl = f_m * 1000 / (r * (d + 1000) + 1000)
If epsilon is given: fl = f_m * sl * / (e * sc + sl - a * sl)
If neither: fl = f_m * s_l / s_c
FIXME: move constants on the matrix?
"""
r: float = c.source.species.r # isotope reference value
s: str = get_ic(c.source, icl, True) # R[0], R[1] reservoir concentrations
s_c, s_l = s.replace(" ", "").split(",") # total c, light isotope c
debug_str = ""
rhs_out = True
# light isotope flux with no effects
# CM[:, flux.idx + 1] = CM[:, flux.idx + 1] * toc[c.s_index]
if c.delta != "None":
equation_string = f"{f_m} * 1000 / ({r} * ({c.delta} + 1000) + 1000)"
ds1 = f"{c.source.full_name}.c * {toc[c.s_index]} * 1000 / (r * (c.delta + 1000) + 1000)"
elif c.epsilon != "None":
a = c.epsilon / 1000 + 1 # convert to alpha notation
equation_string = f"{f_m} * {s_l} / ({a} * {s_c} + {s_l} - {a} * {s_l})"
ds1 = (
f"toc[c.s_index] * {c.source.full_name}.l * {flux.full_name}.c"
f"/ (a * {c.source.full_name}.c + {c.source.full_name}.l"
f"- a * {c.source.full_name}.l)"
)
else:
if f_m == s_c:
equation_string = f"{s_l} * toc[{c.s_index}]"
ds1 = f"{c.source.full_name}.c"
else:
# equation_string = f"{f_m} * {s_l} / {s_c}"
equation_string = f"{s_l} * toc[{c.s_index}]"
ds1 = f"{flux.full_name} * toc[{c.s_index}] * {c.source.full_name}.l / {c.source.full_name}.c"
if c.mo.debug_equations_file:
debug_str = (
f'"""\n'
f" isotope equations for {c.full_name}:{c.ctype}\n"
f" Flux = {flux.full_name}, d = {c.delta}, e = {c.epsilon}\n"
f" constants = CM[s.source.idx:, flux.idx + 1] = CM[:, flux.idx + 1] * toc[c.s_index]\n"
f" constants = CM[{c.source.idx + 1}, {flux.idx + 1}] ="
f" {CM[c.source.idx + 1, flux.idx + 1]:.2e} * {toc[c.s_index]:.2e}\n"
f" rhs_l = {ds1}\n"
f" rhs_l = {equation_string}\n"
f' """\n'
)
return equation_string, rhs_out, debug_str
[docs]
def isotopes_scale_flux(
f_m: str, # flux expression including scale
c: Species2Species, # connection object
icl: dict, # initial conditions
ind3: str,
ind2: str,
CM: NDArrayFloat, # matrix
toc: NDArrayFloat, # table of constants
flux: Flux, #
) -> str:
"""Test if the connection involves any isotope effects.
:param f_m: string with the flux name
:param c: connection object
:param icl: dict of reservoirs that have actual fluxes
:param ind2: indent 2 times
:param ind3: indent 3 times
:returns equation string:
"""
"""Calculate the flux of the light isotope (f_l).
If delta is given: fl = fm * 1000 / (r * (d + 1000) + 1000)
If epsilon is given: fl = sl * fm / (e * sc + sl - a * sl)
If neither: If flux mass equals source concentration, this is a scale
with concentration connection, and f_l = s_l * scale where scale is
part of the constants on the equations matrix. Otherwise, the flux
isotope ratio is similar to the source isotope ratio:
fl = fm * sl/sc * scale. Note that scale is dropped in the below equations
since it is moved to the equations matrix.
Where fl = flux light isotope, fm = flux mass/t, s_c = source concentration
s_l = source concentration of light isotope
r_index references the rate keyword in a given connection instance
s_index references the scale keyword in a given connection instance
FIXME: unify rate & scale?
"""
from esbmtk import Source
r: float = c.source.species.r # isotope reference value
s: str = get_ic(c.source, icl, True) # R[0], R[1] reservoir concentrations
s_c, s_l = s.replace(" ", "").split(",") # total c, light isotope c
debug_str = ""
rhs_out = True
debug_expression = ""
scale = f"toc[{c.s_index}]"
f_m = f"F[{c.ref_flux.idx}]" # The ref flux is already scaled
if c.delta != "None":
# equation_string = f"{s_c} * 1000 / ({r} * ({c.delta} + 1000) + 1000)"
equation_string = f"F[{flux.idx}] * 1000 / ({r} * ({c.delta} + 1000) + 1000)"
ds1 = f"{scale} * {flux.full_name}.c * 1000 / (r * (c.delta + 1000) + 1000)"
elif c.epsilon != "None":
a = c.epsilon / 1000 + 1 # convert to alpha notation
# note f_m is already scaled!
equation_string = (
f"{scale} * {f_m} * {s_l} / ({a} * {s_c} + {s_l} - {a} * {s_l})"
)
ds1 = (
f"{scale} * {c.source.full_name}.l * {flux.full_name}.c"
f"/ (a * {c.source.full_name}.c + {c.source.full_name}.l"
f"- a * {c.source.full_name}.l)"
)
else:
if f_m == s_c:
equation_string = f"{s_l}"
ds1 = f"{c.source.full_name}.c"
debug_expression = "Flux from Reservoir = [flux.idx + 1] * c \n"
elif isinstance(c.source, Source):
# source isotope ratios are fixed, so we can place it on the coefficient matrix
CM[:, flux.idx + 1] = CM[:, flux.idx + 1] * scale * c.source.isotope_ratio
equation_string = f"{f_m}"
ds1 = f"scale * {flux.full_name} * {c.source.isotope_ratio}"
debug_expression = (
f"Flux from Source = [flux.idx + 1] * scale * c.source.isotope_ratio\n"
f"Flux from Source = {[flux.idx + 1]} * {scale} * {c.source.isotope_ratio}\n"
)
else:
equation_string = f"{scale} * {f_m} * {s_l} / {s_c}"
ds1 = f"{scale} * {flux.full_name} * {c.source.full_name}.l / {c.source.full_name}.c"
if c.mo.debug_equations_file:
# FIXME: this should reflect the above expressions using the same pattern as in
# scale with concentration.
debug_str = (
f'"""\n'
f" isotope equations for {c.full_name}:{c.ctype}\n"
f" Flux = {flux.full_name}, d = {c.delta}, e = {c.epsilon}\n"
f" {debug_expression}"
f" rhs_l = {ds1}\n"
f" rhs_l = {equation_string}\n"
f' """\n'
)
return equation_string, rhs_out, debug_str
[docs]
def check_signal_2(rhs: str, rhs_l: str, c: Species2Species, icl: dict) -> (str, str):
"""Test if connection is affected by a signal.
:param ex: equation string
:param c: connection object
:param icl: dict of reservoirs that have actual fluxes
:returns: (modified) equation string
"""
prefix_code = ""
if c.signal != "None": # get signal type
operators = { # Map signal types to their operators
"addition": "+",
"multiplication": "*",
}
# Get the operator for this signal type (if applicable)
sign = operators.get(c.signal.stype, "")
# Apply the signal based on its type
if c.signal.stype in ("addition", "multiplication"):
rhs = f"{rhs} {sign} {c.signal.full_name}(t)[0] # Signal"
if rhs_l != "": # isotopes are always additive
rhs_l = f"{rhs_l} + {c.signal.full_name}(t)[1] # Signal"
elif c.signal.stype == "epsilon_only":
"""Here we override any previous isotope effects and calculate
the flux of the light isotope as governed by the epsilon values
as read from the signal data"""
s: str = get_ic(c.source, icl, True) # R[0], R[1] reservoir concentrations
prefix_code = f"a = {c.signal.full_name}(t)[2] / 1000 + 1 # S to a"
s_c, s_l = s.replace(" ", "").split(",") # total c, light isotope c
rhs_l = f"{rhs} * {s_l} / (a * {s_c} + {s_l} - a * {s_l})"
return rhs, rhs_l, prefix_code
[docs]
def get_initial_conditions(
M: Model,
rtol: float,
atol_d: float = 1e-7,
) -> tuple[dict[str, float], dict, list, list, NDArrayFloat]:
"""Get list of initial conditions.
This list needs to match the number of equations.
:param Model: The model handle
:param rtol: relative tolerance for BDF solver.
:param atol_d: default value for atol if c = 0
:return: lic = dict of initial conditions
:return: icl = dict[Species, list[int, int]] where reservoir
indicates the reservoir handle, and the list contains the
index into the reservoir data. list[0] = concentration
list[1] concentration of the light isotope.
:return: cpl = list of reservoirs that use function to evaluate
reservoir data
:return: ipl = list of static reservoirs that serve as input
:return: rtol = array of tolerence values for ode solver
We need to consider 3 types of reservoirs:
1) Species that change as a result of physical fluxes i.e.
r.lof > 0. These require a flux statements and a reservoir
equation.
2) Species that do not have active fluxes but are computed
as a tracer, i.e.. HCO3. These only require a reservoir
equation
3) Species that do not change but are used as input. Those
should not happen in a well formed model, but we cannot
exclude the possibility. In this case, there is no flux
equation, and we state that dR/dt = 0
get_ic() will look up the index position of the
reservoir_handle on icl, and then use this index to retrieve
the correspinding value in R
Isotopes are handled by adding a second entry
"""
import numpy as np
lic = {} # list of initial conditions
atol: list = [] # list of tolerances for ode solver
# dict that contains the reservoir_handle as key and the index positions
# for c and l as a list
icl: dict[Species, list[int, int]] = {}
cpl: list = [] # list of reservoirs that are computed
ipl: list = [] # list of static reservoirs that serve as input
# collect all reservoirs that have initial conditions
# if r.rtype != "flux_only":
i: int = 0
for r in M.lic:
if len(r.lof) >= 0 or r.rtype == "computed" or r.rtype == "passive":
lic[r.full_name] = r.c[0]
tol = rtol * abs(r.c[0]) / 10 if r.c[0] > 0 else atol_d
atol.append(tol)
r.atol[0] = tol
if r.isotopes:
tol = rtol * abs(r.l[0]) / 10 if r.l[0] > 0 else atol_d
atol.append(tol)
r.atol[1] = tol
lic[f"{r.full_name}_l"] = r.l[0]
icl[r] = [i, i + 1]
i += 2
else:
icl[r] = [i, i]
i += 1
# if r.name == "O2_At":
if M.debug_equations_file:
logging.info(
f"r = {r.full_name}, r.c[0] = {r.c[0]:.2e}, rtol = {tol:.2e}"
)
return lic, icl, cpl, ipl, np.array(atol)