Source code for soil_heat.wang_and_yang

"""
Yang & Wang (2008) – Equation Set Implementation
=================================================
Python translation of every numbered equation in:

> Yang, K., & Wang, J. (2008). *A temperature prediction‑correction method for
> estimating surface soil heat flux from soil temperature and moisture data.*
> *Science in China Series D: Earth Sciences, 51*(5), 721‑729.
> https://doi.org/10.1007/s11430‑008‑0036‑1

The paper introduces a *Temperature‑Diffusion plus Error‑Correction* (TDEC)
approach for estimating surface soil heat flux.  This module provides a direct
one‑to‑one mapping from each equation in the paper (Eqs. 1–12) to a Python
function.  Helper utilities required by those equations—matrix creators, grid
stretching, tridiagonal solvers, etc.—are also included.

All functions accept **NumPy arrays** or scalars where applicable and are fully
type‑annotated.  Docstrings use the **NumPy docstring standard** so they can be
rendered by *Sphinx‑napoleon*.
"""

from __future__ import annotations

import math
from typing import Sequence, Tuple

import numpy as np

# -----------------------------------------------------------------------------
# Constants & type aliases
# -----------------------------------------------------------------------------
SIGMA_SB: float = 5.670e-8  # Stefan‑Boltzmann constant (W m‑2 K‑4)
RHO_WATER: float = 1_000.0  # Density of liquid water (kg m‑3)
CP_WATER: float = 4_200_000.0  # Volumetric heat capacity of water (J m‑3 K‑1)

ArrayLike = np.ndarray | Sequence[float]

# -----------------------------------------------------------------------------
# Equation (1) & (2) –  the 1‑D thermal diffusion equation & Fourier’s law
# -----------------------------------------------------------------------------


