"""Cross-operating-point mode identification (global tracking).
Modes computed independently at successive operating points (rotor speeds or wind
speeds) are linked into tracks — the lines of a Campbell diagram. Rather than a
greedy adjacent-point match (which can commit early to a locally optimal pairing
that is globally wrong), the tracker extracts each mode line as the globally
optimal continuity path through the whole sweep: it repeatedly finds the
single best-continuity path over the not-yet-assigned modes (a deterministic
dynamic program over the modal-assurance-criterion / frequency-continuity affinity)
and removes it, until every mode is assigned.
The result is deterministic — no random restarts or preset cluster counts — and
each track carries a confidence (its weakest link's correlation) and an ambiguity
flag (raised where a mode had two near-equally correlated continuations, i.e. a
crossing or veering hotspot). Every track is given a physical label.
Optimality caveat: each extracted path is the globally optimal *single* continuity
path over the remaining modes, but the extraction is sequential, not a simultaneous
globally optimal *multi-track* assignment. In dense spectra or at near-degenerate
crossings an earlier high-scoring path can still claim a node a later path would have
preferred; the ``is_ambiguous`` flag marks exactly those hotspots, and ``confidence``
exposes how trustworthy each line is.
"""
from __future__ import annotations
from dataclasses import dataclass
from itertools import pairwise
from typing import TYPE_CHECKING
import numpy as np
from vane.modal.labeler import ModeLabel, label_mode
from vane.modal.mac import compute_mac
if TYPE_CHECKING:
from collections.abc import Sequence
import numpy.typing as npt
from vane.modal.eigensolver import ModalSolution
__all__ = ["IdentificationResult", "ModeTrack", "identify_modes"]
_NEG_INF = -np.inf
# A small reward so any threshold-passing link is preferred over breaking the
# track, even when the frequency penalty zeroes its weight (e.g. frequency_weight=1
# at the span extreme). It does not change the ranking among allowed links.
_LINK_BONUS = 1e-3
[docs]
@dataclass
class ModeTrack:
"""A mode followed across operating points.
Parameters
----------
operating_points : list[int]
Indices of the operating points this track spans, in order.
mode_indices : list[int]
The mode index within each corresponding operating point.
natural_frequencies_hz : list[float]
Natural frequency at each operating point along the track.
damping_ratios : list[float]
Damping ratio at each operating point along the track.
label : ModeLabel
Representative physical label (the most confident along the track).
confidence : float
The weakest link correlation along the track (minimum MAC between
consecutive points), in ``[0, 1]``; ``1.0`` for a single-point track.
is_ambiguous : bool
Whether any link was a near-tie (a mode with two near-equally correlated
continuations), marking a crossing or veering hotspot.
"""
operating_points: list[int]
mode_indices: list[int]
natural_frequencies_hz: list[float]
damping_ratios: list[float]
label: ModeLabel
confidence: float = 1.0
is_ambiguous: bool = False
[docs]
@dataclass
class IdentificationResult:
"""Result of cross-operating-point mode identification.
Parameters
----------
tracks : list[ModeTrack]
The identified mode tracks, ordered by their first natural frequency.
n_operating_points : int
Number of operating points processed.
"""
tracks: list[ModeTrack]
n_operating_points: int
[docs]
def identify_modes(
solutions: Sequence[ModalSolution],
*,
frequency_weight: float = 0.5,
mac_threshold: float = 0.5,
ambiguity_margin: float = 0.2,
) -> IdentificationResult:
"""Link modes across operating points into labeled tracks, globally.
Parameters
----------
solutions : Sequence[ModalSolution]
Modal solutions, one per operating point, in operating-point order. All
must share the same number of mode-shape degrees of freedom.
frequency_weight : float, optional
Weight of the frequency-continuity penalty in the link affinity, ``[0, 1]``.
mac_threshold : float, optional
Minimum modal assurance criterion for two modes to be linkable.
ambiguity_margin : float, optional
A link is flagged ambiguous when the best continuation's MAC exceeds the
second best by less than this margin.
Returns
-------
IdentificationResult
The identified tracks and the number of operating points.
Raises
------
ValueError
If ``frequency_weight``, ``mac_threshold``, or ``ambiguity_margin`` is not
in ``[0, 1]``, or the solutions have inconsistent mode-shape DOF layout.
"""
for name, value in (
("frequency_weight", frequency_weight),
("mac_threshold", mac_threshold),
("ambiguity_margin", ambiguity_margin),
):
if not 0.0 <= value <= 1.0:
msg = f"{name} must be in [0, 1], got {value}"
raise ValueError(msg)
if not solutions:
return IdentificationResult(tracks=[], n_operating_points=0)
_validate_consistent_dofs(solutions)
macs = _pairwise_macs(solutions)
affinities = _pairwise_affinities(solutions, macs, frequency_weight, mac_threshold)
entries = _extract_tracks(solutions, affinities)
return _build_result(entries, solutions, macs, ambiguity_margin)
def _pairwise_macs(
solutions: Sequence[ModalSolution],
) -> list[npt.NDArray[np.float64]]:
"""Return the MAC matrix between each consecutive operating-point pair."""
return [
compute_mac(solutions[k].mode_shapes, solutions[k + 1].mode_shapes)
for k in range(len(solutions) - 1)
]
def _pairwise_affinities(
solutions: Sequence[ModalSolution],
macs: list[npt.NDArray[np.float64]],
frequency_weight: float,
mac_threshold: float,
) -> list[npt.NDArray[np.float64]]:
"""Return the link affinity (MAC x frequency continuity) per pair.
Entries below ``mac_threshold`` are set to ``-inf`` so they cannot be linked.
"""
affinities: list[npt.NDArray[np.float64]] = []
for k, mac in enumerate(macs):
weight = mac.copy()
freqs_a = solutions[k].natural_frequencies_hz
freqs_b = solutions[k + 1].natural_frequencies_hz
if frequency_weight > 0.0 and freqs_a.size and freqs_b.size:
both = np.concatenate([freqs_a, freqs_b])
span = float(both.max() - both.min())
if span > 0.0:
delta = np.abs(freqs_a[:, np.newaxis] - freqs_b[np.newaxis, :])
penalty = np.clip(1.0 - frequency_weight * delta / span, 0.0, 1.0)
weight = mac * penalty
affinities.append(
np.where(mac >= mac_threshold, weight + _LINK_BONUS, _NEG_INF)
)
return affinities
def _extract_tracks(
solutions: Sequence[ModalSolution],
affinities: list[npt.NDArray[np.float64]],
) -> list[list[tuple[int, int]]]:
"""Greedily extract globally-optimal continuity paths until all modes used."""
used = [np.zeros(solution.n_modes, dtype=np.bool_) for solution in solutions]
tracks: list[list[tuple[int, int]]] = []
while True:
path = _best_path(used, affinities)
if path is None:
return tracks
for op, mode in path:
used[op][mode] = True
tracks.append(path)
def _best_path(
used: list[npt.NDArray[np.bool_]],
affinities: list[npt.NDArray[np.float64]],
) -> list[tuple[int, int]] | None:
"""Return the highest-affinity contiguous path over unused modes, or None.
A dynamic program over the layered graph (operating points as layers, modes as
nodes): a path may start at any operating point and is extended only along
links whose affinity is finite (MAC above threshold).
"""
n_op = len(used)
best: list[npt.NDArray[np.float64]] = []
parent: list[npt.NDArray[np.int_]] = []
for op in range(n_op):
start = np.where(used[op], _NEG_INF, 0.0)
link_parent = np.full(used[op].shape[0], -1, dtype=np.int_)
if op > 0 and best[op - 1].size and start.size:
candidate = best[op - 1][:, np.newaxis] + affinities[op - 1]
from_prev = candidate.max(axis=0) if candidate.size else start
best_i = candidate.argmax(axis=0) if candidate.size else link_parent
# A used target mode can be neither started nor entered, so gate the
# extension on the target being available -- otherwise a later path
# could route through an already-assigned (op, mode).
take = (from_prev > start) & ~used[op]
start = np.where(take, from_prev, start)
link_parent = np.where(take, best_i, link_parent)
best.append(start)
parent.append(link_parent)
end_op, end_mode, top = -1, -1, _NEG_INF
for op in range(n_op):
if best[op].size and float(best[op].max()) > top:
top = float(best[op].max())
end_op, end_mode = op, int(best[op].argmax())
if top == _NEG_INF:
return None
path: list[tuple[int, int]] = []
op, mode = end_op, end_mode
while mode != -1:
path.append((op, mode))
previous = int(parent[op][mode])
op, mode = op - 1, previous
path.reverse()
return path
def _build_result(
tracks: list[list[tuple[int, int]]],
solutions: Sequence[ModalSolution],
macs: list[npt.NDArray[np.float64]],
ambiguity_margin: float,
) -> IdentificationResult:
"""Assemble ModeTrack objects with confidence and ambiguity, sorted by freq."""
built: list[ModeTrack] = []
for entries in tracks:
freqs = [float(solutions[op].natural_frequencies_hz[m]) for op, m in entries]
damps = [float(solutions[op].damping_ratios[m]) for op, m in entries]
confidence, ambiguous = _track_quality(entries, macs, ambiguity_margin)
built.append(
ModeTrack(
operating_points=[op for op, _ in entries],
mode_indices=[m for _, m in entries],
natural_frequencies_hz=freqs,
damping_ratios=damps,
label=_track_label(entries, solutions),
confidence=confidence,
is_ambiguous=ambiguous,
)
)
built.sort(key=lambda track: track.natural_frequencies_hz[0])
return IdentificationResult(tracks=built, n_operating_points=len(solutions))
def _track_quality(
entries: list[tuple[int, int]],
macs: list[npt.NDArray[np.float64]],
ambiguity_margin: float,
) -> tuple[float, bool]:
"""Return a track's weakest-link MAC and whether any link is a near-tie."""
confidence = 1.0
ambiguous = False
for (op_a, mode_a), (_, mode_b) in pairwise(entries):
mac_row = macs[op_a][mode_a]
confidence = min(confidence, float(mac_row[mode_b]))
if mac_row.shape[0] >= 2:
ordered = np.sort(mac_row)[::-1]
if float(ordered[0] - ordered[1]) < ambiguity_margin:
ambiguous = True
return confidence, ambiguous
def _track_label(
entries: list[tuple[int, int]], solutions: Sequence[ModalSolution]
) -> ModeLabel:
"""Return the most confident label among a track's modes."""
candidates = [
label_mode(
np.abs(solutions[op].mode_shapes[:, mode]), solutions[op].dof_descriptions
)
for op, mode in entries
if solutions[op].dof_descriptions
]
if not candidates:
from vane.config.dof_map import DofCategory
return ModeLabel(DofCategory.UNKNOWN, "Unidentified", 0.0, [])
return max(candidates, key=lambda label: label.confidence)
def _validate_consistent_dofs(solutions: Sequence[ModalSolution]) -> None:
"""Validate that every solution shares the reference mode-shape DOF layout.
The DOF count must match, and when descriptions are available they must match
too, so the modal assurance criterion compares corresponding degrees of
freedom rather than silently mixing unrelated rows.
"""
reference = solutions[0]
ref_n = reference.mode_shapes.shape[0]
ref_descriptions = reference.dof_descriptions
for index, solution in enumerate(solutions[1:], start=1):
if solution.mode_shapes.shape[0] != ref_n:
msg = (
f"Operating point {index} has {solution.mode_shapes.shape[0]} "
f"mode-shape DOFs, expected {ref_n}"
)
raise ValueError(msg)
if (
ref_descriptions
and solution.dof_descriptions
and solution.dof_descriptions != ref_descriptions
):
msg = (
f"Operating point {index} has different DOF descriptions from the "
f"reference; mode shapes are not comparable"
)
raise ValueError(msg)