Source code for esbmtk.post_processing

"""esbmtk: A general purpose Earth Science box model toolkit.

Copyright (C), 2020 Ulrich G.  Wortmann

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or (at
your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""

from __future__ import annotations

import typing as tp

import numpy as np
import numpy.typing as npt

if tp.TYPE_CHECKING:
    from .esbmtk import GasReservoir, Reservoir, Species, SpeciesGroup

# declare numpy types
NDArrayFloat = npt.NDArray[np.float64]
NDArrayInt = npt.NDArray[np.int64]


[docs] def carbonate_system_1_pp(box_names: SpeciesGroup) -> None: """Calculate carbonate species. Based on previously calculated Hplus, TA, and DIC concentrations. LIMITATIONS: - Assumes all concentrations are in mol/kg - Assumes your Model is in mol/kg ! Otherwise, DIC and TA updating will not be correct. Calculations are based off equations from: Boudreau et al., 2010, https://doi.org/10.1029/2009GB003654 Follows, 2006, doi:10.1016/j.ocemod.2005.05.004 :param rg: A reservoirgroup object with initialized carbonate system """ from esbmtk import VectorData if not isinstance(box_names, list): box_names = [box_names] for rg in box_names: k1 = rg.swc.K1 # K1 k2 = rg.swc.K2 # K2 k1k2 = rg.swc.K1K2 hplus = rg.Hplus.c dic = rg.DIC.c VectorData( name="HCO3", register=rg, species=rg.mo.HCO3, data=dic / (1 + hplus / k1 + k2 / hplus), label="HCO3-", plt_units=rg.mo.c_unit, ) VectorData( name="CO3", register=rg, species=rg.mo.CO3, data=dic / (1 + hplus / k2 + hplus**2 / k1k2), label="CO32-", plt_units=rg.mo.c_unit, ) VectorData( name="pH", register=rg, species=rg.mo.pH, data=-np.log10(hplus), label="pH", plt_units="total scale", ) VectorData( name="Omega", register=rg, species=rg.mo.pH, data=rg.swc.ca2 * rg.CO3.c / rg.swc.Ksp_ca, label=r"$\Omega$-Calcite", plt_units="", )
[docs] def carbonate_system_2_pp( bn: Reservoir | list, # 2 Reservoir handle export_fluxes: float | list, # 3 CaCO3 export flux as DIC zsat_min: float = 200, zmax: float = 10000, ) -> None: """Calculate the fraction of carbonate that is dissolved. :param rg: Reservoir, e.g., M.D_b :param export: export flux in mol/year :param zsat_min: depth of mixed layer :param zmax: depth of lookup table This function then saves the data as M.box_name.Hplus M.box_name.CO3 M.box_name.CO2aq M.box_name.pH M.box_name.zsat # top of lysocline M.box_name.zcc # bottom of lysocline M.box_name.zsnow # snow line M.box_name.Fburial # The CaCO3 burial flux (Export - dissolution) M.box_name.Fdiss # The CaCO3 dissolution flux LIMITATIONS: - Assumes all concentrations are in mol/kg - Assumes your Model is in mol/kg ! Otherwise, DIC and TA updating will not be correct. Calculations are based off equations from: Boudreau et al., 2010, https://doi.org/10.1029/2009GB003654 Follows, 2006, doi:10.1016/j.ocemod.2005.05.004 """ from math import log from esbmtk import VectorData # ensure that all objects are lists if not isinstance(bn, list): bn = [bn] if not isinstance(export_fluxes, list): export_fluxes = [export_fluxes] # loop over boxes for i, rg in enumerate(bn): p = rg.cs2.function_params sp, cp, area_table, area_dz_table, Csat_table = p ksp0, kc, AD, zsat0, I_caco3, alpha, zsat_min, zmax, z0 = cp k1, k2, k1k2, KW, KB, ca2, boron, isotopes = sp hplus: NDArrayFloat = rg.Hplus.c dic: NDArrayFloat = rg.DIC.c zsnow: NDArrayInt = rg.zsnow.c.astype(int) export_data = export_fluxes[i] # make sure we have a vector if isinstance(export_data, float | int): export: NDArrayFloat = dic * 0 + export_data else: export = export_data # hco3 = dic / (1 + hplus / k1 + k2 / hplus) co3: NDArrayFloat = dic / (1 + hplus / k2 + hplus**2 / k1k2) co2aq: NDArrayFloat = dic / (1 + k1 / hplus + k1k2 / hplus**2) zsat: NDArrayInt = np.clip( zsat0 * np.log(ca2 * co3 / ksp0), zsat_min, zmax, ).astype(int) B_AD: NDArrayFloat = export / AD Fdiss: NDArrayFloat = co3 * 0 Fburial: NDArrayFloat = co3 * 0 zcc: NDArrayInt = co3.astype(int) * 0 for i, z in enumerate(zsat): zcc[i] = int( zsat0 * log(export[i] * ca2 / (ksp0 * AD * kc) + ca2 * co3[i] / ksp0) ) # eq3 if zcc[i] > zmax: zcc[i] = zmax print( f"Warning zcc > zmax, i = {i}, co3 = {co3[i] * 1e6} umol/kg, export = {export[i] / 1e12:.2f} Tmol/y" ) elif zcc[i] < z0: zcc[i] = z0 A_z0_zsat: float = area_table[z0] - area_table[z] A_zsat_zcc: float = area_table[z] - area_table[zcc[i]] A_zcc_zmax: float = area_table[zcc[i]] - area_table[zmax] BCC: float = A_zcc_zmax * B_AD[i] BNS: float = alpha * A_z0_zsat * B_AD[i] diff_co3: float = Csat_table[z : zcc[i]] - co3[i] area_p: NDArrayFloat = area_dz_table[z : zcc[i]] try: BDS_under: float = kc * area_p.dot(diff_co3) except: breakpoint() BDS_resp: float = alpha * (A_zsat_zcc * B_AD[i] - BDS_under) BDS: float = BDS_under + BDS_resp """ Note that we do not recalculate zsnow in post processing since it is already known from the original run """ if zsnow[i] <= zcc[i]: # reset zsnow # dzdt_zsnow: int = abs(zsnow - zcc) # zsnow[i] = zcc[i] BPDC: float = 0.0 else: # integrate saturation difference over area # if zsnow[i] > zmax: # zsnow[i] = zmax # integrate saturation difference over area diff: float = Csat_table[zcc[i] : zsnow[i]] - co3[i] area_p_snow: NDArrayFloat = area_dz_table[zcc[i] : zsnow[i]] BPDC = max(0, kc * area_p_snow.dot(diff)) # dzdt_zsnow = -BPDC / (area_dz_table[int(zsnow)] * I_caco3) Fdiss[i] = BDS + BCC + BNS + BPDC Fburial[i] = export[i] - Fdiss[i] VectorData( name="Fburial", register=rg, species=rg.mo.Fburial, data=Fburial, label="Fburial", plt_units=rg.mo.f_unit, ) VectorData( name="Fdiss", register=rg, species=rg.mo.Fdiss, data=Fdiss, label="Fdiss", plt_units=rg.mo.f_unit, ) VectorData( name="CO3", register=rg, species=rg.mo.CO3, data=co3, label="CO32-", plt_units=rg.mo.c_unit, ) VectorData( name="CO2aq", register=rg, species=rg.mo.CO2aq, data=co2aq, label="CO2aq", plt_units=rg.mo.c_unit, ) VectorData( name="pH", register=rg, species=rg.mo.pH, data=-np.log10(hplus), label="pH", plt_units="total scale", ) VectorData( name="zsat", register=rg, species=rg.mo.zsat, data=zsat, label="zsat", plt_units="m", ) VectorData( name="zcc", register=rg, species=rg.mo.zcc, data=zcc, label="zcc", plt_units="m", ) VectorData( name="CaCO3_export", register=rg, species=rg.mo.DIC, data=export, label="CaCO3_export", plt_units="mol/year", )
""" Carbonate system 3 post processing: Currently works in the same manner as carbonate_system_2_post_processng """
[docs] def carbonate_system_3_pp( bn: Reservoir | list, # 2 Reservoir handle export_fluxes: float | list, # 3 CaCO3 export flux as DIC zsat_min: float = 200, zmax: float = 10000, ) -> None: """Calculate the fraction of carbonate that is dissolved. :param rg: Reservoir, e.g., M.D_b :param export: export flux in mol/year :param zsat_min: depth of mixed layer :param zmax: depth of lookup table This function then saves the data as M.box_name.Hplus M.box_name.CO3 M.box_name.CO2aq M.box_name.pH M.box_name.zsat # top of lysocline M.box_name.zcc # bottom of lysocline M.box_name.zsnow # snow line M.box_name.Fburial # The CaCO3 burial flux (Export - dissolution) M.box_name.Fdiss # The CaCO3 dissolution flux LIMITATIONS: - Assumes all concentrations are in mol/kg - Assumes your Model is in mol/kg ! Otherwise, DIC and TA updating will not be correct. Calculations are based off equations from: Boudreau et al., 2010, https://doi.org/10.1029/2009GB003654 Follows, 2006, doi:10.1016/j.ocemod.2005.05.004 """ from math import log from esbmtk import VectorData # ensure that all objects are lists if not isinstance(bn, list): bn = [bn] if not isinstance(export_fluxes, list): export_fluxes = [export_fluxes] # loop over boxes for i, rg in enumerate(bn): p = rg.cs3.function_params sp, cp, area_table, area_dz_table, Csat_table = p ksp0, kc, AD, zsat0, I_caco3, alpha, zsat_min, zmax, z0 = cp k1, k2, k1k2, KW, KB, ca2, boron, isotopes = sp hplus: NDArrayFloat = rg.Hplus.c dic: NDArrayFloat = rg.DIC.c zsnow: NDArrayInt = rg.zsnow.c.astype(int) export_data = export_fluxes[i] # make sure we have a vector if isinstance(export_data, float | int): export: NDArrayFloat = dic * 0 + export_data else: export = export_data # hco3 = dic / (1 + hplus / k1 + k2 / hplus) co3: NDArrayFloat = dic / (1 + hplus / k2 + hplus**2 / k1k2) co2aq: NDArrayFloat = dic / (1 + k1 / hplus + k1k2 / hplus**2) zsat: NDArrayInt = np.clip( zsat0 * np.log(ca2 * co3 / ksp0), zsat_min, zmax, ).astype(int) B_AD: NDArrayFloat = export / AD Fdiss: NDArrayFloat = co3 * 0 Fburial: NDArrayFloat = co3 * 0 zcc: NDArrayInt = co3.astype(int) * 0 for i, z in enumerate(zsat): zcc[i] = int( zsat0 * log(export[i] * ca2 / (ksp0 * AD * kc) + ca2 * co3[i] / ksp0) ) # eq3 if zcc[i] > zmax: zcc[i] = zmax print( f"Warning zcc > zmax, i = {i}, co3 = {co3[i] * 1e6} umol/kg, export = {export[i] / 1e12:.2f} Tmol/y" ) elif zcc[i] < z0: zcc[i] = z0 A_z0_zsat: float = area_table[z0] - area_table[z] A_zsat_zcc: float = area_table[z] - area_table[zcc[i]] A_zcc_zmax: float = area_table[zcc[i]] - area_table[zmax] BCC: float = A_zcc_zmax * B_AD[i] BNS: float = alpha * A_z0_zsat * B_AD[i] diff_co3: float = Csat_table[z : zcc[i]] - co3[i] area_p: NDArrayFloat = area_dz_table[z : zcc[i]] try: BDS_under: float = kc * area_p.dot(diff_co3) except: breakpoint() BDS_resp: float = alpha * (A_zsat_zcc * B_AD[i] - BDS_under) BDS: float = BDS_under + BDS_resp """ Note that we do not recalculate zsnow in post processing since it is already known from the original run """ if zsnow[i] <= zcc[i]: # reset zsnow # dzdt_zsnow: int = abs(zsnow - zcc) # zsnow[i] = zcc[i] BPDC: float = 0.0 else: # integrate saturation difference over area # if zsnow[i] > zmax: # zsnow[i] = zmax # integrate saturation difference over area diff: float = Csat_table[zcc[i] : zsnow[i]] - co3[i] area_p_snow: NDArrayFloat = area_dz_table[zcc[i] : zsnow[i]] BPDC = max(0, kc * area_p_snow.dot(diff)) # dzdt_zsnow = -BPDC / (area_dz_table[int(zsnow)] * I_caco3) Fdiss[i] = BDS + BCC + BNS + BPDC Fburial[i] = export[i] - Fdiss[i] VectorData( name="Fburial", register=rg, species=rg.mo.Fburial, data=Fburial, label="Fburial", plt_units=rg.mo.f_unit, ) VectorData( name="Fdiss", register=rg, species=rg.mo.Fdiss, data=Fdiss, label="Fdiss", plt_units=rg.mo.f_unit, ) VectorData( name="CO3", register=rg, species=rg.mo.CO3, data=co3, label="CO32-", plt_units=rg.mo.c_unit, ) VectorData( name="CO2aq", register=rg, species=rg.mo.CO2aq, data=co2aq, label="CO2aq", plt_units=rg.mo.c_unit, ) VectorData( name="pH", register=rg, species=rg.mo.pH, data=-np.log10(hplus), label="pH", plt_units="total scale", ) VectorData( name="zsat", register=rg, species=rg.mo.zsat, data=zsat, label="zsat", plt_units="m", ) VectorData( name="zcc", register=rg, species=rg.mo.zcc, data=zcc, label="zcc", plt_units="m", ) VectorData( name="CaCO3_export", register=rg, species=rg.mo.DIC, data=export, label="CaCO3_export", plt_units="mol/year", )
[docs] def gas_exchange_fluxes( liquid_reservoir: Species, gas_reservoir: GasReservoir, pv: str, ): """Calculate gas exchange fluxes for a given reservoir. :param liquid_reservoir: Species handle :param gas_reservoir: Species handle :param pv: piston velocity as string e.g., "4.8 m/d" :returns: """ from esbmtk import Q_, gas_exchange if isinstance(pv, str): pv = Q_(pv).to("meter/yr").magnitude elif isinstance(pv, Q_): pv = pv.to("meter/yr").magnitude else: raise ValueError("pv must be quantity or string") scale = liquid_reservoir.register.area * pv gas_c = gas_reservoir if liquid_reservoir.species.name == "DIC": solubility = liquid_reservoir.register.swc.SA_co2 g_c_aq = liquid_reservoir.register.CO2aq a_db = liquid_reservoir.register.swc.co2_a_db a_dg = liquid_reservoir.register.swc.co2_a_dg a_u = liquid_reservoir.register.swc.co2_a_u elif liquid_reservoir.species.name == "O2": solubility = liquid_reservoir.register.swc.SA_o2 g_c_aq = liquid_reservoir.register.O2 a_db = liquid_reservoir.register.swc.o2_a_db a_dg = liquid_reservoir.register.swc.o2_a_dg a_u = liquid_reservoir.register.swc.o2_a_u else: raise ValueError("flux calculation is only supported for DIC and O2") p = ( scale, solubility, a_db, a_dg, a_u, gas_reservoir.isotopes, liquid_reservoir.isotopes, ) return gas_exchange(gas_c.c, liquid_reservoir.c, g_c_aq.c, p)