[docs] def soil_heat_flux( Tz: ArrayLike, dz: ArrayLike, lambda_s: ArrayLike | float ) -> np.ndarray: # Eq. (2) """ Compute heat flux *G* at cell interfaces using Fourier’s law. This function calculates the conductive heat flux between soil layers based on the temperature gradient and thermal conductivity. .. math:: G = -\\lambda_s \\frac{\\partial T}{\\partial z} Parameters ---------- Tz : ArrayLike A 1D array of temperatures at the **centre** of each soil layer (K). dz : ArrayLike A 1D array of the thickness of each soil layer (m). lambda_s : ArrayLike or float Thermal conductivity for each layer (W m⁻¹ K⁻¹). Can be a single value (for homogeneous soil) or an array matching `Tz`. Returns ------- numpy.ndarray An array of heat fluxes (W m⁻²) at the *interfaces* between layers, including the surface and bottom boundaries. The length of the output is `len(Tz) + 1`. Positive values indicate downward flux. """ Tz = np.asarray(Tz, dtype=float) dz = np.asarray(dz, dtype=float) lam = np.asarray(lambda_s, dtype=float) if np.ndim(lambda_s) else float(lambda_s) # Effective conductivity at interfaces is the average of adjacent layers if np.ndim(lam): lam_int = 0.5 * (lam[:-1] + lam[1:]) else: lam_int = lam # Gradient between layer centers dT = np.diff(Tz) # Distance between layer centers dz_int = 0.5 * (dz[:-1] + dz[1:]) G_int = -lam_int * dT / dz_int # Extrapolate surface & bottom fluxes assuming the same gradient as the # nearest two layers (Neumann boundary condition approximation). if np.ndim(lam): G_surface = -lam[0] * (Tz[0] - Tz[1]) / (dz[0]/2 + dz[1]/2) G_bottom = -lam[-1] * (Tz[-2] - Tz[-1]) / (dz[-2]/2 + dz[-1]/2) else: G_surface = -lam * (Tz[0] - Tz[1]) / (dz[0]/2 + dz[1]/2) G_bottom = -lam * (Tz[-2] - Tz[-1]) / (dz[-2]/2 + dz[-1]/2) return np.concatenate(([G_surface], G_int, [G_bottom]))
# ----------------------------------------------------------------------------- # Equation (3) & (5) – integral form for *G(z)* # -----------------------------------------------------------------------------
[docs] def integrated_soil_heat_flux( rho_c: ArrayLike, T_before: ArrayLike, T_after: ArrayLike, dz: ArrayLike, dt: float, G_ref: float = 0.0, ) -> np.ndarray: # Eq. (5) """ Calculate the soil heat flux profile by integrating the change in heat storage upwards from a reference depth (Yang & Wang 2008, Eq. 5). .. math:: G(z_i) = G(z_{ref}) + \\int_{z_i}^{z_{ref}} \\rho_s c_s \\frac{\\partial T}{\\partial t} dz Parameters ---------- rho_c : ArrayLike A 1D array of volumetric heat capacity (`ρ_s c_s`) for each soil layer (J m⁻³ K⁻¹). T_before : ArrayLike A 1D array of temperatures at the start of the time step (K). T_after : ArrayLike A 1D array of temperatures at the end of the time step (K). dz : ArrayLike A 1D array of layer thicknesses (m). dt : float The duration of the time step (s). G_ref : float, optional The heat flux at the lower reference depth `z_ref` (W m⁻²). This is typically assumed to be zero at a sufficient depth, by default 0.0. Returns ------- numpy.ndarray An array of heat fluxes (W m⁻²) at the *upper interface* of each layer, with a size of `len(dz)`. """ rho_c = np.asarray(rho_c) dT = np.asarray(T_after) - np.asarray(T_before) # Storage change in each layer storage_change = rho_c * dT * dz / dt # Cumulatively sum storage changes from the bottom up # The flux at interface i is the reference flux plus the sum of storage # changes in all layers below i. return G_ref + np.cumsum(storage_change[::-1])[::-1]
# ----------------------------------------------------------------------------- # Equation (4a–c) – volumetric heat capacity # -----------------------------------------------------------------------------
[docs] def volumetric_heat_capacity( theta: ArrayLike, theta_sat: float | ArrayLike ) -> np.ndarray: # Eq. 4 """ Calculate the volumetric heat capacity of moist soil based on its water content (Yang & Wang 2008, Eq. 4). .. math:: \\rho_s c_s = (1 - \\theta_{sat}) \\rho_{d}c_{d} + \\theta \\rho_w c_w Parameters ---------- theta : ArrayLike Volumetric water content (m³ m⁻³). theta_sat : float or ArrayLike Soil porosity, i.e., saturated volumetric water content (m³ m⁻³). Returns ------- numpy.ndarray The volumetric heat capacity `ρ_s c_s` (J m⁻³ K⁻¹). """ theta = np.asarray(theta, dtype=float) theta_sat = np.asarray(theta_sat, dtype=float) # From paper, typical values for dry soil and water rho_c_dry = (1.0 - theta_sat) * 2.1e6 rho_c_water = CP_WATER return rho_c_dry + rho_c_water * theta
# ----------------------------------------------------------------------------- # Equation (6a–b) – stretched vertical grid # -----------------------------------------------------------------------------
[docs] def stretched_grid(n: int, D: float, xi: float) -> np.ndarray: # Eq. 6 """ Generate layer thicknesses for a vertically stretched grid. The grid layer thickness increases exponentially with depth, allowing for higher resolution near the surface. .. math:: \\Delta z_i = \\Delta z_0 \\exp(\\xi (i-1)) Parameters ---------- n : int The number of soil layers. D : float The total depth of the soil domain (m). xi : float The stretching parameter. `xi = 0` results in a uniform grid. `xi > 0` results in a grid that is finer at the top. Returns ------- numpy.ndarray A 1D array of thickness `Δz_i` for each of the `n` layers (m). """ if xi == 0: return np.full(n, D / n) # First layer thickness, derived from the sum of geometric series delta_z0 = D * (math.exp(xi) - 1) / (math.exp(n * xi) - 1) # Thickness of each subsequent layer dz = delta_z0 * np.exp(xi * np.arange(n)) return dz
# ----------------------------------------------------------------------------- # Equation (7) – implicit TDE discretisation (tridiagonal system) # ----------------------------------------------------------------------------- def tridiagonal_coeffs( dz: ArrayLike, rho_c: ArrayLike, lambda_s: ArrayLike | float, dt: float, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Construct the coefficient diagonals (A, B, C) for the tridiagonal matrix system used in the implicit TDE solver (Yang & Wang 2008, Eq. 7b). These coefficients represent the discretized heat diffusion equation. Parameters ---------- dz : ArrayLike A 1D array of layer thicknesses (m). rho_c : ArrayLike A 1D array of volumetric heat capacity for each layer (J m⁻³ K⁻¹). lambda_s : ArrayLike or float Thermal conductivity for each layer (W m⁻¹ K⁻¹). dt : float The time step (s). Returns ------- Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray] A tuple containing the sub-diagonal (A), main-diagonal (B), and super-diagonal (C) of the tridiagonal matrix. """ dz = np.asarray(dz) rho_c = np.asarray(rho_c) lam = np.asarray(lambda_s) if np.ndim(lambda_s) else float(lambda_s) n = len(dz) # Effective conductivity at interfaces if np.ndim(lam): lam_up = 0.5 * (lam[:-1] + lam[1:]) lam_dn = lam_up else: lam_up = lam_dn = np.full(n - 1, lam) # Coefficients from finite difference scheme alpha = lam_up / (0.5 * (dz[:-1] + dz[1:])) beta = lam_dn / (0.5 * (dz[1:] + dz[:-1])) A = -alpha / dz[:-1] C = -beta / dz[1:] # Adjust for boundary conditions B_full = np.zeros(n) B_full[0] = rho_c[0]/dt + alpha[0] B_full[-1] = rho_c[-1]/dt + beta[-1] if n > 2: B = rho_c[1:-1]/dt - A[1:] - C[:-1] B_full[1:-1] = B return A, B_full, C
[docs] def solve_tde( T_prev: ArrayLike, dz: ArrayLike, rho_c: ArrayLike, lambda_s: ArrayLike | float, Tsfc: float, Tbot: float, dt: float, ) -> np.ndarray: """ Solve the 1D thermal diffusion equation for one time step using an implicit Crank-Nicolson scheme (Yang & Wang 2008, Eq. 7). This function sets up and solves the tridiagonal system of linear equations `M * T_new = D` to find the temperature profile at the next time step. Parameters ---------- T_prev : ArrayLike Temperature profile at the previous time step (K). dz : ArrayLike Layer thicknesses (m). rho_c : ArrayLike Volumetric heat capacity of each layer (J m⁻³ K⁻¹). lambda_s : ArrayLike or float Thermal conductivity of each layer (W m⁻¹ K⁻¹). Tsfc : float Surface temperature boundary condition (K). Tbot : float Bottom temperature boundary condition (K). dt : float Time step (s). Returns ------- numpy.ndarray The new temperature profile at `t + dt`, including boundary nodes. """ from scipy.linalg import solve_banded T_prev = np.asarray(T_prev) A, B, C = tridiagonal_coeffs(dz, rho_c, lambda_s, dt) n = len(B) # Assemble RHS vector D from Eq. 7b D_vec = np.asarray(rho_c) * T_prev / dt # Apply Dirichlet boundary conditions D_vec[0] += A[0] * Tsfc D_vec[-1] += C[-1] * Tbot # Create the banded matrix for SciPy's solver # The matrix has shape (3, n) for a tridiagonal system ab = np.zeros((3, n)) ab[0, 1:] = -C # Super-diagonal ab[1, :] = B # Main-diagonal ab[2, :-1] = -A # Sub-diagonal T_new_internal = solve_banded((1, 1), ab, D_vec) return np.concatenate(([Tsfc], T_new_internal, [Tbot]))
# ----------------------------------------------------------------------------- # Temperature‑profile correction (Section 2.2) # -----------------------------------------------------------------------------
[docs] def correct_profile( T_model: ArrayLike, depths_model: ArrayLike, T_obs: ArrayLike, depths_obs: ArrayLike ) -> np.ndarray: """ Correct a modeled temperature profile using observed temperatures. This function calculates the bias (error) between the model and observations at the observation depths, then linearly interpolates this bias across the entire model grid to correct the profile. Parameters ---------- T_model : ArrayLike The modeled temperature profile (K). depths_model : ArrayLike The depths corresponding to `T_model` (m). T_obs : ArrayLike The observed temperatures (K). depths_obs : ArrayLike The depths of the observations (m). Returns ------- numpy.ndarray The corrected temperature profile. """ T_model = np.asarray(T_model) # First, interpolate the model temperature to the observation depths T_model_at_obs_depths = np.interp(depths_obs, depths_model, T_model) # Calculate the bias at observation depths bias_at_obs_depths = T_obs - T_model_at_obs_depths # Interpolate the bias to all model depths bias_on_model_grid = np.interp(depths_model, depths_obs, bias_at_obs_depths) return T_model + bias_on_model_grid
# ----------------------------------------------------------------------------- # Equation (8) – surface temperature from long‑wave radiation # -----------------------------------------------------------------------------
[docs] def surface_temperature_longwave( R_lw_up: float, R_lw_dn: float, emissivity: float = 0.98 ) -> float: """ Calculate surface temperature from upward and downward long-wave radiation measurements using the Stefan-Boltzmann law (Yang & Wang 2008, Eq. 8). .. math:: T_s = \\left[ \\frac{R_{lw}^{\\uparrow} - (1 - \\epsilon) R_{lw}^{\\downarrow}} {\\epsilon \\sigma} \\right]^{1/4} Parameters ---------- R_lw_up : float Upwelling long-wave radiation (W m⁻²). R_lw_dn : float Downwelling long-wave radiation (W m⁻²). emissivity : float, optional Surface emissivity (dimensionless), by default 0.98. Returns ------- float The calculated surface temperature (K). """ numerator = R_lw_up - (1.0 - emissivity) * R_lw_dn denominator = emissivity * SIGMA_SB return (numerator / denominator) ** 0.25
# ----------------------------------------------------------------------------- # Equation (9a–c) – thermal conductivity parameterisation # -----------------------------------------------------------------------------
[docs] def thermal_conductivity_yang2008( theta: ArrayLike, theta_sat: float, rho_dry: float | ArrayLike ) -> np.ndarray: """ Estimate soil thermal conductivity based on soil moisture and dry density, following Yang et al. (2005) as cited in Yang & Wang (2008, Eq. 9). Parameters ---------- theta : ArrayLike Volumetric water content (m³ m⁻³). theta_sat : float Saturated volumetric water content (porosity) (m³ m⁻³). rho_dry : float or ArrayLike Dry soil bulk density (kg m⁻³). Returns ------- numpy.ndarray The estimated soil thermal conductivity `λ_s` (W m⁻¹ K⁻¹). """ theta = np.asarray(theta, dtype=float) rho_dry = np.asarray(rho_dry, dtype=float) / 1000 # Convert to g cm-3 for formula # Eq. 9b for dry thermal conductivity lam_dry = (0.170 + 0.0647 * rho_dry) / (2.7 - 0.947 * rho_dry) - 0.2 # Eq. 9c for saturated thermal conductivity lam_sat = 2.0 # Eq. 9a mixing model lam = lam_dry + (lam_sat - lam_dry) * np.exp( 0.36 * (theta / theta_sat - 1.0) ) return lam
# ----------------------------------------------------------------------------- # Equation (10–11) – flux error for linear interpolation (diagnostic) # -----------------------------------------------------------------------------
[docs] def flux_error_linear( rho_c: ArrayLike, S2_minus_S1: ArrayLike, dt: float ) -> np.ndarray: # Eq. 11 """ Calculate the diagnostic error in heat flux that arises from assuming a linear temperature profile between measurement points (Yang & Wang 2008, Eq. 11). .. math:: \\Delta G_i = \\frac{\\rho_s c_s (S_{i,2} - S_{i,1})}{\\Delta t} Parameters ---------- rho_c : ArrayLike Volumetric heat capacity for each layer (J m⁻³ K⁻¹). S2_minus_S1 : ArrayLike The area difference representing the deviation from linearity. dt : float The time step (s). Returns ------- numpy.ndarray The flux error for each layer (W m⁻²). """ return np.asarray(rho_c) * np.asarray(S2_minus_S1) / dt
# ----------------------------------------------------------------------------- # Equation (12) – surface energy budget closure # -----------------------------------------------------------------------------
[docs] def surface_energy_residual(R_net: float, H: float, LE: float, G0: float) -> float: """ Calculate the residual of the surface energy budget (Yang & Wang 2008, Eq. 12). .. math:: \\Delta E = R_{net} - (H + LE + G_0) Parameters ---------- R_net : float Net radiation (W m⁻²). H : float Sensible heat flux (W m⁻²). LE : float Latent heat flux (W m⁻²). G0 : float Surface ground heat flux (W m⁻²). Returns ------- float The energy balance residual `ΔE` (W m⁻²). """ return R_net - (H + LE + G0)
# ----------------------------------------------------------------------------- # High‑level helper: one TDEC timestep # -----------------------------------------------------------------------------
[docs] def tdec_step( T_prev: ArrayLike, dz: ArrayLike, theta: ArrayLike, theta_sat: float, rho_dry: float, lambda_const: float, Tsfc: float, Tbot: float, dt: float, depths_model: ArrayLike, T_obs: ArrayLike, depths_obs: ArrayLike, ) -> Tuple[np.ndarray, np.ndarray]: """ Perform a single integration step of the TDEC (Temperature Diffusion Error Correction) scheme. This function encapsulates the predict-correct sequence for one time step. Parameters ---------- T_prev : ArrayLike Temperature profile from the previous time step (K). dz : ArrayLike Layer thicknesses (m). theta : ArrayLike Volumetric water content profile (m³ m⁻³). theta_sat : float Soil porosity (m³ m⁻³). rho_dry : float Dry soil bulk density (kg m⁻³). lambda_const : float A constant thermal conductivity for the prediction step (W m⁻¹ K⁻¹). Tsfc : float Surface temperature boundary condition (K). Tbot : float Bottom temperature boundary condition (K). dt : float Time step (s). depths_model : ArrayLike Depths of the model grid nodes (m). T_obs : ArrayLike Observed temperatures for the correction step (K). depths_obs : ArrayLike Depths of the observed temperatures (m). Returns ------- Tuple[np.ndarray, np.ndarray] A tuple containing: - `T_corr`: The corrected temperature profile at `t + dt`. - `G_prof`: The corresponding heat flux profile (W m⁻²) at layer interfaces. """ rho_c = volumetric_heat_capacity(theta, theta_sat) # 1. Predict new temperature profile using the TDE solver T_pred = solve_tde( T_prev=T_prev, dz=dz, rho_c=rho_c, lambda_s=lambda_const, Tsfc=Tsfc, Tbot=Tbot, dt=dt, ) # 2. Correct the predicted profile using observational data T_corr = correct_profile(T_pred, depths_model, T_obs, depths_obs) # 3. Compute the resulting heat flux profile from the corrected temperatures G_prof = integrated_soil_heat_flux( rho_c=rho_c, T_before=T_prev, T_after=T_corr[1:-1], # Use internal nodes of corrected profile dz=dz, dt=dt, G_ref=0.0, # Assume zero flux at the bottom ) return T_corr, G_prof
__all__ = [ "soil_heat_flux", "integrated_soil_heat_flux", "volumetric_heat_capacity", "stretched_grid", "solve_tde", "correct_profile", "surface_temperature_longwave", "thermal_conductivity_yang2008", "flux_error_linear", "surface_energy_residual", "tdec_step", ]