EcoMathDNAHMM — Ecological–Topological Hidden Markov Model (NMAI Model 1A)

ECO-Math-DNA-HMM-CODEX Specification

Ancestral migration path estimation over two-millennia horizon via eco-topological HMM with Sentinel change-point detection.
Independence: No external frameworks; fully standalone implementation.

1. State Space & Horizon

1.1 Purpose of the State Space

In the CODEX model, a state represents a complete environmental "snapshot" that a human lineage could occupy at a specific time. Each state holds the basic conditions required for survival: climate, landform, available water, vegetation, and soil nutrients. Instead of treating land as an empty grid square, every cell carries measurable ecological information. The model then follows how a population might move from one viable cell to another through time.

Reference: Odum E.P. & Barrett G.W. (2005) Fundamentals of Ecology, Ch. 3 (Energy Flow and Ecosystem Structure).

1.2 Temporal Resolution ("The Horizon")

Time is divided into generations, each lasting 25 years—the average age at which offspring appear in most human societies. Eighty generations give roughly two millennia ($25 \times 80 = 2{,}000$ years). This range captures both rapid migration events and long-term environmental shifts such as glaciation retreat or desertification.

$G = 25 \text{ years}, \quad T = 80 \text{ generations}$

Temporal Horizon: 2,000 Years Across 80 Generations t = 0 (Present) t = T = 80 (2000 yrs) t = 40 (1000 yrs) G = 25 yrs

Reference: Howell N. (2010) Demography of Human Populations, pp. 14-16.

1.3 Spatial Resolution (Eco-Topological Zones)

The spatial map is divided into eco-topological zones—areas that share similar climate, vegetation, soil texture, and hydrology. Each zone $r_i$ can be thought of as a "bioclimatic island." Moving from one island to another has a cost determined by terrain and resource difference.

$\mathcal{R} = \{r_1, r_2, \ldots, r_K\}, \quad r_t \in \mathcal{R}$

Typical data sources for defining these zones include:

  • FAO Global Ecological Zones (FAO 2020)
  • WorldClim 2 climate grids (Fick & Hijmans 2017)
  • SoilGrids 2.0 mineral composition dataset (Poggio et al. 2021)

1.4 Nutrient and Biological Profile of a Zone

Each zone carries a nutrient vector $\mathbf{N}_i$ summarising the biochemical base that sustains life there:

$\mathbf{N}_i = [C_i, N_i, P_i, Fe_i, Zn_i, H_2O_i, O_{\text{organic}}]$

where $C$, $N$, $P$ are macronutrients (carbon, nitrogen, phosphorus) and the rest are trace elements or organic-matter indicators. These quantities are obtained from soil chemistry databases or archaeological isotope studies. They tell us whether a population could have survived on local flora and fauna.

Reference: Vitousek P.M. et al. (2010) "Terrestrial Ecosystem Nutrients," Nature Geoscience 3: 673-679.

2. ECO-Math Transition Model (Eco-Topology First-Class)

Transitions represent physical or ecological movement between zones. Movement cost increases with terrain distance $d_{\text{topo}}$ and ecological dissimilarity $\Delta_{\text{eco}}$. These costs form an exponential kernel whose parameters $\lambda$ and $\kappa$ weight topology and ecology. Each row of $\mathbf{A}$ is normalised to ensure valid probabilities.

Transition Probability:

$a_{ij}(t) = P(r_{t+1} = j \mid r_t = i) \propto \exp\left[-\lambda \cdot d_{\text{topo}}(i,j) - \kappa \cdot \Delta_{\text{eco}}(i,j)\right]$

Normalization: Row-normalize $\mathbf{A}_t = [a_{ij}(t)]$

Deep-Time Prior: $P(r_T)$ may be uniform or Brahman-biased

Eco-Topological State Transition Model $r_i$ $r_j$ $r_k$ $a_{ij}(t)$ low cost High $d_{\text{topo}}$ or $\Delta_{\text{eco}}$ Self-loop Transition Cost Components: • $d_{\text{topo}}(i,j)$ = Terrain distance (physical barrier cost) • $\Delta_{\text{eco}}(i,j)$ = Ecological dissimilarity (resource compatibility) • $\lambda, \kappa$ = Weighting parameters for topology vs ecology

3. Emission Model (DNA → State Likelihood with Depth Decay)

3.1 Autosomal Distance

