Source code for vane.modal.identifier

"""Cross-operating-point mode identification (global tracking).

Modes computed independently at successive operating points (rotor speeds or wind
speeds) are linked into tracks — the lines of a Campbell diagram. Rather than a
greedy adjacent-point match (which can commit early to a locally optimal pairing
that is globally wrong), the tracker extracts each mode line as the globally
optimal continuity path through the whole sweep: it repeatedly finds the
single best-continuity path over the not-yet-assigned modes (a deterministic
dynamic program over the modal-assurance-criterion / frequency-continuity affinity)
and removes it, until every mode is assigned.

The result is deterministic — no random restarts or preset cluster counts — and
each track carries a confidence (its weakest link's correlation) and an ambiguity
flag (raised where a mode had two near-equally correlated continuations, i.e. a
crossing or veering hotspot). Every track is given a physical label.

Optimality caveat: each extracted path is the globally optimal *single* continuity
path over the remaining modes, but the extraction is sequential, not a simultaneous
globally optimal *multi-track* assignment. In dense spectra or at near-degenerate
crossings an earlier high-scoring path can still claim a node a later path would have
preferred; the ``is_ambiguous`` flag marks exactly those hotspots, and ``confidence``
exposes how trustworthy each line is.
"""

from __future__ import annotations

from dataclasses import dataclass
from itertools import pairwise
from typing import TYPE_CHECKING

import numpy as np

from vane.modal.labeler import ModeLabel, label_mode
from vane.modal.mac import compute_mac

if TYPE_CHECKING:
    from collections.abc import Sequence

    import numpy.typing as npt

    from vane.modal.eigensolver import ModalSolution

__all__ = ["IdentificationResult", "ModeTrack", "identify_modes"]

_NEG_INF = -np.inf
# A small reward so any threshold-passing link is preferred over breaking the
# track, even when the frequency penalty zeroes its weight (e.g. frequency_weight=1
# at the span extreme). It does not change the ranking among allowed links.
_LINK_BONUS = 1e-3


