Source code for esbmtk.extended_classes

"""esbmtk: A general purpose Earth Science box model toolkit.

Copyright (C), 2020 Ulrich G.  Wortmann

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or (at
your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""

from __future__ import annotations

import copy as cp
import logging
import math
import os
import typing as tp
import warnings

import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import pandas as pd
from pandas import DataFrame

if tp.TYPE_CHECKING:
    from .base_classes import Flux, Model, Species2Species

from .base_classes import Species, SpeciesBase, SpeciesProperties
from .esbmtk_base import esbmtkBase
from .utility_functions import get_delta, get_imass, get_l_mass

# declare numpy types
NDArrayFloat = npt.NDArray[np.float64]


[docs] class ReservoirError(Exception): """Custom Error Class.""" def __init__(self, message): """Initialize Error Instance.""" message = f"\n\n{message}\n" super().__init__(message)
[docs] class SourceSinkPropertiesError(Exception): """Custom Error Class.""" def __init__(self, message): """Initialize Error Instance.""" message = f"\n\n{message}\n" super().__init__(message)
[docs] class FluxError(Exception): """Custom Error Class.""" def __init__(self, message): """Initialize Error Instance.""" message = f"\n\n{message}\n" super().__init__(message)
[docs] class SignalError(Exception): """Custom Error Class.""" def __init__(self, message): """Initialize Error Instance.""" message = f"\n\n{message}\n" super().__init__(message)
[docs] class DataFieldError(Exception): """Custom Error Class.""" def __init__(self, message): """Initialize Error Instance.""" message = f"\n\n{message}\n" super().__init__(message)
[docs] class ESBMTKFunctionError(Exception): """Custom Error Class.""" def __init__(self, message): """Initialize Error Instance.""" message = f"\n\n{message}\n" super().__init__(message)
[docs] class ExternalDataError(Exception): """Custom Error Class.""" def __init__(self, message): """Initialize Error Instance.""" message = f"\n\n{message}\n" super().__init__(message)
[docs] class GasReservoirError(Exception): """Custom Error Class.""" def __init__(self, message): """Initialize Error Instance.""" message = f"\n\n{message}\n" super().__init__(message)
[docs] class CSVReadError(Exception): """Custom Error Class.""" def __init__(self, message): """Initialize Error Instance.""" message = f"\n\n{message}\n" super().__init__(message)
[docs] class CSVDataError(Exception): """Custom Error Class.""" def __init__(self, message): """Initialize Error Instance.""" message = f"\n\n{message}\n" super().__init__(message)
[docs] class Reservoir(esbmtkBase): """Create a group of reservoirs. which share a common volume, and potentially connections. E.g., if we have twoy reservoir groups with the same reservoirs, and we connect them with a flux, this flux will apply to all reservoirs in this group. A typical examples might be ocean water which comprises several species. A reservoir group like ShallowOcean will then contain sub-reservoirs like DIC in the form of ShallowOcean.DIC Example:: Reservoir(name = "ShallowOcean", # Name of reservoir group volume/geometry = "1E5 l", # see below delta = {DIC:0, TA:0, PO4:0] # dict of delta values mass/concentration = {DIC:"1 unit", TA: "1 unit"} plot = {DIC:"yes", TA:"yes"} defaults to yes isotopes = {DIC: True/False} see Species class for details seawater_parameters = dict, optional, see below register= model handle, required ) Notes: The subreservoirs are derived from the keys in the concentration or mass dictionary. Toward this end, the keys must be valid species handles and -- not species names -- ! Connecting two reservoir groups requires that the names in both group match, or that you specify a dictionary which delineates the matching. Most parameters are passed on to the Species class. See the reservoir class documentation for details The geometry keyword specifies the upper depth interval, the lower depth interval, and the fraction of the total ocean area inhabited by the reservoir If the geometry parameter is supplied, the following instance variables will be computed: - self.volume: in model units (usually liter) - self.area: surface area in m^2 at the upper bounding surface - self.sed_area: area of seafloor which is intercepted by this box. - self.area_fraction: area of seafloor which is intercepted by this relative to the total ocean floor area seawater_parameters: ~~~~~~~~~~~~~~~~~~~~ If this optional parameter is specified, a SeaWaterConstants instance will be registered for this Species as Species.swc See the SeaWaterConstants class for details how to specify the parameters, e.g.:: seawater_parameters = {"temperature": 2, "pressure": 240, "salinity" : 35, }, """ def __init__(self, **kwargs) -> None: """Initialize a new reservoir group.""" from esbmtk import ( Q_, Model, SeawaterConstants, SpeciesProperties, ) from esbmtk.sealevel import get_box_geometry_parameters from esbmtk.utility_functions import dict_alternatives # provide a dict of all known keywords and their type self.defaults: dict[str, list[any, tuple]] = { "name": ["None", (str)], "delta": ["None", (dict, str)], "concentration": ["None", (dict, str)], "mass": ["None", (str, dict)], "volume": ["None", (str, Q_)], "geometry": ["None", (str, list, dict)], "plot": ["None", (str, dict)], "isotopes": [False, (dict, bool)], "seawater_parameters": ["None", (dict, str)], "carbonate_system": [False, (bool)], "register": ["None", (str, Model)], } # provide a list of absolutely required keywords self.lrk: list = [ "name", ["volume", "geometry"], ] if "concentration" in kwargs: self.species: list = list(kwargs["concentration"].keys()) elif "mass" in kwargs: self.species: list = list(kwargs["mass"].keys()) else: raise ReservoirError("You must provide either mass or concentration") self.__initialize_keyword_variables__(kwargs) if self.register == "None": self.register = self.species[0].mo # legacy variable self.set_area_warning = False self.n = self.name self.mo = self.species[0].mo self.model = self.species[0].mo self.parent = self.register self.has_cs1 = False self.has_cs2 = False # geoemtry information if self.volume == "None": get_box_geometry_parameters(self) elif isinstance(self.volume, str): self.volume = Q_(self.volume).to(self.mo.v_unit) elif not isinstance(self.volume, Q_): raise ReservoirError("Volume must be string or quantity") self.__register_with_parent__() # if len(self.species) == 1: la = ["mass", "concentration", "delta", "plot", "isotopes"] for a in la: if not isinstance(getattr(self, a), dict): setattr(self, a, {self.species[0]: getattr(self, a)}) self.lor: list = [] # list of reservoirs in this group. # loop over all entries in species and create the respective reservoirs for s in self.species: if not isinstance(s, SpeciesProperties): raise ReservoirError(f"{s.n} needs to be a valid species name") rtype = "flux_only" if s.flux_only else "regular" a = Species( name=f"{s.name}", register=self, species=s, delta=self.delta.get(s, "None"), concentration=self.concentration.get(s, "0 mol/kg"), isotopes=self.isotopes.get(s, False), plot=self.plot.get(s, "None"), volume=self.volume, groupname=self.name, rtype=rtype, ) # register as part of this group self.lor.append(a) # register a seawater_parameter instance if necessary if self.seawater_parameters != "None": temp = dict_alternatives(self.seawater_parameters, "temperature", "T") sal = dict_alternatives(self.seawater_parameters, "salinity", "S") bar = dict_alternatives(self.seawater_parameters, "pressure", "P") if hasattr(self, "DIC") and hasattr(self, "DIC"): SeawaterConstants( name="swc", temperature=temp, pressure=bar, salinity=sal, register=self, ta=self.TA.c[0], dic=self.DIC.c[0], ) else: SeawaterConstants( name="swc", temperature=temp, pressure=bar, salinity=sal, register=self, ta=0.002, dic=0.002, ) warnings.warn( f"\n\nUsing SeawaterConstants without provinding DIC " f"and TA values for {self.name}\n\n", stacklevel=2, ) self.model.now = self.model.now + 1 self.register.lrg.append(self)
[docs] class SourceSinkProperties(esbmtkBase): """Create Source/Sink Groups. These are not actual reservoirs, but we stil need to have them as objects Example:: SinkProperties(name = "Pyrite", species = [SO42, H2S], ) where the first argument is a string, and the second is a reservoir handle """ def __init__(self, **kwargs) -> None: """Initialize Class Instance.""" from esbmtk import Model, Sink, Source, SpeciesProperties # provide a dict of all known keywords and their type self.defaults: dict[str, list[any, tuple]] = { "name": ["None", (str)], "species": ["None", (str, list)], "delta": [{}, dict], "isotopes": [False, (dict, bool)], "register": ["None", (str, Model)], } # provide a list of absolutely required keywords self.lrk: list[str] = ["name", "species"] self.__initialize_keyword_variables__(kwargs) # register this object self.mo = self.species[0].mo # get model handle self.model = self.mo if self.register == "None": self.register = self.mo # legacy variables self.n = self.name self.parent = self.register self.loc: set[Species2Species] = set() # set of connection objects self.__register_with_parent__() self.lor: list = [] # list of sub reservoirs in this group # input variables can be either dictionaries, single values if isinstance(self.species, list): la = ["delta", "isotopes"] for a in la: if not isinstance(getattr(self, a), dict): setattr(self, a, {self.species[0]: getattr(self, a)}) # loop over species names and setup sub-objects for _i, s in enumerate(self.species): if isinstance(s, str) and s != "None": raise ValueError(f"{s} need to be a species object, not a string") if not isinstance(s, SpeciesProperties): raise SourceSinkPropertiesError( f"{s.n} needs to be a valid species name" ) if type(self).__name__ == "SourceProperties": a = Source( name=f"{s.name}", register=self, species=s, delta=self.delta.get(s, "None"), ) elif type(self).__name__ == "SinkProperties": a = Sink( name=f"{s.name}", register=self, species=s, isotopes=self.isotopes.get(s, False), ) else: raise SourceSinkPropertiesError( f"{type(self).__name__} is not a valid class type" ) # register in list of reservoirs self.lor.append(a)
[docs] class SinkProperties(SourceSinkProperties): """A wrapper to setup a Sink object. Example:: SinkProperties(name = "Burial", species = [SO42, H2S], delta = {"SO4": 10} ) """
[docs] class SourceProperties(SourceSinkProperties): """A wrapper to setup a Source object. Example:: SourceProperties(name = "weathering", species = [SO42, H2S], delta = {"SO4": 10} ) """
[docs] class Signal(esbmtkBase): """Create a signal. which is described by its startime (relative to the model time), it's size (as mass) and duration, or as duration and magnitude. Furthermore, we can presribe the signal shape (square, pyramid, bell, file )and whether the signal will repeat. You can also specify whether the event will affect the delta value. Alternatively, signal can be read from a CSV file. The file needs to specify Time [unit], Flux [unit/time unit], delta value The delta value column is optional.The units must be of similar dimensions as the model dimensions (e.g., mol/l or mol/kg). Data will be automatically interpolated. Example:: Signal(name = "Name", species = SpeciesProperties handle, start = "0 yrs", # optional duration = "0 yrs", # reverse_time: [False, (bool)], # optional delta = 0, # optional stype = "addition" # or multiplication/epsilon_only shape = "square/pyramid/bell/filename" mass/magnitude/filename # give one offset = '0 yrs', # scale = 1, optional, # offset = option # reservoir = r-handle # optional, see below source = s-handle optional, see below display_precision = number, optional, inherited from Model register, ) Signals are cumulative, i.e., complex signals ar created by adding one signal to another (i.e., Snew = S1 + S2) The optional scaling argument will only affect the y-column data of external data files Signals are registered with a flux during flux creation, i.e., they are passed on the process list when calling the connector object. if the filename argument is used, you can provide a filename which contains the data to be used in csv format. The data will be interpolated to the model domain, and added to the already existing data. The external data need to be in the following format Time, Rate, delta value 0, 10, 12 i.e., the first row needs to be a header line All time data in the csv file will be treated as realative time (i.e., the start time will be mapped to zero). The reverse keyword can be used to invert a signal that is read from a csv file. Last but not least, you can provide an optional reservoir name. In this case, the signal will create a source as (signal_name_source) and the connection to the specified reservoir. If you build a complex signal do this as the last step. If you additionally provide a source name the connection will be made between the provided source (this can be useful if you use source groups). This class has the following methods Signal.repeat() Signal.plot() Signal.info() The signal class provides the following data fields self.data.m which contains the interpolated signal also available as self.m self.data.l which contain the interpolated isotope data in the form of the light isotope also availavle as self.l If no isotope data is given, it is 0 self.ed is the object reference for the externaldata instance in cases wher data is read from a csv file Isotopes: Delta values are always additive, regardless of signal type. The epsilon_only type ignores are flux values (the column must be present though), and applies the delta value to a given flux. This setting is most useful to dynamically control the fractionation factor of an output flux. """ def __init__(self, **kwargs) -> None: """Parse and initialize variables.""" from esbmtk import Q_, Model, Sink, Source, Species, SpeciesProperties # provide a list of all known keywords and their type self.defaults: dict[str, list[any, tuple]] = { "name": ["None", (str)], "start": ["0 yrs", (str, Q_)], "duration": ["None", (str, Q_)], "species": ["None", (SpeciesProperties)], "delta": [0, (int, float)], "stype": ["addition", (str)], "reverse_time": [False, (bool)], "shape": ["None", (str)], "filename": ["None", (str)], "mass": ["None", (str, Q_)], "magnitude": ["None", (str, Q_)], "offset": ["0 yrs", (str, Q_)], "plot": ["no", (str)], "scale": [1, (int, float)], "display_precision": [0.01, (int, float)], "reservoir": ["None", (Source, Sink, Species, str)], "source": ["None", (Source, Sink, Species, str)], "legend_right": ["None", (str)], "register": ["None", (str, Model)], "isotopes": [False, (bool)], } # provide a list of absolutely required keywords self.lrk: list[str] = [ "name", ["duration", "filename"], "species", ["shape", "filename"], ["magnitude", "mass", "filename"], ] self.__initialize_keyword_variables__(kwargs) # list of signals we are based on self.los: list[Signal] = [] # convert units to model units self.st: int | float = int( Q_(self.start).to(self.species.mo.t_unit).magnitude ) # start time if "mass" in self.kwargs: self.mass = Q_(self.mass).to(self.species.mo.m_unit).magnitude elif "magnitude" in self.kwargs: self.magnitude = Q_(self.magnitude).to(self.species.mo.f_unit).magnitude if "duration" in self.kwargs: self.duration = int(Q_(self.duration).to(self.species.mo.t_unit).magnitude) if self.duration / self.species.mo.dt < 10: warnings.warn( """\n\n Your signal duration is covered by less than 10 Intergration steps. This may not be what you want Consider adjusting the max_step parameter of the model object\n""", stacklevel=2, ) self.model.now = self.model.now + 1 self.offset = Q_(self.offset).to(self.species.mo.t_unit).magnitude # legacy name definitions self.full_name = "" self.l: int = self.duration self.n: str = self.name # the name of the this signal self.sp: SpeciesProperties = self.species # the species self.mo: Model = self.species.mo # the model handle self.model = self.mo if self.register == "None": self.register = self.mo self.parent = self.register self.ty: str = self.stype # type of signal self.sh: str = self.shape # shape the event self.d: float = self.delta # delta value offset during the event self.kwd: dict[str, any] = self.kwargs # list of keywords self.led: list = [] if self.delta != "None": self.isotopes = True if self.display_precision == 0: self.display_precision = self.mo.display_precision self.s_m, self.s_l, self.s_d = self.__init_signal_data__() self.signal_data = self.__map_signal__() """ self.__map_signal__() returns a Flux object, so we need to remove this from the list of model Fluxes, with self.mo.lof.remove(self.data) since we do not use is as a Flux. Probably better to just create a vector object instead. FIXME sometime """ self.mo.lof.remove(self.signal_data) self.m = self.signal_data.m self.cd_m = self.m.copy() if self.isotopes: self.l = self.signal_data.l self.d = self.signal_data.d self.cd_l = self.l.copy() self.cd_d = self.d.copy() self.signal_data.n: str = ( self.name + "_data" ) # update the name of the signal data self.legend_left = self.signal_data.legend_left self.legend_right = self.signal_data.legend_right # update isotope values # self.data.l = get_l_mass(self.data.m, self.data.d, self.sp.r) self.__register_with_parent__() self.mo.los.append(self) # register with model # in case we deal with a sink or source signal if self.reservoir != "None": self.__apply_signal__() def __init_signal_data__(self) -> None: """1. Create a vector which contains the signal data. The vector length can exceed the modelling domain. 2. Trim the signal vector in such a way that it fits within the modelling domain 3. Create an empty flux and replace it with the signal vector data. Note that this flux will then be added to an existing flux. """ # these are signal times, not model time if self.duration != "None": self.length: int = int(round(self.duration / self.mo.dt)) # create signal vector if self.sh == "square": self.__square__(0, self.length) elif self.sh == "pyramid": self.__pyramid__(0, self.length) elif self.sh == "bell": self.__bell__(0, self.length) elif "filename" in self.kwargs: # use an external data set self.length = self.__int_ext_data__() else: raise SignalError( "argument needs to be either square/pyramid, or an ExternalData object. " ) return self.s_m, self.s_l, self.s_d def __map_signal__(self) -> Flux: """Map signal to model domain. s.t. signal data fits the time grid of model. Returns mapped data. """ from esbmtk import Flux # Create a dummy flux we can act up. Flux masses are all zero mapped_signal_data: Flux = Flux( name=self.n + "_data", species=self.sp, rate=f"0 {self.sp.mo.f_unit}", delta=0, isotopes=self.isotopes, save_flux_data=True, register=self, ) # Creating signal time array dt = self.mo.dt # model time step signal_start = self.st signal_end = self.st + self.duration if self.filename == "None" else self.et model_time = self.mo.time # model time array # imitate signal time array with same size as interpolated signal data (self.s_m) signal_time = np.arange(signal_start, signal_end, dt) """ Create empty arrays that are NaN for data we do not touch, zero for data that is treated as additive, and one for data that is multiplied. """ mapped_time = np.full_like(model_time, np.nan, dtype=float) if self.stype == "addition" or self.stype == "epsilon_only": mapped_m = np.full_like(model_time, 0, dtype=float) elif self.stype == "multiplication": mapped_m = np.full_like(model_time, 1, dtype=float) else: # in case something is wrong with stype, still create mapped data raise SignalError( f"\nYou need to specify the signal type. Allowed keywords are:\n" f"addition/mutiplication/epsilon_only\n" f"\n{self.stype} is not defined. Typo?\n" ) # isotopes are always additive mapped_l = np.full_like(model_time, 0, dtype=float) mapped_d = np.full_like(model_time, 0, dtype=float) # Filter signal time which exists in model time # If signal time in model time, return True in mask # Every time element in model time flagged with True is # collected in mapped_time (array is only used within this method) mask = np.isin(model_time, signal_time) mapped_time[mask] = model_time[mask] # Go through mapped_time to check where there was a match between model # and signal times. Collect signal data for where times matched for i, t in enumerate(mapped_time): if t >= 0: signal_index = np.searchsorted(signal_time, t) mapped_m[i] = self.s_m[signal_index] if self.isotopes: mapped_l[i] = self.s_l[ signal_index ] # TODO: for future thinking how to calculate isotope fluxes mapped_d[i] = self.s_d[ signal_index ] # TODO: for future thinking how to calculate isotope fluxes if self.reverse_time: mapped_signal_data.m = np.flip(mapped_m) mapped_signal_data.l = np.flip(mapped_l) mapped_signal_data.d = np.flip(mapped_d) else: mapped_signal_data.m = mapped_m mapped_signal_data.l = mapped_l mapped_signal_data.d = mapped_d return mapped_signal_data def __square__(self, s, e) -> None: """Create Square Signal.""" self.s_m: NDArrayFloat = np.zeros(e - s) self.s_d: NDArrayFloat = np.zeros(e - s) if "mass" in self.kwd: h = self.mass / self.duration # get the height of the square self.magnitude = h elif "magnitude" in self.kwd: h = self.magnitude self.mass = h * self.duration else: raise SignalError("You must specify mass or magnitude of the signal") self.s_m = self.s_m + h # add this to the section self.s_d = self.s_d + self.d # add the delta offset self.s_l = get_l_mass(self.s_m, self.s_d, self.sp.r) def __pyramid__(self, s, e) -> None: """Create pyramid type Signal. s = start index e = end index """ if "mass" in self.kwd: h = 2 * self.mass / self.duration # get the height of the pyramid elif "magnitude" in self.kwd: h = self.magnitude else: raise SignalError("You must specify mass or magnitude of the signal") # create pyramid c: int = int(round((e - s) / 2)) # get the center index for the peak x: NDArrayFloat = np.array([0, c, e - s]) # setup the x coordinates y: NDArrayFloat = np.array([0, h, 0]) # setup the y coordinates d: NDArrayFloat = np.array([0, self.d, 0]) # setup the d coordinates xi = np.arange(0, e - s) # setup the points at which to interpolate self.s_m: NDArrayFloat = np.interp(xi, x, y) # interpolate flux self.s_d: NDArrayFloat = np.interp(xi, x, d) # interpolate delta self.s_l = get_l_mass(self.s_m, self.s_d, self.sp.r) def __bell__(self, s, e) -> None: """Create a bell curve type signal. s = start index e = end index Note that the area under the curve equals one. So we can scale the result simply with mass """ import sys c: int = int(round((e - s) / 2)) # get the center index for the peak x: NDArrayFloat = np.arange(-c, c + 1, 1) e: float = math.e pi: float = math.pi mu: float = 0 phi: float = c / 4 logging.debug(f"mu = {mu} ,phi = {phi}") logging.debug(f"x[0] = {x[0]}, x[-1] = {x[-1]}") logging.debug(sys.float_info) a = -((x - mu) ** 2) / (2 * phi**2) # get bell curve self.s_m = 1 / (phi * math.sqrt(2 * pi)) * e**a self.s_d = self.s_m * self.delta / max(self.s_m) self.s_l = self.s_m if "mass" in self.kwargs: self.s_m = self.s_m * self.mass elif "magnitude" in self.kwargs: self.s_m = self.s_m * self.magnitude / max(self.s_m) else: raise SignalError("Bell type signal require either mass or magnitude") def __int_ext_data__(self) -> None: """Interpolate External data as a signal. Unlike the other signals, this will replace the values in the flux with those read from the external data source. The external data need to be in the following format Time [units], Rate [units], delta value [units] 0, 10, 12 i.e., the first row needs to be a header line """ self.ed = ExternalData( name=f"{self.name}_ed", filename=self.filename, register=self, legend=f"{self.name}_ed", disp_units=False, # we need the data in model units ) self.isotopes = self.ed.isotopes self.s_time = self.ed.x self.s_data = self.ed.y * self.scale self.st: float = self.s_time[0] # signal start time self.et: float = self.s_time[-1] # signal end time signal_duration = self.et - self.st model_time_step = self.mo.dt # Calculate how many data points are needed to interpolate signal duration with model time step num_steps = int(signal_duration / model_time_step) # setup the points at which to interpolate xi = np.linspace(self.st, self.et, num_steps + 1) self.s_m: NDArrayFloat = np.interp( xi, self.s_time, self.s_data ) # interpolate flux if self.ed.zh: self.s_delta = self.ed.d self.s_d: NDArrayFloat = np.interp(xi, self.s_time, self.s_delta) self.s_l = get_l_mass(self.s_m, self.s_d, self.sp.r) else: self.s_l: NDArrayFloat = np.zeros(num_steps) self.s_d: NDArrayFloat = np.zeros(num_steps) return num_steps def __apply_signal__(self) -> None: """Create a source, and connect signal, source and reservoir. Maybe this logic should be me moved elsewhere? """ from esbmtk import Source, Species2Species raise DeprecationWarning( "This method is no longer supported. Please notify the esbmtk author" ) self.model.now = self.model.now + 1 if self.source == "None": self.source = Source(name=f"{self.name}_Source", species=self.sp) Species2Species( source=self.source, # source of flux sink=self.reservoir, # target of flux rate="0 mol/yr", # flux rate signal=self, # list of processes plot="no", ) def __add__(self, other): """Allow the addition of two signals and return a new signal. FIXME: this requires cleanup """ new_signal = cp.deepcopy(self) new_signal.m = self.m + other.m # get delta of self if self.isotopes: this_delta = get_delta(self.l, self.m - self.l, self.signal_data.sp.r) other_delta = get_delta(other.l, other.m - other.l, other.data.sp.r) new_signal.l = get_l_mass( new_signal.m, this_delta + other_delta, new_signal.signal_data.sp.r ) # new_signal.l = max(self.l, other.l) new_signal.name: str = self.name + "_and_" + other.name # print(f"adding {self.n} to {other.n}, returning {ns.n}") # new_signal.data.n: str = self.n + "_and_" + other.n + "_data" new_signal.st = min(self.st, other.st) new_signal.sh = "compound" new_signal.los.append(self) new_signal.los.append(other) return new_signal
[docs] def repeat(self, start, stop, offset, times) -> None: """Create a new signal by repeating an existing signal. Example:: new_signal = signal.repeat(start, # start time of signal slice to be repeated stop, # end time of signal slice to be repeated offset, # offset between repetitions times, # number of time to repeat the slice ) """ ns: Signal = cp.deepcopy(self) ns.n: str = self.n + f"_repeated_{times}_times" ns.signal_data.n: str = self.n + f"_repeated_{times}_times_data" start: int = int(start / self.mo.dt) # convert from time to index stop: int = int(stop / self.mo.dt) offset: int = int(offset / self.mo.dt) ns.start: float = start ns.stop: float = stop ns.offset: float = stop - start + offset ns.times: float = times ns.ms: NDArrayFloat = self.signal_data.m[ start:stop ] # get the data slice we are using ns.ds: NDArrayFloat = self.signal_data.d[start:stop] diff = 0 for _ in range(times): start += ns.offset stop += ns.offset if start > len(self.signal_data.m): break elif stop > len(self.signal_data.m): # end index larger than data size diff: int = stop - len(self.signal_data.m) # difference stop -= diff lds: int = len(ns.ds) - diff else: lds: int = len(ns.ds) ns.signal_data.m[start:stop]: NDArrayFloat = ( ns.signal_data.m[start:stop] + ns.ms[:lds] ) ns.signal_data.d[start:stop]: NDArrayFloat = ( ns.signal_data.d[start:stop] + ns.ds[:lds] ) # and recalculate li and hi ns.signal_data.l: NDArrayFloat ns.signal_data.h: NDArrayFloat [ns.signal_data.l, ns.signal_data.h] = get_imass( ns.signal_data.m, ns.signal_data.d, ns.signal_data.sp.r ) return ns
def __register_with_flux__(self, flux) -> None: """Register this signal with a flux. This should probably be done through a process! """ self.fo: Flux = flux # the flux handle self.sp: SpeciesProperties = flux.sp # the species handle # list of processes flux.lop.append(self) def __call__(self, t) -> list[float, float]: """Return Signal value at time t. (mass and mass for light isotope). This will work as long a t is a multiple of dt, and i = t. may extend this by addding linear interpolation but that will be costly """ import numpy as np m = np.interp(t, self.mo.time, self.cd_m) if self.isotopes: lm = np.interp(t, self.mo.time, self.cd_l) ld = np.interp(t, self.mo.time, self.cd_d) else: lm = 0 ld = 0 return [m, lm, ld] def __plot__(self, M: Model, ax) -> None: """Plot instructions. M: Model ax: matplotlib axes handle """ from esbmtk import set_y_limits # convert time and data to display units x = (M.time * M.t_unit).to(M.d_unit).magnitude y1 = (self.signal_data.m * M.f_unit).to(self.signal_data.plt_units).magnitude legend = f"{self.legend_left} [{M.f_unit}]" # # test for plt_transform # if self.plot_transform_c != "None": # if Callable(self.plot_transform_c): # y1 = self.plot_transform_c(self.c) # else: # raise ValueError("Plot transform must be a function") # Best to plot external data on a secondary x-axis # plot first axis _ln1 = ax.plot(x, y1, color="C0", label=self.legend_left) ax.set_xlabel(f"{M.time_label} [{M.d_unit:~P}]") ax.set_ylabel(legend) ax.spines["top"].set_visible(False) handler1, label1 = ax.get_legend_handles_labels() ax.set_title(self.full_name) self.mo.axd[ax] = "reversible" # M0 # plot second axis if self.isotopes: axt = ax.twinx() y2 = self.signal_data.d # no conversion for isotopes _ln2 = axt.plot(x, y2, color="C1", label=self.legend_right) axt.set_ylabel(self.signal_data.ld) set_y_limits(axt, M) ax.spines["top"].set_visible(False) # set combined legend handler2, label2 = axt.get_legend_handles_labels() legend = axt.legend(handler1 + handler2, label1 + label2, loc=0).set_zorder( 6 ) else: ax.legend() ax.spines["right"].set_visible(False) ax.yaxis.set_ticks_position("left") ax.xaxis.set_ticks_position("bottom") def __write_data__( self, prefix: str, start: int, stop: int, stride: int, append: bool, directory: str, ) -> None: """Write data to file. This function is called by the write_data() and save_state() methods :param prefix: :param start: :param stop: :param stride: :param append: :param directory: """ from pathlib import Path p = Path(directory) p.mkdir(parents=True, exist_ok=True) # some short hands sn = self.sp.n # species name sp = self.sp # species handle mo = self.sp.mo # model handle _smu = f"{mo.m_unit:~P}" mtu = f"{mo.t_unit:~P}" _fmu = f"{mo.f_unit:~P}" cmu = f"{mo.c_unit:~P}" rn = self.full_name # reservoir name _mn = self.sp.mo.n # model name fn = f"{directory}/{prefix}{rn}.csv" # file name # build the dataframe df: pd.dataframe = DataFrame() # breakpoint() df[f"{rn} Time [{mtu}]"] = self.mo.time[start:stop:stride] # time # df[f"{rn} {sn} [{smu}]"] = self.m.to(self.mo.m_unit).magnitude[start:stop:stride] # mass if self.isotopes: # print(f"rn = {rn}, sp = {sp.name}") # light isotope df[f"{rn} {sp.ln} [{cmu}]"] = self.l[start:stop:stride] df[f"{rn} {sn} [{cmu}]"] = self.signal_data.m[ start:stop:stride ] # concentration file_path = Path(fn) if append and file_path.exists(): df.to_csv(file_path, header=False, mode="a", index=False) else: df.to_csv(file_path, header=True, mode="w", index=False) return df def __init_signal_function__(self): """Initialize a an external code instance that can be called by the solver.""" raise NotImplementedError("not yet implemented") if self.isotopes: p = (self.mo.time, self.m, self.l) function_name = "signal_with_istopes" else: p = (self.mo.time, self.m) function_name = "signal_no_istopes" ec = ExternalCode( name=f"ec_signal_{self.name}", fname=function_name, ftype="std", species=self.species, function_input_data=["t"], function_params=p, register=self.model, return_values=[ {"N_one": "None"}, ], ) self.mo.lpc_f.append(ec.fname) return ec
[docs] class VectorData(esbmtkBase): """A simple container for 1-dimensional data. Typically used for results obtained by postprocessing. """ def __init__(self, **kwargs: dict[str, any]) -> None: """Initialize Class.""" from pint import Unit self.defaults: dict[str, list(str, tuple)] = { "name": ["None", (str,)], "register": ["None", (str, Reservoir)], "species": ["None", (str, SpeciesProperties)], "data": ["None", (str, np.ndarray, float)], "isotopes": [False, (bool,)], "plt_units": ["None", (str, Unit)], "label": ["None", (str, bool)], } # provide a list of absolutely required keywords self.lrk: list = [ "name", "register", "species", "data", "label", "plt_units", ] self.__initialize_keyword_variables__(kwargs) self.n = self.name self.parent = self.register self.sp = self.species self.mo = self.species.mo self.model = self.species.mo self.c = self.data self.x = self.c self.label = self.name self.register.model.lvd.append(self) self.__register_with_parent__() def __write_data__( self, prefix: str, start: int, stop: int, stride: int, append: bool, directory: str, ) -> None: """Write data to file. This function is called by the write_data() and save_state() methods :param prefix: :param start: :param stop: :param stride: :param append: :param directory: """ from pathlib import Path from esbmtk import SpeciesError p = Path(directory) p.mkdir(parents=True, exist_ok=True) # some short hands sn = self.sp.n # species name sp = self.sp # species handle mo = self.sp.mo # model handle _smu = f"{mo.m_unit:~P}" mtu = f"{mo.t_unit:~P}" _fmu = f"{mo.f_unit:~P}" cmu = f"{mo.c_unit:~P}" # sdn = self.sp.dn # delta name # sds = self.sp.ds # delta scale rn = self.full_name # reservoir name mn = self.sp.mo.n # model name if self.sp.mo.register == "None": fn = f"{directory}/{prefix}{mn}_{rn}.csv" # file name elif self.sp.mo.register == "local": fn = f"{directory}/{prefix}{rn}.csv" # file name else: raise SpeciesError( "Model register keyword must be 'None' or 'local' " f" not {self.sp.mo.register}" ) df: pd.dataframe = DataFrame() df[f"{rn} Time [{mtu}]"] = self.mo.time[start:stop:stride] # time # df[f"{rn} {sn} [{smu}]"] = self.m.to(self.mo.m_unit).magnitude[start:stop:stride] # mass if self.isotopes: # print(f"rn = {rn}, sp = {sp.name}") # light isotope df[f"{rn} {sp.ln} [{cmu}]"] = self.l[start:stop:stride] df[f"{rn} {sn} [{cmu}]"] = self.c[start:stop:stride] # concentration file_path = Path(fn) if append and file_path.exists(): df.to_csv(file_path, header=False, mode="a", index=False) else: df.to_csv(file_path, header=True, mode="w", index=False) return df
[docs] def get_conversion_unit_information(self): """Return concentrat data in plot units.""" from pint import Unit from esbmtk import Q_ if isinstance(self.plt_units, Q_): unit = f"{self.plt_units.units:~P}" elif isinstance(self.plt_units, Unit): unit = f"{self.plt_units:~P}" else: unit = f"{self.plt_units}" return self.c, unit, 1
[docs] class DataField(esbmtkBase): """DataField Class. Datafields can be used to plot data which is computed after the model finishes in the overview plot windows. Therefore, datafields will plot in the same window as the reservoir they are associated with. Datafields must share the same x-axis is the model, and can have up to two y axis. Example:: DataField(name = "Name" register = Model handle, x1_data = ["None", (np.ndarray, list)], defaults to model time y1_data = NDArrayFloat or list of arrays y1_label = Data label(s) y1_legend = Y-Axis Label y1_type = "plot", | "scatter" y2_data = NDArrayFloat # optional y2_legend = Y-Axis label # optional y2_label = Data legend(s) # optional y2_type = "plot", | "scatter" common_y_scale = "no", #optional, default "no" display_precision = number, optional, inherited from Model ) All y1 and y2 keywords can be either a single value or a list of values. Note that Datafield data is not mapped to model units. Care must be taken that the data units match the model units. The instance provides the following data Name.x = X-axis = model X-axis Name.y1_data Name.y1_label Name.y1_legend Similarly for y2 You can specify more than one data set, and be explicit about color and linestyle choices. Example:: DataField( name="df_pH", x1_data=[M.time, M.time, M.time, M.ef_hplus_l.x, M.ef_hplus_h.x, M.ef_hplus_d.x], y1_data=[ -np.log10(M.L_b.Hplus.c), -np.log10(M.H_b.Hplus.c), -np.log10(M.D_b.Hplus.c), -np.log10(M.ef_hplus_l.y), -np.log10(M.ef_hplus_h.y), -np.log10(M.ef_hplus_d.y), ], y1_label="Low latitude, High latitude, Deep box, d_L, d_H, d_D".split( ", "), y1_color="C0 C1 C2 C0 C1 C2".split(" "), y1_style="solid solid solid dotted dotted dotted".split( " "), y1_legend="pH", register=M, ) """ def __init__(self, **kwargs: dict[str, any]) -> None: """Initialize this instance.""" from . import ExternalCode, Model, SpeciesNoSet, VirtualSpecies # dict of all known keywords and their type self.defaults: dict[str, list(str, tuple)] = { "name": ["None", (str)], "register": [ "None", ( Model, Species, Reservoir, SpeciesNoSet, VirtualSpecies, ExternalCode, ), ], "associated_with": [ "None", ( Model, Species, Reservoir, SpeciesNoSet, VirtualSpecies, ExternalCode, ), ], "y1_data": ["None", (np.ndarray, list)], "x1_data": ["None", (np.ndarray, list, str)], "x1_as_time": [False, (bool)], "y1_label": ["Not Provided", (str, list)], "y1_legend": ["Not Provided", (str)], "y1_type": ["plot", (str, list)], "y1_color": ["None", (str, list)], "y1_style": ["None", (str, list)], "y2_data": ["None", (str, np.ndarray, list)], "x2_data": ["None", (np.ndarray, list, str)], "x2_as_time": [False, (bool)], "y2_label": ["Not Provided", (str, list)], "y2_legend": ["Not Provided", (str)], "y2_type": ["plot", (str, list)], "y2_color": ["None", (str, list)], "y2_style": ["None", (str, list)], "common_y_scale": ["no", (str)], "display_precision": [0.01, (int, float)], "title": ["None", (str)], } # provide a list of absolutely required keywords self.lrk: list = ["name", ["register", "associated_with"], "y1_data"] # provide a dictionary entry for a keyword specific error message # see esbmtkBase.__initerrormessages__() self.__initialize_keyword_variables__(kwargs) if self.register == "None": self.register = self.associated_with # set legacy variables self.legend_left = self.y1_legend self.isotopes = False self.parent = self.register # if self.associated_with == "None": # self.associated_with = self.mo.lor[0] # self.mo = self.associated_with.mo self.mo = self.register.mo self.model = self.mo if self.associated_with == "None": if isinstance(self.register, Model): self.associated_with = self.register.lor[0] elif isinstance(self.register, Species): self.associated_with = self.register else: raise DataFieldError( "Set associated_with or register to a reservoir name" ) if isinstance(self.associated_with, Species): self.plt_units = self.associated_with.plt_units elif isinstance(self.associated_with, Reservoir): self.plt_units = self.associated_with.lor[0].plt_units else: raise ValueError("This needs fixing") if self.y1_color == "None": self.y1_color = [] for i, _d in enumerate(self.y1_data): self.y1_color.append(f"C{i}") if self.y1_style == "None": self.y1_style = [] for _i, _d in enumerate(self.y1_data): self.y1_style.append("solid") if self.y2_color == "None": self.y2_color = [] for i, _d in enumerate(self.y2_data): self.y2_color.append(f"C{i}") if self.y2_style == "None": self.y2_style = [] for _i, _d in enumerate(self.y2_data): self.y2_style.append("solid") self.n = self.name self.led = [] # register with reservoir # self.associated_with.ldf.append(self) # register with model. needed for print_reservoirs self.register.ldf.append(self) if self.display_precision == 0: self.display_precision = self.mo.display_precision self.__register_with_parent__() def __write_data__( self, prefix: str, start: int, stop: int, stride: int, append: bool, directory: str, ) -> None: """Write_data and save_state. This method is called by the respective write_data and save_state functions. """ from pathlib import Path p = Path(directory) p.mkdir(parents=True, exist_ok=True) # some short hands mo = self.mo # model handle mtu = f"{mo.t_unit:~P}" rn = self.n # reservoir name mn = self.mo.n # model name if self.sp.mo.register == "None": fn = f"{directory}/{prefix}{mn}_{rn}.csv" # file name elif self.sp.mo.register == "local": fn = f"{directory}/{prefix}{rn}.csv" # file name else: raise DataFieldError( f"Model register keyword must be 'None'/'local' not { self.sp.mo.register }" ) # build the dataframe df: pd.dataframe = DataFrame() df[f"{self.n} Time [{mtu}]"] = self.mo.time[start:stop:stride] # time df[f"{self.n} {self.y1_legend}"] = self.y1_data[start:stop:stride] # y1 data if self.y2_data != "None": df[f"{self.n} {self.y2_legend}"] = self.y2_data[ start:stop:stride ] # y2_data file_path = Path(fn) if append and file_path.exists(): df.to_csv(file_path, header=False, mode="a", index=False) else: df.to_csv(file_path, header=True, mode="w", index=False) return df def __unify_data__(self, M: Model, x, y, label) -> tuple(list, list): """Format data so that it can be used by the __plot__ method. The input data for the DataField Class can be either missing, be a single array, or a list of arrays. The module analyzes the data and modifies is in such a way that it can be used by the __plot__ method without further adjustments. :param x: str, array, list, withe the x-data :param y: str, array, list, withe the y-data :return x,y: as list """ import numpy as np # print(f"ud0: type x = {type(x)}, len = {len(x)}") # print(f"ud1: type y = {type(x)}, len = {len(y)}") # consider the y-axis first if isinstance(y, str): ValueError("Provide at least one data-set") elif isinstance(y, np.ndarray): y = [y] y_l = 1 elif isinstance(y, list): y_l = len(y) else: raise DataFieldError("Y data needs to be array, numpy array or list") # consider the x-axis next if isinstance(x, str): # no x-data has been provided if y_l == 1: # single y data x = [M.time] # print(f" no x-data y_l = {y_l}") else: # mutiple y data # print(f" no x-data mutiple y-data y_l = {y_l}") x = [] for _e in range(y_l): x.append(M.time) elif isinstance(x, np.ndarray): # print(f"np_array, y_l = {y_l}") xx = [] if y_l == 1: # single y data xx.append(x) else: # mutiple y data # print(f" no x-data mutiple y-data y_l = {y_l}") for _e in range(y_l): xx.append(x) x = xx # print(f"after: {type(x)}") elif isinstance(x, list): # assume that lists match if len(x) != len(y): raise DataFieldError(f"Y data needs to match x data for {label}") else: raise DataFieldError( f"Y data needs to be array, numpy array or list for {label}" ) if y_l == 1: if not isinstance(label, list): label = [label] elif y_l > 1 and not isinstance(label, list): ll = [] # FIXME: Add test that this is list otherwise error for _e in y_l: ll.append(label) label = ll return x, y, label def __plot_data__(self, ax, x, y, t, label, i, color, style) -> None: """Plot data either as line or scatterplot. :param ax: axis handle :param x: x data :param y: y data :param t: plot type :param l: label str :param i: index """ if t == "plot": # ax.plot(x, y, color=f"C{i}", label=l) ax.plot(x, y, color=color[i], linestyle=style[i], label=label) else: ax.scatter(x, y, color=color[i], label=label) def __plot__(self, M: Model, ax) -> None: """Plot instructions. M: Model ax: matplotlib axes handle """ # plot external data first M.axd[ax] = "reversible" for i, d in enumerate(self.led): time = (d.x * M.t_unit).to(M.d_unit).magnitude # yd = (d.y * M.c_unit).to(self.plt_units).magnitude leg = f"{d.legend}" ax.scatter(time, d.y, color=f"C{i}", label=leg) self.x1_data, self.y1_data, self.y1_label = self.__unify_data__( M, self.x1_data, self.y1_data, self.y1_label, ) ymin = list() ymax = list() for i, _d in enumerate(self.y1_data): # loop over datafield list if self.x1_as_time: x1 = (self.x1_data[i] * M.t_unit).to(M.d_unit).magnitude else: x1 = self.x1_data[i] # y1 = (self.y * M.c_unit).to(self.plt_units).magnitude # 1 = self.y # y1_label = f"{self.legend_left} [{self.plt_units:~P}]" ptype = self.y1_type[i] if isinstance(self.y1_type, list) else self.y1_type self.__plot_data__( ax, x1, self.y1_data[i], ptype, self.y1_label[i], i, self.y1_color, self.y1_style, ) last_i = i u, v = ax.get_ylim() ymin.append(u) ymax.append(v) # set_y_limits(ax, self) # add any external data if present ymin = min(ymin) ymax = max(ymax) ax.set_ylim([ymin, ymax]) last_i = i ax.set_xlabel(f"{M.time_label} [{M.d_unit:~P}]") ax.set_ylabel(self.y1_legend) # remove unnecessary frame species ax.spines["top"].set_visible(False) handler1, label1 = ax.get_legend_handles_labels() # test if independeny y_data is present ymin = list() ymax = list() if not isinstance(self.y2_data, str): self.x2_data, self.y2_data, self.y2_label = self.__unify_data__( M, self.x2_data, self.y2_data, self.y2_label, ) axt = ax.twinx() for i, _d in enumerate(self.y2_data): # loop over datafield list if self.x2_as_time: x2 = (self.x1_data[i] * M.t_unit).to(M.d_unit).magnitude else: x2 = self.x1_data[i] self.__plot_data__( axt, x2, self.y2_data[i], self.y2_type, self.y2_label[i], i + last_i + 1, self.y2_color, self.y2_style, ) u, v = axt.get_ylim() ymin.append(u) ymax.append(v) ymin = min(ymin) ymax = max(ymax) axt.set_ylim([ymin, ymax]) axt.set_xlabel(f"{M.time_label} [{M.d_unit:~P}]") axt.set_ylabel(self.y2_legend) # remove unnecessary frame species axt.spines["top"].set_visible(False) handler2, label2 = axt.get_legend_handles_labels() _legend = axt.legend( handler1 + handler2, label1 + label2, loc=0 ).set_zorder(6) axt.legend() else: ax.legend(handler1, label1) ax.spines["right"].set_visible(False) ax.yaxis.set_ticks_position("left") ax.xaxis.set_ticks_position("bottom") if self.title != "None": ax.set_title(self.title) else: ax.set_title(self.register.full_name)
[docs] class SpeciesNoSet(SpeciesBase): """class that makes no assumptions about the type of data. I.e., all data will be left alone. The original class will calculate delta and concentration from mass an d and h and l. Since we want to use this class without a priory knowledge of how the reservoir arrays are being used we overwrite the data generated during initialization with the values provided in the keywords """ def __init__(self, **kwargs) -> None: """Initialize Class.""" from esbmtk import ( ConnectionProperties, Reservoir, SinkProperties, SourceProperties, SpeciesProperties, ) # provide a dict of all known keywords and their type self.defaults: dict[str, list[any, tuple]] = { "name": ["None", (str)], "species": ["None", (str, SpeciesProperties)], "plot_transform_c": ["None", (str, tp.Callable)], "legend_left": ["None", (str)], "plot": ["yes", (str)], "groupname": ["None", (str)], "function": ["None", (str, tp.Callable)], "display_precision": [0.01, (int, float)], "register": [ "None", ( SourceProperties, SinkProperties, Reservoir, ConnectionProperties, str, ), ], "full_name": ["None", (str)], "isotopes": [False, (bool)], "volume": ["None", (str, int, float)], "vr_datafields": [{}, (dict)], "function_input_data": (list, str), "function_params": [list(), (list, str)], "geometry": ["None", (list, str)], "alias_list": ["None", (list, str)], "ref_flux": ["None", (list, str)], } # provide a list of absolutely required keywords self.lrk: list = [ "name", "species", ] self.__validateandregister__(kwargs) self._initialize_legacy_attributes(kwargs) self.isotopes = False self.mu: str = self.sp.e.mass_unit # massunit xxxx self.plt_units = self.mo.c_unit # save the unit which was provided by the user for display purposes # ------------------------------------------ # left y-axis label self.lm: str = f"{self.species.n} [{self.mu}/l]" self.mo.lor.append(self) # add this reservoir to the model # self.mo.lor.remove(self) # but lets keep track of virtual reservoir in lvr. # this should be done in aux-inits of a derived class if necessary # self.mo.lvr.append(self) # print(f"added {self.name} to lvr 1") # register instance name in global name space self.__register_with_parent__() self.__aux_inits__() self.state = 0
[docs] class ExternalCode(SpeciesNoSet): """Implement user-provided functions. The data inside an ExternalCode instance will only change in response to a user-provided function but will otherwise remain unaffected. That is, it is up to the user-provided function to manage changes in response to external fluxes. An ExternalCode instance is declared in the following way:: ExternalCode( name="cs", # instance name species=M.CO2, # must be Speciesproperties instance vr_datafields={ # the vr_datafields contain any data that is referenced inside the "Hplus": self.swc.hplus, # function, rather than passed as an argument, and all data "Beta": 0.0 # that is explicitly referenced by the model }, function=calc_carbonates, # function reference, see below fname="function name as string", function_input_data="DIC TA", # Note that parameters must be individual float values function_params=(float), return_values={ # list of return values, these must be known species definitions "Hplus": rg.swc.hplus, "zsnow": float(abs(kwargs["zsnow"])), }, register=rh # reservoir_handle to register with ) The dictionary keys of `vr_datafields` will be used to create alias names which c an be used to access the respective variables. See the online documentation: https://esbmtk.readthedocs.io/ In the default configuration, ExternalCode instances are computed after all regular connections have been established. However, sometimes, a connection may depend on a computed value. In this case set the optional parameter ftype to "in_sequence" """ def __init__(self, **kwargs) -> None: """Initialize Class.""" from collections.abc import Callable from esbmtk import ( ConnectionProperties, Model, Reservoir, SinkProperties, SourceProperties, Species, Species2Species, SpeciesProperties, ) # provide a dict of all known keywords and their type self.defaults: dict[str, list[any, tuple]] = { "name": ["None", (str)], "species": ["None", (str, Species, SpeciesProperties)], "plot_transform_c": ["None", (str, Callable)], "legend_left": ["None", (str)], "plot": ["yes", (str)], "groupname": ["None", (str)], "function": ["None", (Callable, str)], "display_precision": [0.01, (int, float)], "register": [ "None", ( SourceProperties, SinkProperties, Reservoir, Species, Species2Species, ConnectionProperties, GasReservoir, Model, str, ), ], "full_name": ["None", (str)], "isotopes": [False, (bool)], "volume": ["None", (str, int, float)], "vr_datafields": ["None", (dict, str)], "function_input_data": ["None", (str, list)], "function_params": ["None", (tuple)], "fname": ["None", (str)], "geometry": ["None", (list, str)], "alias_list": ["None", (list, str)], "ftype": ["computed", (str)], "ref_flux": ["None", (list, str)], "return_values": ["None", (list)], "arguments": ["None", (str, list)], "r_s": ["None", (str, Reservoir)], "r_d": ["None", (str, Reservoir)], "r_n": ["None", (str, Reservoir)], } # provide a list of absolutely required keywords self.lrk: list = [ "name", "species", "register", ] self.__initialize_keyword_variables__(kwargs) self._initialize_legacy_attributes(kwargs) self.lro: list = [] # list of all return objects. self.mu: str = self.sp.e.mass_unit # massunit xxxx self.plt_units = self.mo.c_unit # left y-axis label self.lm: str = f"{self.species.n} [{self.mu}/l]" self.mo.lor.append(self) # add this reservoir to the model self.__register_with_parent__() self.state = 0 name = f"{self.full_name}_generic_function".replace(".", "_") logging.info(f"creating {name}") logging.debug(f"EC: {self.full_name}, isotopes = {self.isotopes}") if self.vr_datafields != "None": self.alias_list = list(self.vr_datafields.keys()) # initialize data fields self.vr_data = list() for e in self.vr_datafields.values(): if isinstance(e, float | int): self.vr_data.append(np.full(self.mo.steps, e, dtype=float)) else: self.vr_data.append(e) self.mo.lpc_r.append(self) # self.mo.lpc_r.append(self.gfh) self.mo.lor.remove(self) # but lets keep track of virtual reservoir in lvr. self.mo.lvr.append(self) if self.alias_list != "None": self.create_alialises() self.update_parameter_count()
[docs] def create_alialises(self) -> None: """Register alialises for each vr_datafield.""" for i, a in enumerate(self.alias_list): # print(f"{a} = {self.vr_data[i][0]}") setattr(self, a, self.vr_data[i])
[docs] def update_parameter_count(self): """Update Parameter Count.""" if len(self.function_params) > 0: self.param_start = self.model.vpc self.model.vpc = self.param_start + 1 self.has_p = True # upudate global parameter list self.model.gpt = (*self.model.gpt, self.function_params) else: self.has_p = False
[docs] def append(self, **kwargs) -> None: """Update GenericFunction parameters. After the VirtualSpecies has been initialized. This is most useful when parameters have to reference other virtual reservoirs which do not yet exist, e.g., when two virtual reservoirs have a circular reference. Example:: VR.update(a1=new_parameter, a2=new_parameter) """ allowed_keys: list = ["function_input_data, function_params"] # loop over provided kwargs for key, value in kwargs.items(): if key not in allowed_keys: raise ESBMTKFunctionError( "you can only change function_input_data, or function_params" ) else: getattr(self, key).append(value)
def __reset_state__(self): """Copy the last value to the first position so that we can restart the computation.""" for i, d in enumerate(self.vr_data): d[0] = d[-2] setattr( self, f"vrd_{i}", np.append(getattr(self, f"vrd_{i}"), d[0 : -2 : self.mo.reset_stride]), ) def __merge_temp_results__(self) -> None: """Replace the data fields with the saved values.""" # print(f"merging {self.full_name} with whith len of vrd= {len(self.vrd_0)}") for i, _d in enumerate(self.vr_data): self.vr_data[i] = getattr(self, f"vrd_{i}") # update aliases self.create_alialises() # print(f"new length = {len(self.vr_data[0])}") def __read_state__(self, directory: str) -> None: """Read virtual reservoir data from csv-file into a dataframe. The CSV file must have the following columns Model Time t X1 X2 """ from pathlib import Path if self.sp.mo.register == "None": fn = f"{directory}/state_{self.mo.n}_vr_{self.full_name}.csv" elif self.sp.mo.register == "local": fn = f"{directory}/state_{self.full_name}.csv" else: raise ESBMTKFunctionError( f"Model register keyword must be 'None'/'local' not { self.sp.mo.register }" ) file_path = Path(fn) if not file_path.exists(): raise ESBMTKFunctionError(f"File {fn} not found") logging.info(f"reading state for {self.full_name} from {fn}") # read csv file into dataframe self.df: pd.DataFrame = pd.read_csv(fn) self.headers: list = list(self.df.columns.values) df = self.df headers = self.headers # print(f"reading from {fn}") for i, _n in enumerate(headers): # first column is time if i > 0: # print(f"i = {i}, header = {n}, data = {df.iloc[-3:, i]}") self.vr_data[i - 1][:3] = df.iloc[-3:, i] def __sub_sample_data__(self, stride) -> None: """Subsample the results before saving, or processing them.""" # print(f"subsampling {self.fullname}") new: list = [] for d in self.vr_data: n = d[2:-2:stride] new.append(n) self.vr_data = new # update aliases self.create_alialises() def __write_data__( self, prefix: str, start: int, stop: int, stride: int, append: bool, directory: str, ) -> None: """To be called by write_data and save_state.""" from pathlib import Path p = Path(directory) p.mkdir(parents=True, exist_ok=True) mo = self.sp.mo # model handle rn = self.full_name # reservoir name mn = self.sp.mo.n # model name mtu = f"{mo.t_unit:~P}" if self.sp.mo.register == "None": fn = f"{directory}/{prefix}{mn}_vr_{rn}.csv" # file name elif self.sp.mo.register == "local": fn = f"{directory}/{prefix}{rn}.csv" # file name else: raise ESBMTKFunctionError( f"Model register keyword must be 'None'/'local' not { self.sp.mo.register }" ) df: pd.dataframe = DataFrame() df[f"{rn} Time [{mtu}]"] = self.mo.time[start:stop:stride] # time for i, d in enumerate(self.vr_data): h = self.alias_list[i] if self.alias_list != "None" else f"X{i}" df[h] = d[start:stop:stride] file_path = Path(fn) if append and file_path.exists(): df.to_csv(file_path, header=False, mode="a", index=False) else: df.to_csv(file_path, header=True, mode="w", index=False) return df
[docs] class VirtualSpeciesNoSet(ExternalCode): """Alias to ensure backwards compatibility."""
[docs] class VirtualSpecies(Species): """A virtual reservoir. Unlike regular reservoirs, the mass of a virtual reservoir depends entirely on the return value of a function. Example:: VirtualSpecies(name="foo", volume="10 liter", concentration="1 mmol", species= , function=bar, a1 to a3 = to 3optional function arguments, display_precision = number, optional, inherited from Model, ) The concentration argument will be used to initialize the reservoir and to determine the display units. The function definition follows the GenericFunction class. which takes a generic function and up to 6 optional function arguments, and will replace the mass value(s) of the given reservoirs with whatever the function calculates. This is particularly useful e.g., to calculate the pH of a given reservoir as function of e.g., Alkalinity and DIC. The function must return a list of numbers which correspond to the data which describe a reservoir i.e., mass, light isotope, heavy isotope, delta, and concentration In order to use this function we need first declare a function we plan to use with the generic function process. This function needs to follow this template:: def my_func(i, a1, a2, a3) -> tuple: # # i = index of the current max_timestep # a1 to a3 = optional function parameter. These must be present, # even if your function will not use it See above for details # calc some stuff and return it as # where m= mass, and l & h are the respective return [m, l, h, d, c] # isotopes. d denotes the delta value and # c the concentration # Use dummy value as necessary. This class provides an update method to resolve cases where e.g., two virtual reservoirs have a circular reference. See the documentation of update(). """ def __aux_inits__(self) -> None: """We us the regular init methods of the Species Class, and extend it in this method.""" from .processes import GenericFunction # if self.register != "None": # self.full_name = f"{self.full_name}.{self.name}" name = f"{self.full_name}_generic_function".replace(".", "_") logging.info(f"creating {name}") self.gfh = GenericFunction( name=name, function=self.function, a1=self.a1, a2=self.a2, a3=self.a3, a4=self.a4, act_on=self, ) # we only depend on the above function. so no need # to be in the reservoir list self.mo.lor.remove(self) # but lets keep track of virtual reservoir in lvr. self.mo.lvr.append(self) # print(f"added {self.name} to lvr 2")
[docs] def update(self, **kwargs) -> None: """Update GenericFunction parameters. After the VirtualSpecies has been initialized. This is most useful when parameters have to reference other virtual reservoirs which do not yet exist, e.g., when two virtual reservoirs have a circular reference. Example:: VR.update(a1=new_parameter, a2=new_parameter) """ allowed_keys: list = ["a1", "a2", "a3", "a4", "a5", "a6", "volume"] # loop over provided kwargs for key, value in kwargs.items(): if key not in allowed_keys: raise ValueError("you can only change a1 to a6") setattr(self, key, value) # update self setattr(self.gfh, key, value) # update function
[docs] class GasReservoir(SpeciesBase): """reservoir specific information similar to the Species class. Example:: Species(name = "foo", # Name of reservoir species = CO2, # SpeciesProperties handle # initial delta - optional (defaults to 0) delta = 20, reservoir_mass = quantity # total mass of all gases defaults to 1.78E20 mol species_ppm = number # concentration in ppm plot = "yes"/"no", defaults to yes plot_transform_c = a function reference, optional (see below) legend_left = str, optional, useful for plot transform display_precision = number, optional, inherited from Model register = optional, use to register with Reservoir Group isotopes = True/False otherwise use Model.m_type ) Accesing Species Data: ~~~~~~~~~~~~~~~~~~~~~~~~ You can access the reservoir data as: - Name.m # species mass - Name.l # mass of light isotope - Name.d # species delta (only avaible after M.get_delta_values() - Name.c # partial pressure - Name.v # total gas mass Useful methods include: - Name.write_data() # save data to file - Name.info() # info Species """ def __init__(self, **kwargs) -> None: """Initialize a reservoir.""" from collections.abc import Callable from esbmtk import Q_, Model, SpeciesProperties # provide a dict of all known keywords and their type self.defaults: dict[str, list[any, tuple]] = { "name": ["None", (str)], "species": ["None", (str, SpeciesProperties)], "delta": ["None", (int, float)], "reservoir_mass": ["1.7786E20 mol", (str, Q_)], "species_ppm": ["None", (str, Q_)], "plot_transform_c": ["None", (str, Callable)], "legend_left": ["None", (str)], "plot": ["yes", (str)], "groupname": ["None", (str)], "function": ["None", (str, Callable)], "display_precision": [0.01, (int, float)], "register": ["None", (str, Model)], "full_name": ["None", (str)], "isotopes": [False, (bool)], "geometry": ["None", (str, dict)], "rtype": ["regular", (str)], } # provide a list of absolutely required keywords self.lrk: list = [ "name", "species", "species_ppm", ] self.__initialize_keyword_variables__(kwargs) self._initialize_legacy_attributes(kwargs) # we re-use the volume instance variable but instead of a # volume, we use it store the mass. Use .m instead? self.v_unit = Q_("mole").units # setup base data if isinstance(self.reservoir_mass, str): self.reservoir_mass = Q_(self.reservoir_mass) if isinstance(self.species_ppm, str): self.species_ppm = Q_(self.species_ppm) # not sure this universally true but it works for carbon self.species_mass = (self.reservoir_mass * self.species_ppm).to("mol") self.display_as = "ppm" self.plt_units = "ppm" # we use the existing approach to calculate concentration # which will divide species_mass/volume. self.volume = self.reservoir_mass self.model.toc = (*self.model.toc, self.volume.magnitude) self.v_index = self.model.gcc self.model.gcc = self.model.gcc + 1 # Q_(self.species_mass).magnitude / self.species_ppm.to("dimensionless") # ).magnitude if self.v_unit != self.volume.units: raise GasReservoirError( f"\n\n{self.full_name} reservoir_mass units must be " f"in {self.v_unit} " f"not {self.volume.units}" ) # This should probably be species specific? self.mu: str = "ppm" # massunit xxxx # save the unit which was provided by the user for display purposes # left y-axis label self.lm: str = f"{self.species.n} [{self.mu}]" # initialize vectors self.m: NDArrayFloat = ( np.zeros(self.species.mo.steps) + self.species_mass.magnitude ) self.l: NDArrayFloat = np.zeros(self.mo.steps) # initialize concentration vector self.c: NDArrayFloat = self.m / self.volume.to(self.v_unit).magnitude if self.delta != "None": self.isotopes = True self.l = get_l_mass(self.c, self.delta, self.species.r) self.v: float = ( np.zeros(self.mo.steps) + self.volume.to(self.v_unit).magnitude ) # mass of atmosphere if self.mo.number_of_solving_iterations > 0: self.mc = np.empty(0) self.cc = np.empty(0) self.lc = np.empty(0) self.vc = np.empty(0) self.mo.lor.append(self) # add this reservoir to the model self.mo.lic.append(self) # reservoir type object list # register instance name in global name space # register to model unless a value is given if self.register == "None": self.register = self.mo self.parent = self.register self.__register_with_parent__() # decide which setitem functions to use if self.isotopes: self.__set_data__ = self.__set_with_isotopes__ else: self.__set_data__ = self.__set_without_isotopes__ # any auxilliary init - normally empty, but we use it here to extend the # reservoir class in virtual reservoirs self.__aux_inits__() self.state = 0 def __set_with_isotopes__(self, i: int, value: float) -> None: """Write data by index.""" self.m[i]: float = value[0] self.l[i]: float = value[1] def __set_without_isotopes__(self, i: int, value: float) -> None: """Write data by index.""" self.m[i]: float = value[0]
# self.c[i]: float = self.m[i] / self.v[i] # update concentration # self.v[i]: float = self.v[i - 1] + value[0]
[docs] class ExternalData(esbmtkBase): """Instances of this class hold external X/Y data. which can be associated with a reservoir. Example:: ExternalData(name = "Name" filename = "filename", legend = "label", legend_z = "label", offset = "0 yrs", reservoir = reservoir_handle, scale = scaling factor, optional display_precision = number, optional, inherited from Model convert_to = optional, see below y_as_z = False, plot_as_line = False, plot_args = {}, e.g., {"alpha": 0.5} ) The data must exist as CSV file, where the first column contains the X-values, and the second column contains the Y-values. The x-values must be time and specify the time units in the header between square brackets They will be mapped into the model time units. The y-values can be any data, but the user must take care that they match the model units defined in the model instance. So your data file mujst look like this Time [years], Data [units], Data [units] 1, 12, -12 2, 13, 12 By convention, the secon column should contain the same type of data as the reservoir (i.e., a concentration), whereas the third column contain isotope delta values. In cases were there is no concentration data, set y_as_z = True, and the second column will be printed on the right side of the plot. This will only work for reservoirs that have isotope data though! The column headers are only used for the time or concentration data conversion, and are ignored by the default plotting methods, but they are available as self.xh,yh The convert_to keyword can be used to force a specific conversion. The default is to convert into the model concentration units. Note that the instance name will be a combination of the name specified in the `name` field, and the name of the reservoir the data is associated with. The plot_args keyword allows to pass arbitrary plot arguments Methods ------- - name.plot() Data: - name.x - name.y - name.df = dataframe as read from csv file """ def __init__(self, **kwargs: dict[str, str]): """Initialize Class.""" from esbmtk import Q_, DataField, Model, Species from esbmtk.utility_functions import warn_if_non_numeric # dict of all known keywords and their type self.defaults: dict[str, list[any, tuple]] = { "name": ["None", (str)], "filename": ["None", (str)], "legend": ["None", (str)], "legend_z": ["None", (str)], "reservoir": ["None", (str, Species, DataField, GasReservoir, Signal)], "offset": ["0 yrs", (Q_, str)], "display_precision": [0.01, (int, float)], "scale": [1, (int, float)], "disp_units": [True, (bool)], "reverse_time": [False, (bool)], "y_as_z": [False, (bool)], "plot_as_line": [False, bool], "plot_args": [{}, dict], "register": [ "None", (str, Model, Species, DataField, GasReservoir, Signal), ], "plot_transform_c": ["None", (str, callable)], } # provide a list of absolutely required keywords self.lrk: list = [ "name", "filename", ["legend", "legend_z"], ["reservoir", "register"], ] self.__initialize_keyword_variables__(kwargs) # legacy names if self.register == "None": self.register = self.reservoir self.n: str = self.name # string = name of this instance self.fn: str = self.filename # string = filename of data if isinstance(self.reservoir, Species): self.mo: Model = self.reservoir.species.mo if isinstance(self.reservoir, Signal): self.mo: Model = self.register.species.mo if isinstance(self.register, Model): self.mo: Model = self.register else: self.mo = self.register.mo self.model = self.mo self.parent = self.register self.mo.led.append(self) # keep track of this instance if self.display_precision == 0: self.display_precision = self.mo.display_precision if not os.path.exists(self.fn): # check if the file is actually there raise ExternalDataError(f"Cannot find file {self.fn}") self.df: pd.DataFrame = pd.read_csv( self.fn, encoding="utf-8", engine="python" ) # read file self.df.iloc[:, 0] # check for misread data if warn_if_non_numeric(self.df): raise CSVReadError( f"\n\nNon Numeric data in {self.fn}\n" "Check for missing Exponents (i.e., 1E instead of 1E0), \n" "and/or encoding problems (files must be UTF8)\n" "Check for warnings above\n\n" ) # Extract headers ncols = len(self.df.columns) if ncols < 2: # test of we have at elast 2 columns raise ExternalDataError("CSV file must have at least 2 columns") elif ncols == 2: self.isotopes = False elif ncols == 3: self.isotopes = True elif ncols > 3: raise ExternalDataError("External data only supports up to 2 Y columns") else: raise ExternalDataError("ED: This should not happen") # test if we have invalid values for c in range(ncols): if self.df.iloc[:, c].isnull().values.any(): raise CSVDataError( f"Column {c} of {self.fn} contains invalid data\n" "check for typos or encoding errors" ) self.offset = self.ensure_q(self.offset) self.offset = self.offset.to(self.mo.t_unit).magnitude # get unit information from each header try: xh = self.df.columns[0].split("[")[1].split("]")[0] except Exception as e: raise CSVReadError( f"Unable to parse column headers in {self.fn}\n" "make sure your headers follow this template:\n" "Time [kyr], Rate [mol/yr], delta [dimensionless]\n" "If the headers are ok, there is likely an encoding error\n" "Try saving/converting your csv file to utf-8\n" ) from e yh = self.df.columns[1].split("[")[1].split("]")[0] self.zh = ( self.df.columns[2].split("[")[1].split("]")[0] if len(self.df.columns) > 2 else None ) # create the associated quantities. Note that zh is always dimensionless self.xq = Q_(xh) self.yq = Q_(yh) # Get data as numpy array and add units self.x: NDArrayFloat = self.df.iloc[:, 0].to_numpy() * self.xq self.y: NDArrayFloat = self.df.iloc[:, 1].to_numpy() * self.yq # Map time data into model units self.x = self.x.to(self.mo.t_unit) # Mapping to display units is now done by the plot routine! # if self.disp_units: # self.x = self.x.to(self.mo.d_unit) # else: # Map y data into model units if isinstance(self.yq, Q_): mol_liter = Q_("1 mol/liter").dimensionality mol_kg = Q_("1 mol/kg").dimensionality # test what type of Quantity we have if self.yq.dimensionless: # dimensionless pass elif self.yq.is_compatible_with("liter/yr"): # flux self.y = self.y.to(self.mo.r_unit) elif self.yq.is_compatible_with("mol/yr"): # flux self.y = self.y.to(self.mo.f_unit) elif ( self.yq.dimensionality == mol_liter or self.yq.dimensionality == mol_kg ): # concentration self.y = self.y.to(self.mo.c_unit) else: SignalError(f"No conversion to model units for {self.scale} specified") # Strip unit information self.x = self.x.magnitude self.y = self.y.magnitude # clip values outside of the model domain mask = (self.x >= self.model.start) & (self.x <= self.model.stop) self.x = self.x[mask] self.y = self.y[mask] if self.zh: # z data is assumed to be without units self.d: NDArrayFloat = self.df.iloc[:, 2].to_numpy() self.d = self.d[mask] self.z = self.d # map external time data from Ma to model space # x must be in model t units! See above if self.reverse_time: self.x = self.model.stop - self.x # register with reservoir self.__register__(self.register) self.__register_with_parent__() # if self.mo.register == "local" and self.register == "None": # self.register = self.mo def __register__(self, obj): """Register this dataset with a flux or reservoir. This will have the effect that the data will be printed together with the model results for this reservoir Example:: ExternalData.register(Species) """ self.obj = obj # reser handle we associate with obj.led.append(self) def __interpolate__(self) -> None: """Interpolate the input data with a resolution of dt across the model domain. The first and last data point must coincide with the model start and end time. In other words, this method will not patch data at the end points. This will replace the original values of name.x and name.y. However the original data remains accessible as name.df """ xi: NDArrayFloat = self.model.time if (self.x[0] > xi[0]) or (self.x[-1] < xi[-1]): message = ( f"\n Interpolation requires that the time domain" f"is equal or greater than the model domain" f"data t(0) = {self.x[0]}, tmax = {self.x[-1]}" f"model t(0) = {xi[0]}, tmax = {xi[-1]}" ) raise ExternalDataError(message) else: self.y: NDArrayFloat = np.interp(xi, self.x, self.y) self.x = xi
[docs] def plot(self) -> None: """Plot the data and save a pdf. Example:: ExternalData.plot() """ fig, ax = plt.subplots() # ax.scatter(self.x, self.y) ax.set_label(self.legend) ax.set_xlabel(self.xh) ax.set_ylabel(self.yh) plt.show() plt.savefig(self.n + ".pdf")
def __plot__(edo, i, ax, axt="None") -> None: """Plot data in existing figure. Parameters ---------- edo : ExternalData external data object i : int counter ax : plt.axes axes object axt : plt.axes Optional twinx axes object Note that the x-array data has been mapped into model time units upon import. So we need to remap into display time units first. """ # Convert time and data to display units x = (edo.x * edo.model.t_unit).to(edo.model.d_unit).magnitude if edo.y_as_z: # ignore y data if edo.plot_as_line: if axt == "None": ax.plot( x, edo.y, color=f"C{i + 1}", label=edo.legend_z, **edo.plot_args ) else: axt.plot( x, edo.y, color=f"C{i + 1}", label=edo.legend_z, **edo.plot_args ) i = i + 2 else: if axt == "None": ax.scatter( x, edo.y, color=f"C{i}", label=edo.legend_z, **edo.plot_args ) else: axt.scatter( x, edo.y, color=f"C{i}", label=edo.legend_z, **edo.plot_args ) i = i + 1 elif edo.zh: # plot y and z data if edo.plot_as_line: ax.plot( x, edo.y * edo.d_scale, color=f"C{i + 1}", label=edo.legend, **edo.plot_args, ) if edo.isotopes: axt.plot( x, edo.z, color=f"C{i + 2}", label=edo.legend_z, **edo.plot_args ) i = i + 3 else: ax.scatter( x, edo.y * edo.d_scale, color=f"C{i}", label=edo.legend, **edo.plot_args, ) axt.scatter( x, edo.z, color=f"C{i + 1}", label=edo.legend_z, **edo.plot_args ) i = i + 2 else: # plot only y data ax.scatter( x, edo.y * edo.d_scale, color=f"C{i}", label=edo.legend, **edo.plot_args ) i = i + 1 return i