Source code for esbmtk.processes

"""esbmtk: A general purpose Earth Science box model toolkit.

Copyright(C), 2020-2021 Ulrich G. Wortmann

This program is free software: you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of
the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see
<https://www.gnu.org/licenses/>.
"""

from __future__ import annotations

import typing as tp

import numpy as np
import numpy.typing as npt

if tp.TYPE_CHECKING:
    from esbmtk import Q_, Species2Species

# declare numpy types
NDArrayFloat = npt.NDArray[np.float64]


[docs] def weathering_no_isotopes( c_pco2: float | list[float], source_data: float | list[float], p: tuple, ) -> float | tuple: """Calculate weathering as a function of pCO2. Parameters ---------- c_pco2 : float | list[float] current pCO2 concentration as absolute concentration, e.g. 280 ppm = 0.00028 p : tuple a tuple with the following entries: pco2_0 = reference pCO2 area_fraction = fraction of total surface area ex = exponent used in the equation f0 = flux at the reference value Returns ------- float a float value for the weathering flux Explanation ----------- If the model uses isotopes, the function expects the concentration values for the total mass and the light isotope as a list, and will simiraly return the flux as a list of total flux and flux of the light isotope. The flux itself is calculated as F_w = area_fraction * f0 * (pco2/pco2_0)**ex """ pco2_0, area_fraction, ex, f0 = p f = area_fraction * f0 * (c_pco2 / pco2_0) ** ex return f
# Note this only works for for the case where the atmosphere # has isotopes, and the sink has none, But fails if both # have isotopes. # @njit(fastmath=True)
[docs] def weathering_ref_isotopes( c_pco2: float | list[float], source_data: float | list[float], p: tuple, ) -> float | tuple: """Calculate weathering as a function of pCO2. This is the same function as weathering_no_isotopes, but assumes that the data is a tuple. """ pco2_0, area_fraction, ex, f0 = p pco2, pco2i = c_pco2 return f0 * area_fraction * (pco2 / pco2_0) ** ex
# @njit(fastmath=True)
[docs] def weathering_isotopes( c_pco2: float | list[float], source_data: float | list[float], p: tuple, ) -> float | tuple: """Calculate weathering as a function of pCO2. This is the same function as weathering_no_isotopes, but assumes that we weather a species (e.g., carbonate) that requires fluxes for both isotopes. data is a tuple. """ pco2, pco2_l = c_pco2 # pco2 data s_c, s_l = source_data pco2_0, area_fraction, ex, f0 = p # constants w_scale = area_fraction * (pco2 / pco2_0) ** ex F_w = f0 * w_scale F_w_i = f0 * w_scale * s_l / s_c return (F_w, F_w_i)
# @njit(fastmath=True)
[docs] def weathering_isotopes_delta( c_pco2: float | list[float], source_data: float | list[float], p: tuple, ) -> float | tuple: """Calculate weathering as a function of pCO2. This is the same function as weathering_no_isotopes, but assumes that we weather a species (e.g., carbonate) that requires fluxes for both isotopes. data is a tuple. """ pco2_0, area_fraction, ex, f0, delta, r = p pco2, pco2i = c_pco2 # pco2 data w_scale = area_fraction * (pco2 / pco2_0) ** ex F_w = f0 * w_scale F_w_i = f0 * w_scale * 1000 / (r * (delta + 1000) + 1000) return (F_w, F_w_i)
# @njit(fastmath=True)
[docs] def weathering_isotopes_alpha( c_pco2: float | list[float], source_data: float | list[float], p: tuple, ) -> float | tuple: """Calculate weathering as a function of pCO2. This is the same function as weathering_no_isotopes, but assumes that we weather a species (e.g., carbonate) that requires fluxes for both isotopes. data is a tuple. s_c = source mass s_l = source mass of light isotope """ pco2_0, area_fraction, ex, f0, alpha, r = p pco2, pco2i = c_pco2 # atmosphere mass and light isotope mass s_c, s_l = source_data w_scale = area_fraction * (pco2 / pco2_0) ** ex F_w = f0 * w_scale # flux at a given pco2 value F_w_i = f0 * w_scale * s_l / (alpha * s_c + s_l - alpha * s_l) return (F_w, F_w_i)
[docs] def init_weathering( c: Species2Species, pco2: float, pco2_0: float | str | Q_, area_fraction: float, ex: float, f0: float | str | Q_, ): """Create a new external code instance. :param c: Species2Species :param pco2: float current pco2 :param pco2_0: float reference pco2 :area_fraction: float area/total area :param ex: exponent :f0: flux at pco2_0 """ from esbmtk import ExternalCode, Sink, Source, check_for_quantity f0 = check_for_quantity(f0, "mol/year").magnitude pco2_0 = check_for_quantity(pco2_0, "ppm").magnitude # p = (pco2_0, area_fraction, ex, f0) c.fh.ftype = "computed" c.isotopes = c.source.isotopes # the co may have missed this if c.delta != "None": weathering_function = "weathering_isotopes_delta" p = (pco2_0, area_fraction, ex, f0, c.delta, c.source.species.r) elif c.epsilon != "None": weathering_function = "weathering_isotopes_alpha" alpha = c.epsilon / 1000 + 1 # convert to alpha notation p = ( pco2_0, # reference pCO2 area_fraction, # area relative to total ocean area ex, # exponent f0, # flux at pCO2_0 alpha, # fractionation factor c.source.species.r, # isotope reference species ) elif isinstance(c.source, Source) and isinstance(c.sink, Sink): if c.reservoir_ref.isotopes: weathering_function = "weathering_ref_isotopes" p = (pco2_0, area_fraction, ex, f0) else: weathering_function = "weathering_no_isotopes" p = (pco2_0, area_fraction, ex, f0) elif ( (c.isotopes and c.sink.isotopes) or (c.reservoir_ref.isotopes and c.sink.isotopes) or (c.isotopes and isinstance(c.sink, Sink)) ): weathering_function = "weathering_isotopes" p = (pco2_0, area_fraction, ex, f0) elif c.reservoir_ref.isotopes and not c.sink.isotopes: weathering_function = "weathering_ref_isotopes" p = (pco2_0, area_fraction, ex, f0) else: weathering_function = "weathering_no_isotopes" p = (pco2_0, area_fraction, ex, f0) ec = ExternalCode( name=f"ec_weathering_{c.id}", fname=weathering_function, isotopes=c.reservoir_ref.isotopes, ftype="std", species=c.sink.species, function_input_data=[pco2, c.source], function_params=p, register=c.model, return_values=[ {f"F_{c.fh.full_name}": "fww"}, ], ) c.mo.lpc_f.append(ec.fname) return ec
[docs] def init_gas_exchange(c: Species2Species): """Create ExternalCode instance for gas exchange reactions. Parameters ---------- c : Species2Species connection instance """ import warnings from esbmtk import ExternalCode, check_for_quantity c.fh.ftype = "computed" sink_reservoir = c.sink.register swc = sink_reservoir.swc # sink - liquid if sink_reservoir.set_area_warning: warnings.warn( f"\nGEX for {sink_reservoir.full_name} is using the entire ocean area.\n" "Consider adjusting the area fraction parameter of the box geometry:\n" "g = [upper depth, lower depth, area fraction] \n" ) c.model.now = c.model.now + 1 if c.solubility == "None": # use predefined values if c.species.name == "CO2": ref_species = sink_reservoir.CO2aq solubility = swc.SA_co2 a_db = swc.co2_a_db a_dg = swc.co2_a_dg a_u = swc.co2_a_u elif c.species.name == "O2": ref_species = sink_reservoir.O2 solubility = swc.SA_O2 a_db = swc.o2_a_db a_dg = swc.o2_a_dg a_u = swc.o2_a_u else: raise ValueError( f"Gas exchange is undefined for {c.species.name}\n" f"consider manual setup?\n" ) else: # use user supplied values ref_species = sink_reservoir.O2 if c.ref_species == "None" else c.ref_species solubility = ( check_for_quantity(c.solubility, "mol/(m^3 * atm)") .to("mol/(m^3 * atm)") .magnitude ) a_db = c.a_db a_dg = c.a_dg a_u = c.a_u piston_velocity = ( check_for_quantity(c.piston_velocity, "m/yr").to("meter/year").magnitude ) area = check_for_quantity(sink_reservoir.area, "m**2").to("meter**2").magnitude scale = area * piston_velocity * c.scale p = ( scale, solubility, a_db, a_dg, a_u, c.isotopes, # c.source.isotopes, ref_species.isotopes, ) ec = ExternalCode( name=f"gas_exchange{c.id}", fname="gas_exchange", ftype="std", species=c.source.species, function_input_data=[c.source, c.sink, ref_species], function_params=p, isotopes=c.isotopes, register=c.sink, return_values=[ {f"F_{c.fh.full_name}": "gex"}, ], ) c.mo.lpc_f.append(ec.fname) # this is now initialized through a Species2Species instance. This instance # will create the necessary Flux instances. Since this will compute the # fluxes. We need to set the compute_by flag for f in c.lof: f.ftype = "std" return ec
[docs] def gas_exchange( gas_c: float | tuple, liquid_c: float | tuple, gas_aq_c: float, p: tuple, ) -> float | tuple: """Calculate the gas exchange flux across the air sea interface. including isotope effects. Parameters ---------- gas_c : float | tuple gas concentration in atmosphere liquid_c : float | tuple reference species in liquid phase, e.g., DIC gas_aq: float dissolved gas concentration, e.g., CO2aq p : tuple parameters, see init_gas_exchange Returns ------- float | tuple gas flux across the air/sea interface Note that the sink delta is co2aq as returned by the carbonate VR this equation is for mmol but esbmtk uses mol, so we need to multiply by 1E3 The Total flux across interface dpends on the difference in either concentration or pressure the atmospheric pressure is known, as gas_c, and we can calculate the equilibrium pressure that corresponds to the dissolved gas in the water as [CO2]aq/beta. Conversely, we can convert the the pCO2 into the amount of dissolved CO2 = pCO2 * beta The h/c ratio in HCO3 estimated via h/c in DIC. Zeebe writes C12/C13 ratio but that does not work. the C13/C ratio results however in -8 permil offset, which is closer to observations """ scale, solubility, a_db, a_dg, a_u, species_isotopes, ref_species_isotopes = p if species_isotopes: gas_c, gas_c_l = gas_c liquid_c, liquid_c_l = liquid_c if ref_species_isotopes: gas_aq_c, _l = gas_aq_c else: gas_aq_c = gas_aq_c # Solubility with correction for pH2O # beta = solubility * (1 - p_H2O) beta = solubility # solubility is already corrected for p_H20 # f as afunction of solubility difference f = scale * (beta * gas_c - gas_aq_c * 1e3) rv = f if species_isotopes: # isotope ratio of DIC Rt = (liquid_c - liquid_c_l) / liquid_c # get heavy isotope concentrations in atmosphere gas_c_h = gas_c - gas_c_l # gas heavy isotope concentration # get exchange of the heavy isotope f_h = scale * a_u * (a_dg * gas_c_h * beta - Rt * a_db * gas_aq_c * 1e3) rv = f, f - f_h # the corresponding flux of the light isotope return rv