[docs] @dataclass class ModeTrack: """A mode followed across operating points. Parameters ---------- operating_points : list[int] Indices of the operating points this track spans, in order. mode_indices : list[int] The mode index within each corresponding operating point. natural_frequencies_hz : list[float] Natural frequency at each operating point along the track. damping_ratios : list[float] Damping ratio at each operating point along the track. label : ModeLabel Representative physical label (the most confident along the track). confidence : float The weakest link correlation along the track (minimum MAC between consecutive points), in ``[0, 1]``; ``1.0`` for a single-point track. is_ambiguous : bool Whether any link was a near-tie (a mode with two near-equally correlated continuations), marking a crossing or veering hotspot. """ operating_points: list[int] mode_indices: list[int] natural_frequencies_hz: list[float] damping_ratios: list[float] label: ModeLabel confidence: float = 1.0 is_ambiguous: bool = False
[docs] @dataclass class IdentificationResult: """Result of cross-operating-point mode identification. Parameters ---------- tracks : list[ModeTrack] The identified mode tracks, ordered by their first natural frequency. n_operating_points : int Number of operating points processed. """ tracks: list[ModeTrack] n_operating_points: int
[docs] def identify_modes( solutions: Sequence[ModalSolution], *, frequency_weight: float = 0.5, mac_threshold: float = 0.5, ambiguity_margin: float = 0.2, ) -> IdentificationResult: """Link modes across operating points into labeled tracks, globally. Parameters ---------- solutions : Sequence[ModalSolution] Modal solutions, one per operating point, in operating-point order. All must share the same number of mode-shape degrees of freedom. frequency_weight : float, optional Weight of the frequency-continuity penalty in the link affinity, ``[0, 1]``. mac_threshold : float, optional Minimum modal assurance criterion for two modes to be linkable. ambiguity_margin : float, optional A link is flagged ambiguous when the best continuation's MAC exceeds the second best by less than this margin. Returns ------- IdentificationResult The identified tracks and the number of operating points. Raises ------ ValueError If ``frequency_weight``, ``mac_threshold``, or ``ambiguity_margin`` is not in ``[0, 1]``, or the solutions have inconsistent mode-shape DOF layout. """ for name, value in ( ("frequency_weight", frequency_weight), ("mac_threshold", mac_threshold), ("ambiguity_margin", ambiguity_margin), ): if not 0.0 <= value <= 1.0: msg = f"{name} must be in [0, 1], got {value}" raise ValueError(msg) if not solutions: return IdentificationResult(tracks=[], n_operating_points=0) _validate_consistent_dofs(solutions) macs = _pairwise_macs(solutions) affinities = _pairwise_affinities(solutions, macs, frequency_weight, mac_threshold) entries = _extract_tracks(solutions, affinities) return _build_result(entries, solutions, macs, ambiguity_margin)
def _pairwise_macs( solutions: Sequence[ModalSolution], ) -> list[npt.NDArray[np.float64]]: """Return the MAC matrix between each consecutive operating-point pair.""" return [ compute_mac(solutions[k].mode_shapes, solutions[k + 1].mode_shapes) for k in range(len(solutions) - 1) ] def _pairwise_affinities( solutions: Sequence[ModalSolution], macs: list[npt.NDArray[np.float64]], frequency_weight: float, mac_threshold: float, ) -> list[npt.NDArray[np.float64]]: """Return the link affinity (MAC x frequency continuity) per pair. Entries below ``mac_threshold`` are set to ``-inf`` so they cannot be linked. """ affinities: list[npt.NDArray[np.float64]] = [] for k, mac in enumerate(macs): weight = mac.copy() freqs_a = solutions[k].natural_frequencies_hz freqs_b = solutions[k + 1].natural_frequencies_hz if frequency_weight > 0.0 and freqs_a.size and freqs_b.size: both = np.concatenate([freqs_a, freqs_b]) span = float(both.max() - both.min()) if span > 0.0: delta = np.abs(freqs_a[:, np.newaxis] - freqs_b[np.newaxis, :]) penalty = np.clip(1.0 - frequency_weight * delta / span, 0.0, 1.0) weight = mac * penalty affinities.append( np.where(mac >= mac_threshold, weight + _LINK_BONUS, _NEG_INF) ) return affinities def _extract_tracks( solutions: Sequence[ModalSolution], affinities: list[npt.NDArray[np.float64]], ) -> list[list[tuple[int, int]]]: """Greedily extract globally-optimal continuity paths until all modes used.""" used = [np.zeros(solution.n_modes, dtype=np.bool_) for solution in solutions] tracks: list[list[tuple[int, int]]] = [] while True: path = _best_path(used, affinities) if path is None: return tracks for op, mode in path: used[op][mode] = True tracks.append(path) def _best_path( used: list[npt.NDArray[np.bool_]], affinities: list[npt.NDArray[np.float64]], ) -> list[tuple[int, int]] | None: """Return the highest-affinity contiguous path over unused modes, or None. A dynamic program over the layered graph (operating points as layers, modes as nodes): a path may start at any operating point and is extended only along links whose affinity is finite (MAC above threshold). """ n_op = len(used) best: list[npt.NDArray[np.float64]] = [] parent: list[npt.NDArray[np.int_]] = [] for op in range(n_op): start = np.where(used[op], _NEG_INF, 0.0) link_parent = np.full(used[op].shape[0], -1, dtype=np.int_) if op > 0 and best[op - 1].size and start.size: candidate = best[op - 1][:, np.newaxis] + affinities[op - 1] from_prev = candidate.max(axis=0) if candidate.size else start best_i = candidate.argmax(axis=0) if candidate.size else link_parent # A used target mode can be neither started nor entered, so gate the # extension on the target being available -- otherwise a later path # could route through an already-assigned (op, mode). take = (from_prev > start) & ~used[op] start = np.where(take, from_prev, start) link_parent = np.where(take, best_i, link_parent) best.append(start) parent.append(link_parent) end_op, end_mode, top = -1, -1, _NEG_INF for op in range(n_op): if best[op].size and float(best[op].max()) > top: top = float(best[op].max()) end_op, end_mode = op, int(best[op].argmax()) if top == _NEG_INF: return None path: list[tuple[int, int]] = [] op, mode = end_op, end_mode while mode != -1: path.append((op, mode)) previous = int(parent[op][mode]) op, mode = op - 1, previous path.reverse() return path def _build_result( tracks: list[list[tuple[int, int]]], solutions: Sequence[ModalSolution], macs: list[npt.NDArray[np.float64]], ambiguity_margin: float, ) -> IdentificationResult: """Assemble ModeTrack objects with confidence and ambiguity, sorted by freq.""" built: list[ModeTrack] = [] for entries in tracks: freqs = [float(solutions[op].natural_frequencies_hz[m]) for op, m in entries] damps = [float(solutions[op].damping_ratios[m]) for op, m in entries] confidence, ambiguous = _track_quality(entries, macs, ambiguity_margin) built.append( ModeTrack( operating_points=[op for op, _ in entries], mode_indices=[m for _, m in entries], natural_frequencies_hz=freqs, damping_ratios=damps, label=_track_label(entries, solutions), confidence=confidence, is_ambiguous=ambiguous, ) ) built.sort(key=lambda track: track.natural_frequencies_hz[0]) return IdentificationResult(tracks=built, n_operating_points=len(solutions)) def _track_quality( entries: list[tuple[int, int]], macs: list[npt.NDArray[np.float64]], ambiguity_margin: float, ) -> tuple[float, bool]: """Return a track's weakest-link MAC and whether any link is a near-tie.""" confidence = 1.0 ambiguous = False for (op_a, mode_a), (_, mode_b) in pairwise(entries): mac_row = macs[op_a][mode_a] confidence = min(confidence, float(mac_row[mode_b])) if mac_row.shape[0] >= 2: ordered = np.sort(mac_row)[::-1] if float(ordered[0] - ordered[1]) < ambiguity_margin: ambiguous = True return confidence, ambiguous def _track_label( entries: list[tuple[int, int]], solutions: Sequence[ModalSolution] ) -> ModeLabel: """Return the most confident label among a track's modes.""" candidates = [ label_mode( np.abs(solutions[op].mode_shapes[:, mode]), solutions[op].dof_descriptions ) for op, mode in entries if solutions[op].dof_descriptions ] if not candidates: from vane.config.dof_map import DofCategory return ModeLabel(DofCategory.UNKNOWN, "Unidentified", 0.0, []) return max(candidates, key=lambda label: label.confidence) def _validate_consistent_dofs(solutions: Sequence[ModalSolution]) -> None: """Validate that every solution shares the reference mode-shape DOF layout. The DOF count must match, and when descriptions are available they must match too, so the modal assurance criterion compares corresponding degrees of freedom rather than silently mixing unrelated rows. """ reference = solutions[0] ref_n = reference.mode_shapes.shape[0] ref_descriptions = reference.dof_descriptions for index, solution in enumerate(solutions[1:], start=1): if solution.mode_shapes.shape[0] != ref_n: msg = ( f"Operating point {index} has {solution.mode_shapes.shape[0]} " f"mode-shape DOFs, expected {ref_n}" ) raise ValueError(msg) if ( ref_descriptions and solution.dof_descriptions and solution.dof_descriptions != ref_descriptions ): msg = ( f"Operating point {index} has different DOF descriptions from the " f"reference; mode shapes are not comparable" ) raise ValueError(msg)