$D^2(j) = \sum_s w_s\left(\hat{p}(s) - p_j(s)\right)^2, \quad \mathcal{L}_{\text{auto}}(Y \mid j) \propto \exp\left(-\frac{D^2(j)}{2\sigma^2}\right)$

Optional f-stat metric $F(j)$ may replace $D^2$ analogously.

3.2 Uniparental (if available)

Poisson clock on haplogroup mutations $m \sim \text{Pois}(\mu\tau)$ plus spatial prior $P(j \mid h)$.

3.3 Depth Weighting (IBD/recombination dropout)

$\omega(t) = \exp(-\beta t) \quad (\beta \approx \ln 2 \text{ or fit})$

Depth Decay Function $\omega(t) = e^{-\beta t}$ t=0 t=40 (1000 yrs) t=80 (2000 yrs) Weight $\omega(t)$ Generations (t) Deep ancestry (high t) → reduced emission weight

3.4 Composite Emission

$b_j(t) = \mathcal{E}(Y \mid r_t = j, t) = \omega(t) \cdot \mathcal{L}_{\text{auto}}(Y \mid j) \times \mathcal{L}_{\text{uni}}(h \mid j)$

4. Viterbi (MAP) Recursion — Log-Space Stability

Initialization (present, t = 0):

$\delta_0(j) = \log P(r_0 = j) + \log b_j(0), \quad \psi_0(j) = 0$

Recursion (t = 1, …, T):

$\delta_t(j) = \max_i\left[\delta_{t-1}(i) + \log a_{ij}(t-1)\right] + \log b_j(t)$

$\psi_t(j) = \arg\max_i\left[\delta_{t-1}(i) + \log a_{ij}(t-1)\right]$

Backtrack:

$r_T^* = \arg\max_j \delta_T(j); \quad r_{t-1}^* = \psi_t(r_t^*)$

Viterbi Trellis: MAP Path $\pi^*$ t = 0 $\delta_0$ $\delta_0$ t = 1 $\delta_1$ $\delta_1$ t = 2 $\delta_2$ $\delta_2$ ... t = T $\delta_T$ $\delta_T$ max $\psi$ pointer MAP Path $\pi^*$ (Backtracked) • $\delta_t(j)$ = Maximum log-probability of path ending in state $j$ at time $t$ • $\psi_t(j)$ = Backpointer storing argmax predecessor state

5. Forward–Backward (Posteriors)

Scaled Forward $\alpha$:

$\alpha_0(j) = \frac{P(r_0 = j) \cdot b_j(0)}{c_0}$

$\alpha_t(j) = \frac{b_j(t) \sum_i \alpha_{t-1}(i) \cdot a_{ij}(t-1)}{c_t}$

Scaled Backward $\beta$:

$\beta_T(j) = \frac{1}{c_T}$

$\beta_t(i) = \frac{\sum_j a_{ij}(t) \cdot b_j(t+1) \cdot \beta_{t+1}(j)}{c_{t+1}}$

Posterior:

$P(r_t = j \mid \cdot) = \gamma_t(j) = \alpha_t(j) \cdot \beta_t(j)$

6. Sentinels (Structural Change Points)

Bayes-factor thresholding across the top two states:

$\text{BF}_t = \frac{\max_j \gamma_t(j)}{\max_{j \neq j^*} \gamma_t(j)} \geq \Theta$

Report $(t, r_t^*, \text{BF}_t)$ when threshold exceeded

Sentinel Detection: Posterior Probability Over Time State $j^*$ (dominant) State $j \neq j^*$ $\Theta$ Threshold S1 S2 Sentinel triggers when dominant state posterior exceeds threshold $\Theta$ Generations (t) Posterior $\gamma_t(j)$

7. Brahman Lineage Prior (Plug-in)

If Y-DNA $h$ (e.g., specific R1a-Z93/H subclades) is known:

$P(r_T) = \frac{P(r_T \mid h)}{\sum_j P(r_T \mid h)}$

and/or multiply $b_j(t)$ by $P(j \mid h)^{\eta}$ for early $t$ (tunable $\eta$):

$b_j(t) \leftarrow b_j(t) \cdot P(j \mid h)^{\eta} \quad \text{for } t \geq t_{\min}$

8. Pseudocode (Minimal Implementation)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Eco-Topo–DNA HMM Lineage Solver (Standalone)
Python + NumPy (+ Matplotlib for optional visuals)

