Source code for esbmtk.connections

"""esbmtk: A general purpose Earth Science box model toolkit.

Copyright (C), 2020 Ulrich G. Wortmann

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""

from __future__ import annotations

import logging
import uuid
import warnings
from collections.abc import Callable
from typing import Any

import numpy as np
import numpy.typing as npt

from .esbmtk_base import esbmtkBase
from .utility_functions import map_units

# limit the printed representation of numpy floats to four digits
np.set_printoptions(precision=4)
# declare numpy types
# alias used to annotate arrays of 64-bit floats
NDArrayFloat = npt.NDArray[np.float64]


class Species2SpeciesError(Exception):
    """Error raised for invalid Species2Species connections.

    The message is padded with blank lines so that it stands out from the
    traceback when printed.
    """

    def __init__(self, message: str) -> None:
        """Store *message* wrapped in leading and trailing newlines.

        :param message: human readable description of the problem
        """
        message = f"\n\n{message}\n"
        super().__init__(message)
class ScaleFluxError(Exception):
    """Error raised when a flux cannot be scaled relative to another flux.

    The message is padded with blank lines so that it stands out from the
    traceback when printed.
    """

    def __init__(self, message: str) -> None:
        """Store *message* wrapped in leading and trailing newlines.

        :param message: human readable description of the problem
        """
        message = f"\n\n{message}\n"
        super().__init__(message)
class KeywordError(Exception):
    """Error raised for invalid keyword arguments.

    The message is padded with blank lines so that it stands out from the
    traceback when printed.
    """

    def __init__(self, message: str) -> None:
        """Store *message* wrapped in leading and trailing newlines.

        :param message: human readable description of the problem
        """
        message = f"\n\n{message}\n"
        super().__init__(message)
class Species2Species(esbmtkBase):
    """Connect two reservoir species to each other.

    This module creates the connecting flux and creates a connector object
    which stores all connection properties.

    For simple connections, the flux type is derived implicitly from the
    specified parameters. For complex connections, the flux type must be
    set explicitly. See the examples below:

    Parameters
    ----------
     - source: An object handle for a Source or Species
     - sink: An object handle for a Sink or Species
     - rate: A quantity (e.g., "1 mol/s"), optional
     - delta: The isotope ratio, optional
     - ref_reservoirs: Species or flux reference
     - epsilon: A fractionation factor, optional
     - id: A string which will become part of the object name, it will
       override automatic name creation
     - signal: An object handle of signal, optional
     - ctype: connection type, see below
     - bypass :str optional defaults to "None" see scale with flux

    The connection name is derived automatically, and can be queried with
    M.connection_summary()

    Connection Types:
    -----------------

    Connecting two reservoirs with a fixed rate (fixed/regular connections
    require the ``rate`` keyword, not ``scale``):

    >>> Species2Species(  # deep box to sediment
    >>>     ctype="fixed",
    >>>     source=M.L_b.PO4,
    >>>     sink=M.Fb.PO4,
    >>>     rate="108 Gmol/year",
    >>>     id="shelf_burial",
    >>> )

    Connecting two reservoirs with a concentration dependent flux:

    >>> Species2Species(  # Surface to deep box
    >>>     source=M.L_b.PO4,
    >>>     sink=M.D_b.PO4,
    >>>     ctype="scale_with_concentration",
    >>>     scale=M.L_b.PO4.volume
    >>>     / M.tau
    >>>     * M.L_b.swc.density
    >>>     / 1000,  # concentration * volume = mass * 1/tau
    >>>     id="po4_productivity",
    >>> )

    Connecting two reservoirs using another flux as reference. Note
    presently, you cannot chain flux references! You can use a reference
    flux multiple times, but it is not possible to reference f1 for f2,
    and then reference f2 for f3.

    The reference flux can either be given as a flux object, e.g., as
    returned from a flux lookup:
    M.flux_summary(filter_by="po4_productivity", return_list=True)[0]
    or simply as the id string: "po4_productivity"

    >>> Species2Species(  # deep box to sediment
    >>>     source=M.D_b.PO4,
    >>>     sink=M.Fb.PO4,
    >>>     ctype="scale_with_flux",
    >>>     ref_flux="po4_productivity",
    >>>     # increase p_burial
    >>>     scale=(1 - M.remin_eff),  # burial of ~1% P
    >>>     id="burial",
    >>> )

    Useful methods in this class
    ----------------------------

     - info() will provide a short description of the connection objects.
     - list_processes() which will list all the processes which are
       associated with this connection.
     - update() is reserved for updating connection properties after the
       connection has been created; it currently raises
       NotImplementedError.
    """

    def __init__(self, **kwargs):
        """Perform sanity checks.

        - whether the reservoirs exist
        - correct flux properties (this will be handled by the process object)
        - whether the processes do exist (hmmh, that implies that the
          optional processes do get registered with the model)
        - creates the correct default processes
        - and connects the reservoirs

        see the class documentation for details and examples
        """
        from esbmtk import (
            Q_,
            ConnectionProperties,
            Flux,
            GasReservoir,
            Model,
            Signal,
            Sink,
            Source,
            Species,
            Species2Species,
            SpeciesProperties,
        )

        # provide a dict of all known keywords and their type
        # each entry is [default value, accepted type(s), flag]
        self.defaults: dict[str, list[Any]] = {
            "init_delta": [0, (int), False],
            "init_rate": [0, (int), False],
            "init_scale": [0, (int), False],
            "init_epsilon": [0, (int), False],
            "init_done": [False, (bool)],
            "id": ["", (str), False],
            "source": ["None", (str, Source, Species, GasReservoir), False],
            "sink": ["None", (str, Sink, Species, GasReservoir), False],
            "delta": ["None", (int, float, str, dict), True],
            "rate": ["None", (str, int, float, Q_, dict), True],
            "pl": ["None", (list, str), False],
            "epsilon": ["None", (int, float, str, dict), True],
            "scale": [1, (int, float, Q_, str), True],
            "species": ["None", (SpeciesProperties, str), False],
            "alpha": ["None", (str, float), False],
            "ctype": ["regular", (str), False],
            "ref_reservoirs": ["None", (Species, GasReservoir, str, list), False],
            "reservoir_ref": ["None", (GasReservoir, str), False],
            "ref_flux": ["None", (Flux, str, list), False],
            "ratio": ["None", (int, float, str), False],
            "ref_value": ["None", (str, int, float, Q_), False],
            "k_value": ["None", (int, float, str, Q_), False],
            "a_value": ["None", (int, float), False],
            "b_value": ["None", (int, float), False],
            "left": ["None", (list, int, float, Species, GasReservoir), False],
            "right": ["None", (list, int, float, Species, GasReservoir), False],
            "plot": ["yes", (str), False],
            "groupname": [False, (bool), False],
            "register": [
                "None",
                (str, Model, Species2Species, ConnectionProperties),
                False,
            ],
            "signal": ["None", (Signal, str), False],
            "bypass": ["None", (str, Species, GasReservoir), False],
            "isotopes": [False, (bool), False],
            "solubility": ["None", (str, int, float), False],
            "a_db": [1, (Callable, int, float), False],
            "a_dg": [1, (Callable, int, float), False],
            "a_u": [1, (Callable, int, float), False],
            "area": ["None", (str, int, float, Q_), False],
            "ex": [1, (int, float), False],
            "pco2_0": ["280 ppm", (str, Q_), False],
            "piston_velocity": ["None", (str, int, float), False],
            # Callable (the ABC) rather than the builtin ``callable``
            # function, which is not a type and cannot be used by isinstance
            "function_ref": ["None", (str, Callable), False],
            "ref_species": ["None", (str, Species), False],
            "water_vapor_pressure": ["None", (str, float), False],
        }

        # FIXME: We need a mechanism to first check for the
        # connection type (ctype) and then set the lrk list
        # depending on the connection type.!
        # provide a list of absolutely required keywords
        # the __scaleflux__ method (and similar) could return the
        # necessary information.
        self.lrk: list = ["source", "sink"]
        self.lop: list = []
        self.lof: list = []

        self.__initialize_keyword_variables__(kwargs)

        if self.register == "None":
            self.register = self.source.model

        if self.id == "None":
            self.id = str(uuid.uuid4())[:8]

        # legacy names
        self.influx: int = 1
        self.outflux: int = -1
        self.mo = self.source.sp.mo
        self.model = self.mo
        self.sp = self.source.species
        self.p = 0  # the default process handle
        self.r1 = self.source
        self.r2 = self.sink
        self.parent = self.register
        self.source_name = self.source.full_name
        self.sink_name = self.sink.full_name
        self.target = self.sink.full_name

        # Initialize setters
        self.scale = self._scale  # This will trigger the setter with the current value
        self.delta = self._delta
        self.epsilon = self._epsilon
        self.rate = self._rate

        # store the reference pCO2 as a plain fraction (ppm * 1e-6)
        if isinstance(self.pco2_0, str):
            self.pco2_0 = Q_(self.pco2_0).to("ppm").magnitude * 1e-6
        elif isinstance(self.pco2_0, Q_):
            # convert the quantity first, then take its magnitude
            self.pco2_0 = self.pco2_0.to("ppm").magnitude * 1e-6

        self.lop: list = self.pl if "pl" in kwargs else []
        if self.signal != "None":
            self.lop.append(self.signal)

        if self.rate != "None":
            if isinstance(self.rate, str):
                self._rate: float = Q_(self.rate).to(self.mo.f_unit).magnitude
            elif isinstance(self.rate, Q_):
                self._rate: float = self.rate.to(self.mo.f_unit).magnitude
            elif isinstance(self.rate, int | float):
                self._rate: float = self.rate

        # if no reference reservoir is specified, default to the upstream
        # reservoir
        if self.ref_reservoirs == "None":
            self.ref_reservoirs = kwargs["source"]

        # a Source takes its isotope setting from the sink it feeds
        if isinstance(self.source, Source):
            self.isotopes = self.sink.isotopes
        else:
            self.isotopes = self.source.isotopes

        self.get_species(self.r1, self.r2)
        # self.mo: Model = self.sp.mo  # the current model handle
        self.lof: list[Flux] = []  # list of fluxes in this connection
        # get a list of all reservoirs registered for this species
        self.lor: list[Species] = self.mo.lor

        self.__set_name__()  # get name of connection

        if self.model.debug:
            logging.info(f"{self.name} isotopes = {self.isotopes}")

        if all(
            [
                self.isotopes,
                isinstance(self.source, Source),
                self.delta == "None",
                self.epsilon == "None",
            ]
        ):
            self.delta = self.source.delta
            warnings.warn(
                f"\n\nPlease specify the delta value for the weathering flux in {self.name}\n"
                f"Using {self.delta} for now, but this may not be what you want.\n\n",
                stacklevel=2,
            )
            # NOTE(review): presumably the model's warning counter -- confirm
            self.model.now = self.model.now + 1

        self.__register_with_parent__()  # register connection in namespace
        self.__create_flux__()  # Source/Sink/Fixed
        self.__set_process_type__()  # derive flux type and create flux(es)

        if self.mo.register == "local" and self.register == "None":
            self.register = self.mo

        self.source.loc.add(self)  # register connector with reservoir
        self.sink.loc.add(self)  # register connector with reservoir
        self.mo.loc.add(self)  # register connector with model

        self.r_index = self.__add_to_ode_constants__(self.rate, "rate")
        self.d_index = self.__add_to_ode_constants__(self.delta, "delta")
        self.a_index = self.__add_to_ode_constants__(self.epsilon, "epsilon")
        self.s_index = self.__add_to_ode_constants__(self.scale, "scale")
        self.init_done = True

    def __set_name__(self):
        """Create connection name.

        The name is derived according to the following scheme:

        if the parent is not a ConnectionProperties instance (manual
        connection):
            name = Conn_<source>_to_<sink>_<source species name>
        otherwise
            if sink and source species are equal
                 name = species name
            otherwise
                 name = source.species_to_sink.species

        For ctype "weathering" the id is always appended to the name.
        Otherwise, if id is not "None" and contains the source or sink
        species name, id will be taken as connection name, else id is
        appended to the name.
        """
        from esbmtk import Reservoir, Source, SourceProperties

        source_species = self.source.species.name
        sink_species = self.sink.species.name

        if not isinstance(self.parent, ConnectionProperties):
            # manual connection: name it after the enclosing objects
            if isinstance(self.source.parent, Reservoir | Source | SourceProperties):
                so = self.source.parent.name
            else:
                so = self.source.name
            si = (
                self.sink.parent.name
                if isinstance(self.sink.parent, Reservoir)
                else self.sink.name
            )
            self.name = f"Conn_{so}_to_{si}_{self.source.sp.name}"
        elif sink_species == source_species:
            self.name = f"{source_species}"
        else:
            self.name = f"{source_species}_to_{sink_species}"

        # always override name with id for manual connections
        if self.ctype == "weathering":
            self.name = f"{self.name}_{self.id}"
        elif self.id != "None":
            if (source_species in self.id) or (sink_species in self.id):
                self.name = f"{self.id}"
            else:
                self.name = f"{self.name}_{self.id}"

    def update(self, **kwargs):
        """Update connection properties.

        Intended to delete existing processes and fluxes, replace existing
        key-value pairs in the self.kwargs dict, and then re-initialize
        the connection.

        :raises NotImplementedError: always; updating is not implemented.
        """
        raise NotImplementedError

    def get_species(self, r1, r2) -> None:
        """Set the reference object ``self.r`` and the species ``self.sp``.

        ``self.r`` becomes r1 if ``self.r1`` is a Source, and r2 otherwise.
        ``self.sp`` is taken from the "species" keyword if it was given,
        and from ``self.r.sp`` otherwise.

        :param r1: upstream object
        :param r2: downstream object
        """
        from esbmtk import Source

        self.r = r1 if isinstance(self.r1, Source) else r2
        self.sp = self.kwargs.get("species", self.r.sp)

    def __create_flux__(self) -> None:
        """Create flux object.

        Register with reservoir and global namespace. When isotopes are
        tracked, a second flux with the id suffix "_l" is created as well.

        :raises Species2SpeciesError: if a Sink is used as source or a
            Source is used as sink
        """
        from esbmtk import Flux, GasReservoir, Sink, Source, Species

        # test if default arguments present
        d = 0 if self.delta == "None" else self.delta
        r = f"0 {self.sp.mo.f_unit}" if self.rate == "None" else self.rate
        num = [""]
        if isinstance(self.source, Source):
            isotopes = self.sink.isotopes
        else:
            isotopes = self.source.isotopes

        if self.model.debug:
            logging.info(f"cf: {self.full_name}, isotopes = {self.isotopes}")

        if isotopes:
            num.append("_l")

        for e in num:
            self.fh = Flux(
                species=self.sp,  # SpeciesProperties handle
                delta=d,  # delta value of flux
                rate=r,  # flux value
                plot=self.plot,  # display this flux?
                register=self,  # is this part of a group?
                isotopes=isotopes,
                id=f"{self.id}{e}",
            )
            if self.model.debug:
                logging.info(
                    f"cf: created {self.fh.full_name}, isotopes = {self.isotopes}"
                )

            # register flux with its reservoirs
            if isinstance(self.r1, Source):
                # add the flux name direction/pair
                if isinstance(self.r2, Species | GasReservoir):
                    self.r2.lio[self.fh] = self.influx
                # add the handle to the list of fluxes
                self.r2.lof.append(self.fh)
                # register flux and element in the reservoir.
                self.__register_species__(self.r2, self.r1.sp)
                if self.model.debug:
                    logging.info(
                        f"cf: registered {self.fh.full_name} with {self.r2.full_name}"
                    )
            elif isinstance(self.r2, Sink):
                # add the flux name direction/pair
                self.r1.lio[self.fh] = self.outflux
                # add flux to the upstream reservoir
                self.r1.lof.append(self.fh)
                # register flux and element in the reservoir.
                self.__register_species__(self.r1, self.r2.sp)
                if self.model.debug:
                    logging.info(
                        f"cf: registered {self.fh.full_name} with {self.r1.full_name}"
                    )
            elif isinstance(self.r1, Sink):
                raise Species2SpeciesError(
                    "The Sink must be specified as a destination (i.e., as second argument"
                )
            elif isinstance(self.r2, Source):
                raise Species2SpeciesError(
                    "The Source must be specified as first argument"
                )
            else:
                # add the flux name direction/pair
                self.r1.lio[self.fh] = self.outflux
                self.r2.lio[self.fh] = self.influx
                self.r1.lof.append(self.fh)  # add flux to the upstream reservoir
                self.r2.lof.append(self.fh)  # add flux to the downstream reservoir
                self.__register_species__(self.r1, self.r1.sp)
                self.__register_species__(self.r2, self.r2.sp)
                if self.model.debug:
                    logging.info(
                        f"cf: registered {self.fh.full_name} with {self.r1.full_name}\n"
                        f"cf: registered {self.fh.full_name} with {self.r2.full_name}\n"
                    )
            self.lof.append(self.fh)

    def __register_species__(self, r, sp) -> None:
        """Add the current flux to the element dictionary of a reservoir.

        :param r: object whose ``doe`` dictionary is updated
        :param sp: species whose element handle (``sp.eh``) is the key
        """
        # create the list on first use, then add the flux handle
        r.doe.setdefault(sp.eh, []).append(self.fh)

    def __set_process_type__(self) -> None:
        """Deduce flux type based on the provided flux properties.

        The method calls the appropriate init routine for the connection
        type and afterwards applies the ``bypass`` setting to all fluxes.

        :raises ConnectionError: if a fixed/regular connection has no rate
        :raises Species2SpeciesError: if the connection type is unknown
        """
        from esbmtk import (
            Sink,
            Source,
        )

        self.r = self.r2 if isinstance(self.r1, Source) else self.r1

        # if signal is provided but rate is omitted
        if self.signal != "None" and self.rate == "None":
            self._rate = 0

        # if connection type is not set explicitly
        if (
            self.ctype == "None"
            or self.ctype.casefold() == "regular"
            or self.ctype.casefold() == "fixed"
        ):
            self.ctype = "regular"
            if self.delta == "None" and self.epsilon == "None" and self.isotopes:
                self._epsilon = 0
            if self.rate == "None":
                raise ConnectionError(
                    "fixed/regular connections require the 'rate' keyword instead of 'scale'"
                )
        elif self.ctype == "ignore":
            pass
        elif self.ctype == "scale_with_flux":
            self.__scaleflux__()
        elif self.ctype == "weathering":
            self.__weathering__()
        elif self.ctype == "gasexchange":
            self.__gasexchange__()
        elif self.ctype == "scale_with_concentration":
            self.__rateconstant__()
        elif self.ctype != "manual":
            logging.info(f"Species2Species Type {self.ctype} is unknown")
            raise Species2SpeciesError(f"Unknown connection type {self.ctype}")

        # check if flux should bypass any reservoirs
        for f in self.lof:
            if self.bypass == "source" and not isinstance(self.source, Source):
                self.source.lif.append(f)
                logging.info(f"bypassing {f.full_name} in {self.source.full_name}")
            elif self.bypass == "sink" and not isinstance(self.sink, Sink):
                self.sink.lif.append(f)
                logging.info(f"bypassing {f.full_name} in {self.sink.full_name}")

    def __scaleflux__(self) -> None:
        """Scale a flux relative to another flux.

        Resolves ``ref_flux`` to a Flux object when it was given as an id
        string and marks that flux as serving as input.

        :raises ScaleFluxError: if the reference flux cannot be resolved
            or the isotope keyword is not set
        """
        from esbmtk import Flux

        if self.model.debug:
            logging.info(f"sf: {self.full_name}, isotopes = {self.isotopes}")

        if isinstance(self.ref_flux, str):
            matches = self.mo.flux_summary(filter_by=self.ref_flux, return_list=True)
            # an empty lookup leaves the id string in place so that the
            # check below reports it instead of failing with an IndexError
            if matches:
                self.ref_flux = matches[0]

        if not isinstance(self.ref_flux, Flux):
            raise ScaleFluxError(
                f"\n {self.ref_flux} must be flux or a an id-string. Check spelling\n"
            )

        self.ref_flux.serves_as_input = True
        if self.isotopes == "None":
            raise ScaleFluxError(f"{self.name}: You need to set the isotope keyword")

        if self.k_value != "None":
            self.scale = self.k_value
            logging.warning(
                "\n Warning: use scale instead of k_value for scaleflux type\n"
            )

    def __weathering__(self):
        """Initialize weathering function."""
        from esbmtk import init_weathering, register_return_values

        self.isotopes = self.sink.isotopes
        ec = init_weathering(
            self,  # connection object
            self.reservoir_ref,  # current pCO2
            self.pco2_0,  # reference pCO2
            self.scale,  # area fraction
            self.ex,  # exponent
            self.rate,  # initial flux
        )
        register_return_values(ec, self.sink)

    def __gasexchange__(self):
        """Initialize gas exchange."""
        from esbmtk.processes import init_gas_exchange
        from esbmtk.utility_functions import register_return_values

        ec = init_gas_exchange(self)
        register_return_values(ec, self.sink)

    def __rateconstant__(self) -> None:
        """Add rate constant type process.

        For "scale_with_concentration" connections the scale is passed
        through ``map_units`` together with the model units.
        """
        if self.ctype == "scale_with_concentration":
            self.scale = map_units(
                self,
                self.scale,
                self.mo.c_unit,
                self.mo.f_unit,
                self.mo.r_unit,
                self.mo.v_unit,
            )

    def info(self, **kwargs) -> None:
        """Show an overview of the object properties.

        Optional arguments are

        index :int = 0 this will show data at the given index
        indent :int = 0 indentation
        """
        index = kwargs.get("index", 0)
        indent = kwargs.get("indent", 0)
        ind = " " * indent

        # print basic data about this Species2Species
        print(f"{ind}{self.__str__(kwargs)}")
        print(f"{ind}Fluxes:")
        for f in sorted(self.lof):
            f.info(indent=indent, index=index)

    # ---- Property definitions to allow for connection updates --------
    # Changing the below properties requires that we delete all
    # associated objects (processes), and determines the new flux type,
    # and initialize/register these with the connection and model.
    # We also have to update the keyword arguments as these are used
    # for the log entry

    # ---- epsilon ----
    @property
    def epsilon(self) -> float | int:
        """Epsilon property."""
        return self._epsilon

    @epsilon.setter
    def epsilon(self, a: float | int) -> None:
        """Epsilon Setter.

        After initialization, a value other than "None" re-derives the
        process type and updates the ODE constants.
        """
        self._epsilon = a
        if self.init_done and a != "None":
            self.__set_process_type__()  # derive flux type and create flux(es)
            self.__update_ode_constants__(self._epsilon, "epsilon")

    # ---- rate ----
    @property
    def rate(self) -> float | int:
        """Rate property."""
        return self._rate

    @rate.setter
    def rate(self, r: str) -> None:
        """Rate Setter.

        Strings and quantities are converted to the magnitude in model
        flux units; "None" and all other values are stored unchanged.
        """
        from . import Q_

        # test for str first so that a quantity is never compared with "None"
        if isinstance(r, str):
            if r == "None":
                self._rate = "None"
            else:
                self._rate = Q_(r).to(self.model.f_unit).magnitude
        elif isinstance(r, Q_):
            self._rate = Q_(r).to(self.model.f_unit).magnitude
        else:
            self._rate = r

        if self.init_done and self._rate != "None":
            self.__update_ode_constants__(self._rate, "rate")

    # ---- delta ----
    @property
    def delta(self) -> float | int:
        """Delta property."""
        return self._delta

    @delta.setter
    def delta(self, d: float | int) -> None:
        """Delta Setter.

        After initialization, a value other than "None" also updates the
        ODE constants.
        """
        self._delta = d
        if self.init_done and d != "None":
            self.__update_ode_constants__(self._delta, "delta")

    # ---- scale ----
    @property
    def scale(self) -> float | int | Q_:
        """Scale property."""
        return self._scale

    @scale.setter
    def scale(self, s: float | int | str | Q_) -> None:
        """Scale Setter.

        "None" is treated as 1.0, other strings are parsed as quantities.
        Quantities with the dimensions of a volume flux, a mass or
        substance flux, or a concentration are converted to the magnitude
        in the respective model unit. All other values are stored as is.
        """
        from . import Q_

        # test for str first so that a quantity is never compared with "None"
        if isinstance(s, str):
            s = 1.0 if s == "None" else Q_(s)

        value = s  # what is handed to the ODE constants on update
        if isinstance(s, Q_):
            # test what type of Quantity we have
            if s.check("[volume]/[time]"):  # flux
                value = self._scale = s.to(self.mo.r_unit).magnitude
            # test if flux
            elif s.check("[mass]/[time]") or s.check("[substance]/[time]"):
                value = self._scale = s.to(self.mo.f_unit).magnitude
            elif s.check("[mass]/[volume]"):  # concentration
                value = self._scale = s.to(self.mo.c_unit).magnitude
            else:
                # NOTE(review): this error is created but never raised, so
                # quantities with other dimensions leave _scale untouched
                # (__rateconstant__ later passes self.scale to map_units).
                # Confirm whether raising is intended before changing it.
                Species2SpeciesError(f"No conversion to model units for {s} specified")
        else:
            self._scale = s

        if self.init_done:  # this is an update
            self.__update_ode_constants__(value, "scale")
class ConnectionProperties(esbmtkBase):
    """ConnectionProperties Class.

    Connect reservoir/sink/source groups when at least one of the
    arguments is a reservoirs_group object. This method will create
    regular connections for each matching species.

    Use the connection.update() method to fine tune connections
    after creation

    Example::

        ConnectionProperties(source =  upstream reservoir / upstream reservoir group
           sink = downstream reservoir / downstream reservoirs_group
           delta = defaults to zero and has to be set manually
           epsilon =  defaults to zero and has to be set manually
           rate = shared between all connections
           ref_reservoirs = shared between all connections
           ref_flux = shared between all connections
           species = list, optional, if present, only these species will be connected
           ctype = needs to be set for all connections. Use "Fixed"
                   unless you require a specific connection type
           pl = [list]) process list. optional, shared between all connections
           id = optional identifier, passed on to individual connection
           plot = "yes/no" # defaults to yes, shared between all connections
        )

        ConnectionProperties(
                  source=OM_Weathering,
                  sink=Ocean,
                  rate={DIC: f"{OM_w} Tmol/yr" ,
                        ALK: f"{0} Tmol/yr"},
                  ctype = {DIC: "Fixed",
                           ALK: "Fixed"},
                )
    """

    def __init__(self, **kwargs) -> None:
        """Initialize the group, register it and create its connections."""
        from esbmtk import (
            Q_,
            Flux,
            GasReservoir,
            Model,
            Reservoir,
            Signal,
            SinkProperties,
            SourceProperties,
            Species,
            SpeciesProperties,
        )

        # known keywords: [default value, accepted type(s)]
        self.defaults: dict[str, Any] = {
            "id": ["None", (str)],
            "source": [
                "None",
                (str, SourceProperties, Species, Reservoir, GasReservoir),
            ],
            "sink": ["None", (str, SinkProperties, Species, Reservoir, GasReservoir)],
            "delta": ["None", (str, dict, tuple, int, float)],
            "rate": ["None", (Q_, str, dict, tuple, int, float)],
            "pl": ["None", (str, dict, tuple)],
            "signal": ["None", (str, Signal, dict)],
            "epsilon": ["None", (str, dict, tuple, int, float)],
            "species": ["None", (str, dict, tuple, list, SpeciesProperties)],
            "ctype": ["None", (str, dict, tuple)],
            "ref_reservoirs": ["None", (str, dict, tuple, Species)],
            "ref_flux": ["None", (str, dict, tuple, Flux)],
            "plot": ["yes", (str, dict, tuple)],
            "scale": [1, (str, dict, tuple, int, float, Q_)],
            "bypass": ["None", (dict, tuple, str)],
            "register": ["None", (str, tuple, Model)],
            "save_flux_data": [False, (bool, tuple)],
            "ref_species": ["None", (str, Species)],
            "water_vapor_pressure": ["None", (str, float)],
            "piston_velocity": ["None", (str, int, float)],
            "solubility": ["None", (str, int, float)],
            "area": ["None", (str, int, float, Q_)],
            "ex": [1, (int, float)],
            "pco2_0": ["280 ppm", (str, Q_)],
        }

        # provide a list of absolutely required keywords
        self.lrk: list = ["source", "sink", "ctype"]
        self.__initialize_keyword_variables__(kwargs)

        if self.register == "None":
            self.register = self.source.model

        # # self.source.lor is a list with the object names in the group
        self.mo = self.sink.lor[0].mo
        self.model = self.mo
        self.source_name = self.source.full_name
        self.sink_name = self.sink.full_name
        self.loc: list = []  # list of connection objects

        self.name = f"ConnGrp_{self.source.name}_to_{self.sink.name}_{self.id}"
        # fixme this results in duplicate names in the model namespace.
        # probably related to the create connection function
        # if self.id != "None":
        #     self.name = f"{self.name}@{self.id}"

        self.base_name = self.name
        self.parent = self.register
        self.__register_with_parent__()
        self.__create_connections__()

    def add_connections(self, **kwargs) -> None:
        """Add connections to the connection group."""
        self.__initialize_keyword_variables__(kwargs)
        self.__create_connections__()

    def __create_connections__(self) -> None:
        """Create one Species2Species connection per selected species.

        The species are taken from the source group (or the ``species``
        keyword) when ``ctype`` is a string, and from the keys of
        ``ctype`` when it is a dict. Keyword values given as a dict are
        looked up per species; other values are shared by all connections.
        """
        from esbmtk import Reservoir, SinkProperties, SourceProperties

        self.connections: list = []

        if isinstance(self.ctype, str):
            if isinstance(self.source, Reservoir | SinkProperties | SourceProperties):
                if self.species == "None":
                    self.connections.extend(s.species for s in self.source.lor)
                else:
                    self.connections.extend(self.species)
        elif isinstance(self.ctype, dict):
            # find all sub reservoirs which have been specified by the ctype keyword
            self.connections.extend(self.ctype)

        # now we need to create defaults for all connections
        self.c_defaults: dict = {}  # connection dictionary with defaults
        # loop over species
        for sp in self.connections:  # ["SO4", "H2S"]
            settings = {
                # "cid": self.id,
                "cid": "None",
                "plot": "yes",
                "delta": "None",
                "epsilon": "None",
                "alpha": "None",
                "rate": "None",
                "scale": "None",
                "ctype": "None",
                "ref_reservoirs": "None",
                "ref_flux": "None",
                "bypass": "None",
                "signal": "None",
            }
            # a single handle avoids mixing sp.n and sp.name as dict keys
            self.c_defaults[sp.n] = settings

            # override the defaults with the values supplied as keywords
            for key in settings:
                if key not in self.kwargs:
                    continue
                supplied = self.kwargs[key]
                if isinstance(supplied, dict):
                    # per-species value; keep the default if sp is missing
                    if sp in supplied:
                        settings[key] = supplied[sp]
                elif supplied != "None":
                    # if value was supplied, update defaults dict
                    settings[key] = supplied

            a = Species2Species(
                source=getattr(self.source, sp.n),
                sink=getattr(self.sink, sp.n),
                rate=settings["rate"],
                delta=settings["delta"],
                epsilon=settings["epsilon"],
                plot=settings["plot"],
                ctype=settings["ctype"],
                scale=settings["scale"],
                bypass=settings["bypass"],
                signal=settings["signal"],
                ref_reservoirs=settings["ref_reservoirs"],
                ref_flux=settings["ref_flux"],
                groupname=True,
                id=self.id,
                register=self.model,
            )
            self.loc.append(a)  # add connection to list of connections

    def info(self) -> None:
        """List all connections in this group."""
        print(f"Group Connect from {self.source.name} to {self.sink.name}\n")
        print("The following Species2Species are part of this group\n")
        print("You can query the details of each connection like this:\n")
        for c in self.loc:
            print(f"{c.name}: {self.name}.{c.name}.info()")

        print("")