Source code for esbmtk.utility_functions

"""esbmtk: A general purpose Earth Science box model toolkit.

Copyright (C), 2020 Ulrich G. Wortmann

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""

from __future__ import annotations

import functools
import logging
import typing as tp

import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt

# from numba import njit

if tp.TYPE_CHECKING:
    from esbmtk import ExternalFunction, Model, Species

np.set_printoptions(precision=4)
# declare numpy types
NDArrayFloat = npt.NDArray[np.float64]


[docs] class ScaleError(Exception): """Custom Error Class.""" def __init__(self, message): """Initialize Class.""" message = f"\n\n{message}\n" super().__init__(message)
[docs] def rmtree(f) -> None: """Delete file or files if file is directory. :param f: pathlib path object """ if f.is_file(): f.unlink() else: for child in f.iterdir(): rmtree(child) f.rmdir()
[docs] def phc(c: float) -> float: """Calculate concentration as pH. c can be a number or numpy array Parameters ---------- c : float H+ concentration Returns ------- float pH value """ import numpy as np pH: float = -np.log10(c) return pH
[docs] def debug(func): """Print the function signature and return value.""" @functools.wraps(func) def wrapper_debug(*args, **kwargs): args_repr = [f"{float(repr(a)):.2e}\n" for a in args] # 1 kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()] # 2 signature = ", ".join(args_repr + kwargs_repr) # 3 print(f"Calling {func.__name__}\n({signature})") value = func(*args, **kwargs) value = [f"{v:.2e}" for v in value] print(f"\n{func.__name__!r} returned\n {value!r}\n") # 4 return value return wrapper_debug
[docs] def get_reservoir_reference(k: str, M: Model) -> tuple: """Get SpeciesProperties and Species handles. Parameters ---------- k : str with the initial flux name, e.g., M_F_A_db_DIC M : Model Model handle Returns ------- tuple Species2Species, SpeciesProperties Raises ------ ValueError If reservoir_name is not of type ConnectionProperties or Species2Species """ from esbmtk import ( GasReservoir, Reservoir, Species, Species2Species, SpeciesProperties, ) key_list = k[2:].split(".") # get model, reservoir & species name if len(key_list) == 3: # Reservoir _model_name, reservoir_name, species_name = key_list if hasattr(M, reservoir_name): obj = getattr(M, reservoir_name) else: raise ValueError(f"{reservoir_name} is not in the model") if k[0:2] == "R_": reservoir = obj elif k[0:2] == "F_": if isinstance(obj, Species | Reservoir): reservoir = obj elif isinstance(obj, Species2Species): reservoir = obj if isinstance(reservoir.source, GasReservoir): species_name = obj.sink.name else: species_name = obj.source.name else: raise ValueError( f"{obj.name} must be Species2Species", f"not {type(obj)}" ) elif len(key_list) == 2: # (Gas)Species _model_name, reservoir_name = key_list reservoir = getattr(M, reservoir_name) else: raise ValueError("kl should look like this F_M.CO2_At") species = getattr(M, species_name) if not isinstance(species, SpeciesProperties): raise NotImplementedError() return reservoir, species
[docs] def register_new_flux(ec, model_object, dict_key, dict_value) -> list: """Register a new flux object with a Species2Species instance. Parameters ---------- ec : ExternalCode object model_object : Species | Reservoir instance to register with dict_key : str E.g., "M.A_db.DIC" dict_value : str id value, e.g., "db_cs2" Returns ------- list list of Flux instances """ from .base_classes import Flux, Species from .extended_classes import Reservoir if isinstance(model_object, Reservoir): spn = dict_key.split(".")[-1] species_properties = getattr(model_object.mo, spn) species = getattr( model_object, species_properties.name ) # species to register flux with elif isinstance(model_object, Species): # in this case rg contains a species not a reservoirgroup species_properties = model_object.species rg = model_object.register # get associated reservoir species = getattr(rg, species_properties.name) else: logging.debug(f" type(rg) = {type(model_object)}") raise NotImplementedError if not hasattr(species, "source"): species.source = species_properties.name ro = list() # this also registers the flux with M.lof num = ["", "_l"] if species.isotopes else [""] for e in num: logging.debug(f"ro= {model_object.full_name}, dv = {dict_value}, num = {num}") f = Flux( name=f"{dict_value}{e}", species=species_properties, rate=0, register=species, ftype=ec.ftype, isotopes=species.isotopes, id=f"{species.name}{e}", ) ro.append(f) ec.lof.append(f) species.lof.append(f) # register flux logging.debug(f"fname = {f.full_name}") return ro
[docs] def register_new_reservoir(r, sp, v): """Register a new reservoir.""" from .base_classes import Species rt = Species( # create new reservoir name=sp.name, species=sp, concentration=f"{v} mol/kg", register=r, volume=r.volume, rtype="computed", ) r.lor.append(rt) return [rt]
[docs] def register_return_values(ef: ExternalFunction, rg) -> None: """Register the return values of an external function instance. Parameters ---------- ec : ExternalFunction ExternalFunction Instance rg : Reservoir | Species The Resevoir or Reservoirgroup the external function is associated with Raises ------ ValueError If the return value type is undefined Check the return values of external function instances, and create the necessary reservoirs, fluxes, or connections if they are missing. These fluxes are not associated with a connection Object so we register the source/sink relationship with the reservoir they belong to. This fails for GasReservoir since they can have a 1:many relatioship. The below is a terrible hack, it would be better to express this with several connection objects, rather than overloading the source attribute of the GasReservoir class. """ from esbmtk import ( Flux, Reservoir, Sink, SinkProperties, Source, SourceProperties, Species, Species2Species, ) M = rg.mo # go through each entry in ec.return_values for line in ef.return_values: if isinstance(line, dict): dict_key = next(iter(line)) # get first key dict_value = line[dict_key] if dict_key[:2] == "F_": # is flux """The following code needs to extract the connection object. Names must have the following scheme M.Conn_CO2_At_to_H_b_CO2_H_b._F """ key_str = dict_key[2:].split(".")[1] logging.debug(f"register_return_values key_str = {key_str}") if hasattr(M, key_str): o = getattr(M, key_str) if isinstance(o, Flux): o: list = [o] elif isinstance(o, Species2Species): o: list = o.lof # get flux handle elif isinstance( o, Species | Reservoir | Sink | SinkProperties | Source | SourceProperties, ): # o: list = register_new_flux(ef, rg, dict_key[2:], dict_value) o: list = register_new_flux(ef, o, dict_key[2:], dict_value) else: raise ValueError(f"No recipie for {type(o)}") else: raise ValueError(f"{key_str} is not part of the Model definition") elif dict_key[:2] == "R_": # is reservoir if dict_key[2:] in M.lor: o: list = [getattr(M, dict_key)] else: r, sp = get_reservoir_reference(dict_key, M) o: list = register_new_reservoir(r, sp, dict_value) res = o[0] if res.rtype == "computed": # register dummy flux f = Flux( species=sp, # SpeciesProperties handle rate=0, # flux value register=res, # is this part of a group? isotopes=res.isotopes, id=f"{res.full_name}_F", ftype="computed", ) res.lof.append(f) o = [f] elif dict_key[:2] == "C_": # is connection raise NotImplementedError elif dict_key[:2] == "N_": # is connection pass else: raise ValueError(f"{dict_key[0:2]} is not defined") elif isinstance(line, Species): dict_value.ef_results = True o = [dict_value] else: raise ValueError("This should not happen") ef.lro.extend(o) # add to list of returned Objects
[docs] def summarize_results(M: Model) -> dict(): """Summarize all model results. At t_max into a hirarchical dictionary, where values are accessed in the following way: results[basin_name][level_name][species_name] e.g., result["A"]["sb"]["O2"] """ results = dict() for r in M.lor: species_name = r.name basin_name = r.register.name[:1] level_name = r.register.name[-2:] species_value = r.c[-1] if basin_name not in results: results[basin_name] = {level_name: {species_name: species_value}} elif level_name not in results[basin_name]: results[basin_name][level_name] = {species_name: species_value} elif species_name not in results[basin_name][level_name]: results[basin_name][level_name][species_name] = species_value return results
[docs] def find_matching_strings(s: str, fl: list[str]) -> bool: """Test if all elements of fl occur in s. Return True if yes,otherwise False """ return all(f in s for f in fl)
# def insert_into_namespace(name, value, name_space=globals()): # name_space[name] = value
[docs] def add_to(my_list, e): """Add element e to list l, but check if the entry already exist. If so, throw exception. Otherwise add """ if e not in my_list: # if not present, append element my_list.append(e)
[docs] def get_plot_layout(obj): """Select a row, column layout. Based on the number of objects to display. The expected argument is a reservoir object which contains the list of fluxes in the reservoir """ noo = 1 + sum(f.plot == "yes" for f in obj.lof) for _ in obj.ldf: noo += 1 # noo = len(obj.lof) + 1 # number of objects in this reservoir logging.debug(f"{noo} subplots for {obj.n} required") size, geo = plot_geometry(noo) return size, geo
[docs] def plot_geometry(noo: int) -> tuple(): """Define plot geometry based on number of objects to plot.""" if noo < 2: geometry = [1, 1] # one row, one column size = [5, 3] # width, height in inches elif 1 < noo < 3: geometry = [2, 1] # two rows, one column size = [5, 6] # width, height in inches elif 2 < noo < 5: geometry = [2, 2] # two rows, two columns size = [10, 6] # width, height in inches elif 4 < noo < 7: geometry = [3, 2] # 3 rows, 2 columns size = [10, 9] # width, height in inches elif 6 < noo < 10: geometry = [4, 2] # 4 rows, 2 columns size = [10, 12] # width, height in inches elif 9 < noo < 13: geometry = [5, 2] # 5 rows, 2 columns size = [10, 15] # width, height in inches elif 12 < noo < 16: geometry = [6, 2] # 6 rows, 2 columns size = [10, 18] # width, height in inches else: m = ( "plot geometry for more than 15 fluxes is not yet defined" "Consider calling flux.plot individually on each flux in the reservoir" ) raise ValueError(m) return size, geometry
[docs] def list_fluxes(self, name, i) -> None: """Echo all fluxes in the reservoir to the screen.""" for f in self.lof: # show the processes for p in f.lop: p.describe() for f in self.lof: f.describe(i) # print out the flux data
[docs] def set_y_limits(ax: plt.Axes, obj: any) -> None: """Prevent the display or arbitrarily small differences.""" bottom, top = ax.get_ylim() if (top - bottom) < obj.display_precision: top = bottom + obj.display_precision bottom -= obj.display_precision ax.set_ylim(bottom, top)
[docs] def is_name_in_list(n: str, my_list: list) -> bool: """Test if an object name is part of the object list.""" return any(e.full_name == n for e in my_list)
[docs] def get_object_from_list(name: str, my_list: list) -> any: """Match a name to a list of objects. Return the object """ match: bool = False for o in my_list: if o.full_name == name: r = o match = True if match: return r else: raise ValueError(f"Object = {o.full_name} has no matching flux {name}")
[docs] def sort_by_type(my_list: list, t: list, m: str) -> list: """Divide a list by type into new lists. This function will return a list and it is up to the calling code to unpack the list l is list with various object types t is a list which contains the object types used for sorting m is a string for the error function """ # from numbers import Number lc = my_list.copy() rl = [] for ot in t: # loop over object types a = [] for e in my_list: # loop over list elements if isinstance(e, ot): a.append(e) # add to temporary list lc.remove(e) # remove this element rl.append(a) # save the temporary list to rl # at this point, all elements of lc should have been processed # if not, lc contains element which are of a different type if lc: raise TypeError(m) return rl
[docs] def get_object_handle(res: list, M: Model): """Test if the key is a global reservoir handle. Or exists in the model namespace :param res: tp.List of strings, or reservoir handles :param M: Model handle """ r_list: list = [] if not isinstance(res, list): res = [res] for o in res: if o in M.dmo: # is object known in global namespace r_list.append(M.dmo[o]) elif o in M.__dict__: # or does it exist in Model namespace r_list.append(getattr(M, o)) else: print(f"{o} is not known for model {M.name}") raise ValueError(f"{o} is not known for model {M.name}") if len(r_list) == 1: r_list = r_list[0] return r_list
[docs] def split_key(k: str, M: any) -> any | any | str: """Split the string k with letters _to_. Test if optional id string is present """ if "_to_" not in k: raise ValueError("Name must follow 'Source_to_Sink' format") source = k.split("_to_")[0] sinkandid = k.split("_to_")[1] if "@" in sinkandid: sink = sinkandid.split("@")[0] cid = sinkandid.split("@")[1] else: sink = sinkandid cid = "" sink = get_object_handle(sink, M) source = get_object_handle(source, M) return (source, sink, cid)
[docs] def make_dict(keys: list, values: list) -> dict: """Create a dictionary from a list and value, or from two lists.""" d = {} if isinstance(values, list): if len(values) == len(keys): d: dict = dict(zip(keys, values, strict=False)) else: raise ValueError("key and value list must be of equal length") else: values: list = [values] * len(keys) d: dict = dict(zip(keys, values, strict=False)) return d
[docs] def initialize_reservoirs(M: Model, box_dict: dict) -> list(Species): """Initialize one or more reservoirs. Based on the data in box_dict (see the example below). Parameters ---------- M : Model Model handle box_dict : dict dictionary with reservoir parameters Returns ------- tp.List list of all Species objects in box_dict Raises ------ ValueError If there are no Species objects in the dictionary Example:: box_parameters = { # name: [[ud, ld ap], T, P, S] # Atlantic Ocean "M.A_sb": { "g": [0, -100, A_ap], "T": 20, "P": 5, "S": 34.7, "c": {M.PO4: "2.1 mmol/kg", M.DIC: "2.21 mmol/kg", M.TA: "2.31 mmol/kg", } species_list = initialize_reservoirs(M, box_parameters) Note: the first entry in the box_parameters dict must be a regular reservoir, not a Source or Sink. """ from esbmtk import Reservoir, SinkProperties, SourceProperties, SpeciesProperties # loop over reservoir names for box_name, value in box_dict.items(): match value.get("ty"): # type is given case "Source": SourceProperties( name=box_name, species=value["sp"], delta=value.get("d", {}), isotopes=value.get("i", {}), register=M, ) case "Sink": SinkProperties( name=box_name, species=value["sp"], delta=value.get("d", {}), isotopes=value.get("i", {}), register=M, ) case _: # default to creating a reservoir _rg = Reservoir( name=box_name, geometry=value["g"], concentration=value.get("c", "0 mol/kg"), delta=value.get("d", {}), isotopes=value.get("i", {}), plot=value.get("p", {}), seawater_parameters={ "temperature": value["T"], "pressure": value["P"], "salinity": value["S"], }, register=M, ) first_key = list(box_dict.keys())[0] b_type = box_dict[first_key].get("ty") if b_type == "Source" or b_type == "Sink": raise ValueError( "\nThe first entry in the box_parameter dict must be a regular reservoir\n" "You likely started with a Source or Sink\n" "Check your arguments to initialize_reservoirs()\n" ) species_dict = box_dict[first_key].get("c", "None") if isinstance(species_dict, dict): species_list = list(species_dict.keys()) if not isinstance(species_list[0], SpeciesProperties): raise ValueError( f"{species_list[0]} must be of type SpeciesProperties", f"not {type(species_list[0])}", ) else: raise ValueError("No species in species dict!") return species_list
[docs] def build_concentration_dicts(cd: dict, bg: dict) -> dict: """Build a dict which can be used by create_reservoirs. :param bg: dict where the box_names are dict keys. :param cd: dictionary with the following format:: cd = { # species: [concentration, isotopes] PO4: [Q_("2.1 * umol/liter"), False], DIC: [Q_("2.1 mmol/liter"), False], } This function returns a new dict in the following format # box_names: [concentrations, isotopes] d= {"bn": [{PO4: .., DIC: ..},{PO4:False, DIC:False}]} """ if isinstance(bg, dict): box_names: list = bg.keys() elif isinstance(bg, list): box_names: list = bg elif isinstance(bg, str): box_names: list = [bg] else: raise ValueError("This should never happen") icd: dict = {} td1: dict = {} # temp dictionary td2: dict = {} # temp dictionary td3: dict = {} # temp dictionary # create the dicts for concentration and isotopes for k, v in cd.items(): td1[k] = v[0] td2[k] = v[1] td3[k] = v[2] # box_names: tp.List = bg.keys() for bn in box_names: # loop over box names icd[bn] = [td1, td2, td3] return icd
[docs] def calc_volumes(bg: dict, M: any, h: any) -> list: """Calculate volume contained in a given depth interval. bg is a dictionary in the following format:: bg={ "hb": (0.1, 0, 200), "sb": (0.9, 0, 200), } where the key must be a valid box name, the first entry of the list denoted the areal extent in percent, the second number is upper depth limit, and last number is the lower depth limit. M must be a model handle h is the hypsometry handle The function returns a list with the corresponding volumes """ # from esbmtk import hypsometry v: list = [] # list of volumes for v in bg.values(): a = v[0] u = v[1] ll = v[2] v.append(h.volume(u, ll) * a) return v
[docs] def get_longest_dict_entry(d: dict) -> int: """Get length of each item in the connection dict.""" l_length = 0 # length of longest list p_length = 0 # length of single parameter nl = 0 # number of lists ll = [] # we need to cover the case where we have two lists of different length # this happens if we have a long list of tuples with matched references, # as well as a list of species for v in d.values(): if isinstance(v, list): nl = nl + 1 if len(v) > l_length: l_length = len(v) ll.append(l_length) else: p_length = 1 if nl > 1 and ll.count(l_length) != len(ll): raise ValueError("Mapping for multiple lists is not supported") if l_length > 0 and p_length == 0: case = 0 # Only lists present if l_length == 0 and p_length == 1: case = 1 # Only parameters present if l_length > 0 and p_length == 1: case = 2 # Lists and parameters present return case, l_length
[docs] def convert_to_lists(d: dict, my_list: int) -> dict: """Expand mixed dict entries. (i.e. list and single value) such that they are all lists of equal length """ cd = d.copy() for k, v in cd.items(): if not isinstance(v, list): p = [v for _ in range(my_list)] d[k] = p return d
[docs] def get_sub_key(d: dict, i: int) -> dict: """Take a dict which has where the value is a list. Return the key with the n-th value of that list """ return {k: v[i] for k, v in d.items()}
[docs] def expand_dict(d: dict, mt: str = "1:1") -> int: """Determine dict structure. in case we have mutiple connections with mutiple species, the default action is to map connections to species (t = '1:1'). If you rather want to create mutiple connections (one for each species) in each connection set t = '1:N' """ # loop over dict entries # ck = connection key # cd = connection dict r: dict = {} # the dict we will return for ck, cd in d.items(): # loop over connections rd: dict = {} # temp dict nd: dict = {} # temp dict case, length = get_longest_dict_entry(cd) if isinstance(ck, tuple): # assume 1:1 mapping between tuple and connection parameters if mt == "1:1": # prep dictionaries if case == 0: nd = cd elif case == 1: # only parameters present. Expand for each tuple entry length = len(ck) nd = convert_to_lists(cd, length) elif case == 2: # mixed list present, Expand list nd = convert_to_lists(cd, length) # for each connection group in the tuple if length != len(ck): message = ( f"The number of connection properties ({length})\n" f"does not match the number of connection groups ({len(ck)})\n" f"did you intend to do a 1:N mapping?" ) raise ValueError(message) # map property dicts to connection group names i = 0 for t in ck: rd[t] = get_sub_key(nd, i) i = i + 1 elif mt == "1:N": # apply each species to each connection if case == 0: nd = cd elif case == 1: # only parameters present. Expand for each tuple entry length = len(ck) nd = convert_to_lists(cd, length) elif case == 2: # mixed list present, Expand list nd = convert_to_lists(cd, length) for t in ck: # apply the entire nd dict to all connections rd[t] = nd else: raise ValueError(f"{mt} is not defined. must be '1:1' or '1:N'") else: if case in [0, 1]: # only lists present, case 3 nd = cd elif case == 2: # list and parameters present case 4 nd = convert_to_lists(cd, length) rd[ck] = nd # update the overall dict and move to the next entry # r |= rd r.update(rd) return r
[docs] def create_bulk_connections(ct: dict, M: Model, mt: int = "1:1") -> dict: """Create connections from a dictionary. The dict can have the following format: mt = mapping type. See below for explanation # na: names, tuple or str. If lists, all list elements share the same properties # sp: species list or species # ty: type, str # ra: rate, Quantity # sc: scale, Number # re: reference, optional # al: epsilon, optional # de: delta, optional # bp: bypass, see scale_with_flux # si: signal # mx: True, optional defaults to False. If set, it will create forward and backward fluxes (i.e. mixing) There are 6 different cases how to specify connections Case 1 One connection, one set of parameters ct1 = {"sb2hb": {"ty": "scale", 'ra'....}} Case 2 One connection, one set of instructions, one subset with multiple parameters This will be expanded to create connections for each species ct2 = {"sb2hb": {"ty": "scale", "sp": ["a", "b"]}} Case 3 One connection complete set of multiple characters. Similar to case 2, but now all parameters are given explicitly ct3 = {"sb2hb": {"ty": ["scale", "scale"], "sp": ["a", "b"]}} Case 4 Multiple connections, one set of parameters. This will create identical connection for "sb2hb" and "ib2db" ct4 = {("sb2hb", "ib2db"): {"ty": "scale", 'ra': ...}} Case 5 Multiple connections, one subset of multiple set of parameters. This wil create a connection for species 'a' in sb2hb and with species 'b' in ib2db ct5 = {("sb2hb", "ib2db"): {"ty": "scale", "sp": ["a", "b"]}} Case 6 Multiple connections, complete set of parameters of multiple parameters Same as case 5, but now all parameters are specified explicitly ct6 = {("sb2hb", "ib2db"): {"ty": ["scale", "scale"], "sp": ["a", "b"]}} The default interpretation for cases 5 and 6 is that each list entry corresponds to connection. However, sometimes we want to create multiple connections for multiple entries. In this case provide the mt='1:N' parameter which will create a connection for each species in each connection group. See the below example. It is easy to shoot yourself in the foot. It is best to try the above first with some simple examples, e.g., from esbmtk import expand_dict ct2 = {"sb2hb": {"ty": "scale", "sp": ["a", "b"]}} It is best to use the show_dict function to verify that your input dictionary produces the correct results! """ from esbmtk.utility_functions import create_connection, expand_dict # expand dictionary into a well formed dict where each connection # has a fully formed entry c_ct = expand_dict(ct, mt=mt) # loop over dict entries and create the respective connections for k, v in c_ct.items(): if isinstance(k, tuple): # loop over names in tuple for c in k: create_connection(c, v, M) elif isinstance(k, str): create_connection(k, v, M) else: raise ValueError(f"{c} must be string or tuple") return c_ct
[docs] def create_connection(n: str, p: dict, M: Model) -> None: """Create a connection group. It is assumed that all rates are in liter/year or mol per year. This may not be what you want or need. :param n: a connection key. if the mix flag is given interpreted as mixing a connection between sb and db and thus create connections in both directions :param p: a dictionary holding the connection properties :param M: the model handle """ from esbmtk import Q_, ConnectionProperties # get the reservoir handles by splitting the key source, sink, cid = split_key(n, M) # create default connections parameters and replace with values in # the parameter dict if present. los = list(p["sp"]) if isinstance(p["sp"], list) else [p["sp"]] ctype = p.get("ty", "None") scale = p.get("sc", 1) rate = Q_("0 mol/a") if "ra" not in p else p["ra"] ref_flux = p.get("re", "None") epsilon = p.get("ep", "None") # alpha = p.get("al", "None") delta = p.get("de", "None") cid = f"{cid}" bypass = p.get("bp", "None") # species = p.get("sp", "None") signal = p.get("si", "None") if isinstance(scale, Q_): if scale.check(["dimensionless"]): scale = scale.magnitude else: scale = scale.to(M.f_unit).magnitude # expand arguments ctype = make_dict(los, ctype) scale = make_dict(los, scale) # get rate from dictionary rate = make_dict(los, rate) ref_flux = make_dict(los, ref_flux) epsilon = make_dict(los, epsilon) delta = make_dict(los, delta) bypass = make_dict(los, bypass) signal = make_dict(los, signal) name = f"{M.name}.ConnGrp_{source.name}_to_{sink.name}" if f"{name}" in M.lmo: # Test if CG exists cg = getattr(M, name.split(".")[1]) # retriece CG object cg.add_connections( source=source, sink=sink, ctype=ctype, scale=scale, # get rate from dictionary rate=rate, ref_flux=ref_flux, epsilon=epsilon, delta=delta, bypass=bypass, register=M, signal=signal, id=cid, # get id from dictionary ) else: # Create New ConnectionProperties cg = ConnectionProperties( source=source, sink=sink, ctype=ctype, scale=scale, # get rate from dictionary rate=rate, ref_flux=ref_flux, epsilon=epsilon, delta=delta, bypass=bypass, register=M, signal=signal, id=cid, # get id from dictionary )
[docs] def get_name_only(o: any) -> any: """Test if item is an esbmtk type. If yes, extract the name """ from esbmtk import Flux, Reservoir, Species, SpeciesProperties return ( o.full_name if isinstance(o, Flux | Species | Reservoir | SpeciesProperties) else o )
[docs] def get_simple_list(my_list: list) -> list: """Return a list. Which only has the full name rather than all the object properties """ return [get_name_only(e) for e in my_list]
[docs] def show_dict(d: dict, mt: str = "1:1") -> None: """Show dict entries in an organized manner.""" from esbmtk import expand_dict, get_name_only, get_simple_list ct = expand_dict(d, mt) for _ck, cv in ct.items(): for _pk, pv in cv.items(): get_simple_list(pv) if isinstance(pv, list) else get_name_only(pv)
[docs] def find_matching_fluxes(my_list: list, filter_by: str, exclude: str) -> list: """Loop over all reservoirs in my_list. And extract the names of all fluxes which match the filter string. Return the list of names (not objects!) """ lof: set = set() for r in my_list: for f in r.lof: if filter_by in f.full_name and exclude not in f.full_name: lof.add(f) return list(lof)
[docs] def reverse_key(key: str) -> str: """Reverse a connection key e.g., sb2db@POM becomes db2sb@POM.""" left = key.split("@") left = left[0] rs = left.split("_to_") r1 = rs[0] r2 = rs[1] return f"{r2}_to_{r1}"
[docs] def reverse_tick_labels_factory(max_tick_value): """Reverse the label of the x-axis ticks but not the data. Usage: From matplotlib.ticker import FuncFormatter ax.xaxis.set_major_formatter(FuncFormatter(reverse_tick_labels_factory(x_max))) where xmax = max value on the x-axis """ def formatter(x, pos): return f"{max_tick_value - x:.1f}" return formatter
[docs] def get_connection_keys( f_list: set, ref_id: str, target_id: str, inverse: bool, exclude: str, ) -> list[str]: """Extract connection keys from set of flux names. Replace ref_id with target_id so that the key can be used in create_bulk_connnections() :param f_list: a set with flux objects :param ref_id: string with the reference id :param target_id: string with the target_id :param inverse: Bool, optional, defaults to false :return cnc_l: a list of connection keys (str) The optional inverse parameter, can be used where in cases where the flux direction needs to be reversed, i.e., the returned key will not read sb2db@POM, but db2s@POM """ cnc_l: list = [] # list of connection keys for f in f_list: # get connection and flux name fns = f.full_name.split(".") cnc = fns[1][3:] # get key without leadinf C_ if inverse: cnc = reverse_key(cnc) cnc.replace(ref_id, target_id) # create new cnc string cnc = f"{cnc}_to_{target_id}" cnc_l.append(cnc) return cnc_l
[docs] def gen_dict_entries(M: Model, **kwargs) -> tuple(tuple, list): """Find all fluxes that contain the reference string. Create a new Species2Species instance that connects the flux matching ref_id, with a flux matching target_id. The function will a tuple containig the new connection keys that can be used by the create bulk_connection() function. The second return value is a list containing the reference fluxes. The optional inverse parameter, can be used where in cases where the flux direction needs to be reversed, i.e., the returned key will not read sb_to_dbPOM, but db_to_sb@POM :param M: Model or list :param kwargs: keyword dictionary, known keys are ref_id, and raget_id, inverse :return f_list: tp.List of fluxes that match ref_id :return k_tuples: tuple of connection keys """ from esbmtk import Model ref_id = kwargs["ref_id"] target_id = kwargs["target_id"] inverse = kwargs.get("inverse", False) exclude_str = kwargs.get("exclude", "None") # find matching fluxes if isinstance(M, Model): f_list: list = find_matching_fluxes( M.loc, filter_by=ref_id, exclude=exclude_str, ) elif isinstance(M, list): f_list: list = find_matching_fluxes( M, filter_by=ref_id, exclude=exclude_str, ) else: raise ValueError(f"gen_dict_entries: M must be list or Model, not {type(M)}") k_tuples: list = get_connection_keys( f_list, ref_id, target_id, inverse, exclude_str, ) return tuple(k_tuples), f_list
[docs] def build_ct_dict(d: dict, p: dict) -> dict: """Build a connection dictionary. From a dict containing connection keys, and a dict containing connection properties. This is most useful for connections which a characterized by a fixed rate but apply to many species. E.g., mixing fluxes in a complex model etc. """ # a = {k: {"sc": v} | p for k, v in d.items()} a = {} for k, v in d.items(): a[k] = p.copy() a[k]["sc"] = v return a
[docs] def get_string_between_brackets(s: str) -> str: """Parse string and extract substring between square brackets.""" s = s.split("[") if len(s) < 2: raise ValueError(f"Column header {s} must include units in square brackets") s = s[1] s = s.split("]") if len(s) < 2: raise ValueError(f"Column header {s} must include units in square brackets") return s[0]
[docs] def check_for_quantity(quantity, unit): r"""Check if keyword is quantity or string an convert as necessary. - If input is a string, convert string into a quantity - If input is a quantity, do nothing - if input is a number, convert to default quantity Parameters ---------- quantity : str | quantity | float | int e.g., "12 m/s", or 12, unit : str desired unit for keyword, e.g., "m/s" Returns ------- Q\_ Returns a Quantity Raises ------ ValueError if keywword is neither number, str or quantity """ from esbmtk import Q_ if isinstance(quantity, str): quantity = Q_(quantity) elif isinstance(quantity, float | int): quantity = Q_(f"{quantity} {unit}") elif not isinstance(quantity, Q_): raise ValueError("kw must be string, number or Quantity") return quantity
[docs] def map_units(obj: any, v: any, *args) -> float: """Parse v to see if it is a string. If yes, map to quantity. parse v to see if it is a quantity, if yes, map to model units and extract magnitude, assign mangitude to return value if not, assign value to return value :param obj: connection object :param v: input string/number/quantity :args: tp.List of model base units :returns: number :raises ScaleError: if input cannot be mapped to a model unit """ from . import Q_ m: float = 0 match: bool = False # test if string, map to quantity if yes if isinstance(v, str): v = Q_(v) # test if we find a matching dimension, map if true if isinstance(v, Q_): for q in args: if v.dimensionality == q.dimensionality: m = v.to(q).magnitude match = True if not match: raise ScaleError( f"Missing base units for the scale in {obj.full_name}, this should not happen" f"complain to the program author" ) else: # no quantity, so it should be a number m = v if not isinstance(m, int | float): raise ValueError(f"m is {type(m)}, must be float, v={v}. Something is fishy") return m
def __find_flux__(reservoirs: list, full_name: str): """Find a Flux object based on its full_name. PRECONDITIONS: full_name must contain the full_name of the Flux Parameters ---------- reservoirs: tp.List containing all reservoirs full_name: str specifying the full name of the flux (boxes.flux_name) """ needed_flux = None for res in reservoirs: for flux in res.lof: if flux.full_name == full_name: needed_flux = flux break if needed_flux is not None: break if needed_flux is None: raise NameError( f"add_carbonate_system: Flux {full_name} cannot be found in any of the reservoirs in the Model!" ) return needed_flux def __checktypes__(allwed_keys: dict[any, any], provided_keys: dict[any, any]) -> None: """Look up the allowed data type for this key in allowed_keys. av = dictinory with the allowed input keys and their type pv = dictionary with the user provided key-value data """ k: any v: any # loop over provided keywords for k, v in provided_keys.items(): # check av if provided value v is of correct type if allwed_keys[k] != any and not isinstance(v, allwed_keys[k]): raise TypeError( f"{type(v)} is the wrong type for '{k}', should be '{allwed_keys[k]}'" )
[docs] def dict_alternatives(d: dict, e: str, a: str) -> any: """Use dictionary =d=, an expression =e=, and an alternative expression =a=. Returns the value associated with either =a= or =e= in the dictionary =d=. :param d: A dictionary. :param e: The first expression to check. :param a: The alternative expression to check. :returns r: The value associated with either =a= or =e= in the dictionary =d=. :raises ValueError: If neither =a= nor =e= are found in the dictionary. """ if a in d: r = d[a] elif e in d: r = d[e] else: raise ValueError(f"You must specify either {e} or {a} \nd = {d}") return r
def __checkkeys__(lrk: list, lkk: list, kwargs: dict) -> None: """Check if the mandatory keys are present. lrk = list of required keywords lkk = list of all known keywords kwargs = dictionary with key-value pairs """ k: str # test if the required keywords are given for k in lrk: # loop over required keywords if isinstance(k, list): # If keyword is a list s: int = 0 # loop over allowed substitutions for e in k: # test how many matches are in this list if ( e in kwargs and not isinstance(e, np.ndarray | list) and kwargs[e] != "None" ): s = s + 1 if s > 1: # if more than one match raise ValueError(f"You need to specify exactly one from this list: {k}") elif k not in kwargs: raise ValueError(f"You need to specify a value for {k}") tl: list[str] = [k for k, v in lkk.items()] # test if we know all keys for k in kwargs: if k not in lkk: raise ValueError(f"{k} is not a valid keyword. \n Try any of \n {tl}\n") def __addmissingdefaults__(lod: dict, kwargs: dict) -> dict: """Test if the keys in lod exist in kwargs. otherwise add them with the default values from lod """ new: dict = {} if lod: for k, v in lod.items(): if k not in kwargs: new[k] = v # kwargs |= new kwargs.update(new) return kwargs
[docs] def data_summaries( M: Model, species_names: list, box_names: list, register_with="None", ) -> list: r"""Group results by species and Reservoirs. :param M: model instance :param species_names: tp.List of species instances :param box_names: tp.List of Reservoir instances :param register_with: defaults to M :returns pl: a list of datafield instances to be plotted Note: because pl is a list, you need to expand it before using it in the plot method. Either use M.plot(pl) or M.plot([\*pl]) If you call a plot with many subfigures, use pl like this: M.plot([o1, o2, \*pl]) """ from esbmtk import DataField, VectorData if register_with == "None": register_with = M pl = [] for sp in species_names: data_list = [] label_list = [] t = "" for b in box_names: if isinstance(b, VectorData): # FIXME: Should a be b???? data_list.append(a) label_list.append(a.full_name) t = b.name if hasattr(b, f"{sp.name}"): a = getattr(b, f"{sp.name}") # this is a reservoir y1, unit, d_scale = a.get_conversion_unit_information() data_list.append(y1) label_list.append(f"{b.name}") t = sp.display_as if hasattr(sp, "display_as") else sp.name df = DataField( name=f"{sp.name}_df", register=register_with, x1_data=M.time, y1_data=data_list, y1_label=label_list, y1_legend=f"{sp.name} [{unit}]", x1_as_time=True, title=t, ) pl.append(df) """ check if species has isotope data. This needs to be done after the above loop, so that the data is in its own datafield. """ data_list = [] label_list = [] for b in box_names: if hasattr(b, f"{sp.name}"): a = getattr(b, f"{sp.name}") if a.isotopes: unit = f"{a.species.element.d_label}" data_list.append(a.d) label_list.append(f"{b.name}") if len(data_list) > 0: df = DataField( name=f"d_{sp.name}_df", register=register_with, x1_data=M.time, y1_data=data_list, y1_label=label_list, y1_legend=f"{unit} [{sp.element.d_scale}]", x1_as_time=True, title=sp.element.d_label, ) pl.append(df) return pl
[docs] def fractionate_with_epsilon( epsilon: float, source_c: float, source_l: float, sink_c: float, ) -> float: """Calculate how a given epsilon would affect the downstream isotoe ratio.""" alpha = epsilon / 1000 + 1 # get alpha sink_l = (alpha * source_l * sink_c) / (alpha * source_l - source_l + source_c) return sink_l
# @njit(fastmath=True)
[docs] def get_l_mass(m: float, d: float, r: float) -> float: """Get mass of light isotope. :param m: mass or concentration :param d: delta value :param r: isotopic reference ratio return mass or concentration of the light isotopeb """ return (1000.0 * m) / ((d + 1000.0) * r + 1000.0)
[docs] def get_delta_h(R) -> float: """Calculate the delta of a flux or reservoir. :param R: Species or Flux handle returns d as vector of delta values R.c = total concentration R.l = concentration of the light isotope """ from esbmtk import Flux, GasReservoir, Species r = R.species.r # reference ratio if isinstance(R, Species | GasReservoir): d = np.where(R.l > 0, 1e3 * ((R.c - R.l) / R.l - r) / r, 0) elif isinstance(R, Flux): d = np.where(R.l > 0, 1e3 * ((R.m - R.l) / R.l - r) / r, 0) else: raise ValueError( f"{R.full_name} must be of type Flux or Species, not {type(R)}" ) return d
[docs] def get_delta_from_concentration(c, li, r): """Calculate the delta from the mass of light and heavy isotope. :param c: total mass/concentration :param l: light isotope mass/concentration :param r: reference ratio """ h = c - li d = 1000 * (h / li - r) / r return d
[docs] def get_imass(m: float, d: float, r: float) -> [float, float]: """Calculate the isotope masses from bulk mass and delta value. Arguments are m = mass, d= delta value, r = abundance ratio species """ li: float = (1000.0 * m) / ((d + 1000.0) * r + 1000.0) h: float = m - li return [li, h]
# @njit(fastmath=True)
[docs] def get_delta(li: NDArrayFloat, h: NDArrayFloat, r: float) -> NDArrayFloat: """Calculate the delta from the mass of light and heavy isotope. :param l: light isotope mass/concentration :param h: heavy isotope mass/concentration :param r: reference ratio :return : delta """ return 1000 * (h / li - r) / r
# @njit(fastmath=True)
[docs] def get_new_ratio_from_alpha( ref_mass: float, # reference mass ref_l: float, # reference light istope a: float, # fractionation factor ) -> [float, float]: """Calculate the effect of the istope fractionation factor alpha. For the ratio between the mass of the light isotope devided by the total mass Note that alpha needs to be given as fractional value, i.e., 1.07 rather than 70 (i.e., (alpha-1) * 1000 """ new_ratio = -ref_l / (a * ref_l - a * ref_mass - ref_l) if ref_mass > 0.0 else 0.0 return new_ratio
[docs] def register_user_function(M: Model, lib_name: str, func_name: str | list) -> None: """Register user supplied library and function with the model. Parameters ---------- M : Model Model handle lib_name : str name of python file that contains the function func_name : str | list Name of one or more user supplied function(s) """ import pathlib as pl import sys if isinstance(func_name, str): func_name = [func_name] cwd: pl.Path = pl.Path.cwd() # get the current working if cwd not in sys.path: sys.path.append(cwd) for f in func_name: M.luf[f] = [lib_name, cwd]
[docs] def create_reservoirs(): """Raise Error on use.""" raise NotImplementedError("create_reservoirs is no longer supported")
[docs] def check_for_NaN(a: NDArrayFloat) -> bool: """Test if a contains NaN values.""" return np.isnan(a).any()
[docs] def warn_if_non_numeric(df): """Warn about non numeric data in dataframe. Checks all columns in the DataFrame for non-numeric values. Prints warnings with the row indices and offending values for each column. Args: df (pd.DataFrame): The DataFrame to check. """ import pandas as pd ed = False for col in df.columns: numeric_col = pd.to_numeric(df[col], errors="coerce") non_numeric_mask = numeric_col.isna() & df[col].notna() if non_numeric_mask.any(): bad_indices = df.index[non_numeric_mask].tolist() print( f"Warning: Non-numeric data found in column '{col}' at rows: {bad_indices}" ) print(df.loc[bad_indices, col]) ed = True return ed