Implements:
- Transition builder (eco-topology first-class)
- Emission builder (DNA likelihood with depth decay and optional uniparental prior)
- Viterbi MAP in log-space
- Scaled Forward–Backward posteriors
- Sentinel detection by Bayes-factor threshold
- Minimal demo with synthetic data + optional plots

No external frameworks. Ready to drop into your project.
"""

from __future__ import annotations
import math
import numpy as np
import matplotlib.pyplot as plt

# ---------------------------
# Utility
# ---------------------------

def row_normalize(A: np.ndarray) -> np.ndarray:
    """Row-normalize a 2D array in-place (avoid division by zero)."""
    # A: [K, K]
    sums = A.sum(axis=1, keepdims=True)
    sums[sums == 0.0] = 1.0
    return A / sums

def logsumexp(vec: np.ndarray) -> float:
    """Stable log-sum-exp for a 1D vector."""
    m = np.max(vec)
    return m + np.log(np.exp(vec - m).sum())

# ---------------------------
# Model Construction
# ---------------------------

def build_transitions(T: int,
                      d_topo: np.ndarray,
                      delta_eco: np.ndarray,
                      lam: float,
                      kap: float) -> np.ndarray:
    """
    Build transition tensors A[t, i, j] for t=0..T-1 from eco-topological costs.
      A[t, i, j] ∝ exp(-lam * d_topo[i,j] - kap * delta_eco[i,j])
    Shapes:
      d_topo, delta_eco: [K, K]
      return A: [T, K, K], row-normalized per t
    """
    K = d_topo.shape[0]
    A = np.empty((T, K, K), dtype=float)
    base = -lam * d_topo - kap * delta_eco
    for t in range(T):
        At = np.exp(base)  # identical each t, but keep interface flexible
        A[t] = row_normalize(At)
    return A

def build_emissions(T: int,
                    Y_hat: np.ndarray,          # [S] observed allele freqs / coded genotypes
                    p_ref: np.ndarray,          # [K, S] region by SNP reference allele freqs
                    w: np.ndarray,              # [S] SNP weights
                    sigma2: float,
                    beta: float,
                    P_region_given_h: np.ndarray | None = None,  # [K] or None
                    eta: float = 0.0) -> np.ndarray:
    """
    Build emission matrix b[t, j] with autosomal distance + depth decay + optional uniparental prior.
      D^2(j) = Σ_s w_s * (Y_hat[s] - p_ref[j,s])^2
      L_auto ∝ exp(-D^2/(2*sigma2))
      ω(t) = exp(-beta*t)
      b[t,j] = ω(t) * L_auto * (P_region_given_h[j] ** eta  if provided  else 1)
    Shapes:
      T: number of generations (0..T)
      p_ref: [K, S], Y_hat: [S], w: [S]
      returns b: [T+1, K]
    """
    K, S = p_ref.shape
    assert Y_hat.shape[0] == S and w.shape[0] == S
    # Precompute D^2 for each region j
    diff = (Y_hat[None, :] - p_ref)  # [K, S]
    D2 = (w[None, :] * diff * diff).sum(axis=1)  # [K]
    L_auto = np.exp(- D2 / (2.0 * sigma2))       # [K]

    if P_region_given_h is None:
        L_uni = np.ones(K, dtype=float)
    else:
        L_uni = np.power(P_region_given_h, eta)

    base_like = L_auto * L_uni  # [K]
    b = np.empty((T + 1, K), dtype=float)
    for t in range(T + 1):
        omega = math.exp(-beta * t)
        b[t] = omega * base_like
    return b

# ---------------------------
# Inference: Viterbi MAP (log-space)
# ---------------------------

def viterbi_map(A: np.ndarray, b: np.ndarray, init_prior: np.ndarray) -> np.ndarray:
    """
    Viterbi MAP path in log-space.
    Inputs:
      A: [T, K, K]
      b: [T+1, K]
      init_prior: [K], P(r0=j)
    Return:
      path: [T+1] int indices of states 0..K-1
    """
    T, K, _ = A.shape
    # log-space arrays
    delta = np.full((T + 1, K), -np.inf, dtype=float)
    psi = np.zeros((T + 1, K), dtype=int)

    # init
    with np.errstate(divide='ignore'):
        log_init = np.log(init_prior + 1e-300)
        log_b0 = np.log(b[0] + 1e-300)
    delta[0] = log_init + log_b0
    psi[0] = 0

    # recurse
    for t in range(1, T + 1):
        # δ_t(j) = max_i [ δ_{t-1}(i) + log A_{t-1}(i->j) ] + log b_t(j)
        with np.errstate(divide='ignore'):
            log_A = np.log(A[t - 1] + 1e-300)  # [K, K]
            log_b = np.log(b[t] + 1e-300)      # [K]
        for j in range(K):
            scores = delta[t - 1] + log_A[:, j]  # [K]
            arg = np.argmax(scores)
            delta[t, j] = scores[arg] + log_b[j]
            psi[t, j] = arg

    # backtrack
    path = np.zeros(T + 1, dtype=int)
    path[T] = int(np.argmax(delta[T]))
    for t in range(T, 0, -1):
        path[t - 1] = psi[t, path[t]]
    return path

# ---------------------------
# Inference: Forward–Backward (scaled)
# ---------------------------

def forward_backward_scaled(A: np.ndarray, b: np.ndarray, init_prior: np.ndarray):
    """
    Scaled forward-backward to compute posteriors γ[t, j].
    Inputs:
      A: [T, K, K], b: [T+1, K], init_prior: [K]
    Returns:
      gamma: [T+1, K] posteriors per time
      alpha: [T+1, K] scaled forward
      beta:  [T+1, K] scaled backward
      c:     [T+1] scaling constants
    """
    T, K, _ = A.shape
    alpha = np.zeros((T + 1, K), dtype=float)
    beta = np.zeros((T + 1, K), dtype=float)
    c = np.zeros(T + 1, dtype=float)

    # forward init
    alpha[0] = init_prior * b[0]
    c[0] = alpha[0].sum()
    if c[0] == 0.0:
        c[0] = 1.0
    alpha[0] /= c[0]

    # forward
    for t in range(1, T + 1):
        alpha[t] = (alpha[t - 1] @ A[t - 1]) * b[t]
        c[t] = alpha[t].sum()
        if c[t] == 0.0:
            c[t] = 1.0
        alpha[t] /= c[t]

    # backward init
    beta[T] = 1.0 / c[T]
    # backward
    for t in range(T - 1, -1, -1):
        beta[t] = (A[t] @ (b[t + 1] * beta[t + 1])) / c[t]

    # posteriors
    gamma = alpha * beta
    gamma /= gamma.sum(axis=1, keepdims=True)
    return gamma, alpha, beta, c

# ---------------------------
# Sentinels (change-points)
# ---------------------------

def extract_sentinels(gamma: np.ndarray, bf_threshold: float = 10.0):
    """
    Find time indices where top-1 posterior state dominates top-2 by BF >= threshold.
    Returns list of (t, j_star, BF_t).
    """
    T_plus_1, K = gamma.shape
    sentinels = []
    eps = 1e-12
    for t in range(T_plus_1):
        order = np.argsort(-gamma[t])
        j1, j2 = order[0], order[1] if K > 1 else order[0]
        p1, p2 = gamma[t, j1], gamma[t, j2]
        BF = p1 / max(p2, eps)
        if BF >= bf_threshold:
            sentinels.append((t, int(j1), float(BF)))
    return sentinels

# ---------------------------
# Optional Visuals
# ---------------------------

def plot_results(gamma: np.ndarray, path: np.ndarray, title: str = "Posteriors & MAP Path"):
    """
    Heatmap of γ[t,j] with MAP path overlay.
    """
    T_plus_1, K = gamma.shape
    plt.figure(figsize=(12, 5))
    plt.imshow(gamma.T, aspect='auto', origin='lower', interpolation='nearest')
    plt.colorbar(label='Posterior γ(t, j)')
    plt.plot(range(T_plus_1), path, 'w-', linewidth=2, label='MAP Path')
    plt.yticks(range(K), [f"S{j}" for j in range(K)])
    plt.xlabel("Generation t")
    plt.ylabel("Region")
    plt.title(title)
    plt.legend(loc='upper right')
    plt.tight_layout()

# ---------------------------
# Minimal Demo (synthetic)
# ---------------------------

def demo():
    """
    Self-contained synthetic run to verify everything executes.
    Replace synthetic blocks with your real d_topo, delta_eco, Y_hat, p_ref, etc.
    """
    np.random.seed(7)

    # Horizon & sizes
    T = 40                     # reduce for quick demo; set 80 for full 2,000 yrs
    K = 8                      # number of eco-regions
    S = 200                    # number of SNPs

    # Synthetic eco-topo costs (symmetric, metric-like)
    coords = np.random.randn(K, 2)
    d_topo = np.sqrt(((coords[:, None, :] - coords[None, :, :]) ** 2).sum(axis=2))
    # biome dissimilarity (random but symmetric non-negative)
    delta_eco = np.abs(np.random.randn(K, K)); delta_eco = 0.5*(delta_eco + delta_eco.T)

    # Weights
    lam, kap = 1.2, 0.8

    # Reference allele freqs per region
    p_ref = np.clip(np.random.beta(a=2, b=5, size=(K, S)), 1e-3, 1-1e-3)

    # True hidden path (just to generate a mock Y_hat near some region)
    true_path = np.random.choice(K, size=T+1)
    anchor_region = true_path[0]  # pretend present-time ancestry is close to state at t=0
    # Observed "genotype" / allele-freq vector centered near anchor region distribution
    Y_hat = np.clip(p_ref[anchor_region] + 0.03*np.random.randn(S), 1e-3, 1-1e-3)

    # SNP weights and variance
    w = np.ones(S, dtype=float)
    sigma2 = 0.05

    # Depth decay
    beta = math.log(2.0)  # 1/2 per generation weight (tunable)

    # Optional uniparental spatial prior (here uniform)
    P_region_given_h = None
    eta = 0.0

    # Init prior (present-time region probabilities); use uniform
    init_prior = np.ones(K, dtype=float) / K

    # Build model
    A = build_transitions(T, d_topo, delta_eco, lam, kap)
    b = build_emissions(T, Y_hat, p_ref, w, sigma2, beta,
                        P_region_given_h=P_region_given_h, eta=eta)

    # Inference
    path_map = viterbi_map(A, b, init_prior)
    gamma, alpha, beta_fb, c = forward_backward_scaled(A, b, init_prior)
    sentinels = extract_sentinels(gamma, bf_threshold=8.0)

    # Print quick summaries
    print("=== MAP path (first 15 gens) ===")
    print(path_map[:15])
    print("... (len =", len(path_map), ")")

    print("\n=== Top-5 Sentinel change-points (t, state, BF) ===")
    for row in sentinels[:5]:
        print(row)

    # Plot (optional)
    try:
        plot_results(gamma, path_map, title="Eco-Topo–DNA HMM: Posterior Heatmap & MAP Path")
        plt.show()
    except Exception as e:
        print("Plotting skipped:", e)

# ---------------------------
# Entrypoint
# ---------------------------

if __name__ == "__main__":
    demo()
 

9. Outputs

MAP Path

$\pi^*$

Maximum a-posteriori lineage trajectory

Credible Corridor

$\{j : \gamma_t(j) \geq \tau\}$

Per-time confidence region

Attribution

$\log b_j(t)$ vs $\log a_{ij}(t)$

Emission vs transition contributions

Sentinel List

$(t, r_t^*, \text{BF}_t)$

Structural change points

10. Specification Summary

ComponentSpecificationMathematical Form
Horizon2,000 years (80 generations × 25 years)$G = 25$, $T = 80$
State SpaceEco-topological zones with nutrient vectors$\mathcal{R} = \{r_1, \ldots, r_K\}$, $\mathbf{N}_i$
TransitionEco-topological cost kernel$a_{ij} \propto \exp[-\lambda d_{\text{topo}} - \kappa \Delta_{\text{eco}}]$
EmissionDNA likelihood with depth decay$b_j(t) = \omega(t) \cdot \mathcal{L}_{\text{auto}} \times \mathcal{L}_{\text{uni}}$
MAP InferenceViterbi algorithm (log-space)$\delta_t(j)$, $\psi_t(j)$ backpointers
PosteriorsForward–Backward (scaled)$\gamma_t(j) = \alpha_t(j)\beta_t(j)$
Change DetectionSentinel Bayes-Factor thresholding$\text{BF}_t = \frac{\max \gamma_t(j)}{\max_{j \neq j^*} \gamma_t(j)} \geq \Theta$
ComplexityTime: $O(TK^2)$ or $O(T|E|)$ sparseSpace: $O(TK)$ for backpointers

Complexity: $O(TK^2)$ (sparse $\mathbf{A} \Rightarrow O(T|E|)$) | Fully Standalone | No External Frameworks