"""High-level study orchestration with reproducibility provenance.

A study discovers the linearization files of a campaign
(:func:`discover_operating_points`), runs the modal pipeline (:func:`run_study`), and
records the **provenance** of the result — the exact source files (with content
hashes), the azimuth coverage of each operating point, the tuning thresholds, and the
library version — so a Campbell diagram can be tied back to precisely the inputs and
assumptions that produced it. :meth:`StudyResult.write_bundle` serializes a
reproducibility package (a JSON manifest plus the Campbell result table).
"""
from __future__ import annotations
import hashlib
import json
from collections import defaultdict
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from importlib.metadata import PackageNotFoundError, version
from pathlib import Path
from typing import TYPE_CHECKING
from vane.campbell.excitation import DEFAULT_HARMONICS
from vane.export import campbell_table, write_table
from vane.io.lin_reader import read_lin_file
from vane.pipeline import ModalPipeline
if TYPE_CHECKING:
from collections.abc import Sequence
from vane.io.lin_reader import LinFile
from vane.pipeline import PipelineResult
# Public API of this module; the underscore-prefixed helpers below stay private.
__all__ = [
"OperatingPointProvenance",
"Provenance",
"SourceFile",
"StudyResult",
"discover_operating_points",
"run_study",
]
# Parameter name that selects the rotor-speed (rev/min) branch when computing an
# operating point's parameter value; also the default ``parameter_name`` of
# ``run_study``.
_RPM_PARAMETER = "rotor_speed_rpm"
# Conversion factor from radians to degrees (180 / pi), applied to ``lin.azimuth``.
_DEG_PER_RAD = 180.0 / 3.141592653589793
# Conversion factor from rad/s to rev/min (60 / (2 * pi) == 30 / pi), applied to
# ``lin.rotor_speed``.
_RPM_PER_RAD_S = 30.0 / 3.141592653589793
# Read size, in bytes (64 KiB), used when hashing source files chunk by chunk.
_HASH_CHUNK_BYTES = 65536
[docs]
@dataclass(frozen=True)
class SourceFile:
    """One input linearization file, identified by its path and content digest.

    Instances are immutable (the dataclass is frozen), so they compare and hash by
    value.

    Parameters
    ----------
    path : str
        Location the file was read from.
    sha256 : str
        SHA-256 digest of the file's bytes, hex-encoded.
    """

    path: str
    sha256: str
[docs]
@dataclass(frozen=True)
class OperatingPointProvenance:
    """Summary of the azimuth sweep behind a single operating point.

    Instances are immutable (the dataclass is frozen).

    Parameters
    ----------
    n_azimuths : int
        How many azimuth samples the operating point contains.
    azimuth_min_deg, azimuth_max_deg : float
        Lowest and highest sampled azimuth, in degrees.
    rotor_speed_rpm : float
        Rotor speed of the operating point, in rev/min.
    """

    n_azimuths: int
    azimuth_min_deg: float
    azimuth_max_deg: float
    rotor_speed_rpm: float
[docs]
@dataclass(frozen=True)
class Provenance:
    """A reproducibility record for a study run.

    Instances are immutable (the dataclass is frozen).

    Parameters
    ----------
    vane_version : str
        Version of the library that produced the result.
    created_at : str
        ISO-8601 timestamp of when the provenance was recorded.
    parameter_name : str
        Operating parameter the sweep was run against.
    frequency_weight, mac_threshold : float
        Tracking tuning thresholds used.
    harmonics : tuple[int, ...]
        Excitation harmonics used for resonance detection.
    source_files : tuple[SourceFile, ...]
        Every input file with its content hash.
    operating_points : tuple[OperatingPointProvenance, ...]
        Per-operating-point azimuth coverage.
    n_tracks, n_resonances : int
        Number of identified mode tracks and detected resonance crossings.
    """

    vane_version: str
    created_at: str
    parameter_name: str
    frequency_weight: float
    mac_threshold: float
    harmonics: tuple[int, ...]
    source_files: tuple[SourceFile, ...]
    operating_points: tuple[OperatingPointProvenance, ...]
    n_tracks: int
    n_resonances: int

    def to_dict(self) -> dict[str, object]:
        """Return the provenance as a JSON-serializable dictionary.

        Returns
        -------
        dict[str, object]
            One entry per field, in declaration order. ``dataclasses.asdict``
            recurses into nested dataclass instances (turning them into dictionaries)
            and keeps tuples as tuples, which ``json`` encodes as arrays.
        """
        return asdict(self)
[docs]
@dataclass
class StudyResult:
    """The pipeline result of a study together with its provenance.

    Parameters
    ----------
    pipeline : PipelineResult
        Every intermediate product of the analysis.
    provenance : Provenance
        The reproducibility record.
    """

    pipeline: PipelineResult
    provenance: Provenance

    def write_bundle(self, output_dir: str | Path) -> None:
        """Write a reproducibility bundle to ``output_dir``.

        The bundle contains ``provenance.json`` (the manifest of inputs and
        assumptions) and ``campbell.csv`` (the tracked-mode result table).

        Parameters
        ----------
        output_dir : str or pathlib.Path
            Destination directory; created (with any missing parents) if it does
            not exist.
        """
        destination = Path(output_dir)
        destination.mkdir(parents=True, exist_ok=True)
        # The manifest is written first, as UTF-8 text with a two-space indent.
        manifest = json.dumps(self.provenance.to_dict(), indent=2)
        (destination / "provenance.json").write_text(manifest, encoding="utf-8")
        write_table(
            campbell_table(self.pipeline.campbell), destination / "campbell.csv"
        )
[docs]
def discover_operating_points(directory: str | Path) -> list[list[LinFile]]:
    """Discover and group a directory's ``.lin`` files into operating points.

    Files are named ``<case>.<index>.lin`` by OpenFAST; those sharing a ``<case>``
    root are one operating point's azimuth sweep.

    Parameters
    ----------
    directory : str or pathlib.Path
        Directory containing ``.lin`` linearization files.

    Returns
    -------
    list[list[LinFile]]
        One list of parsed linearization files per operating point, ordered by case.
        Within an operating point the files are ordered by their numeric
        ``<index>`` (so ``case.2.lin`` precedes ``case.10.lin``); files without a
        numeric index come last, ordered by name.

    Raises
    ------
    FileNotFoundError
        If the directory contains no ``.lin`` files.
    """
    root_dir = Path(directory)
    groups: dict[str, list[Path]] = defaultdict(list)
    for path in root_dir.glob("*.lin"):
        # Everything before the trailing ``.<index>.lin`` is the case root.
        groups[path.name.rsplit(".", 2)[0]].append(path)
    if not groups:
        msg = f"No .lin files found in {directory}"
        raise FileNotFoundError(msg)
    # Parse only after grouping so an empty directory fails before any file is read.
    return [
        [read_lin_file(path) for path in sorted(groups[root], key=_lin_index_key)]
        for root in sorted(groups)
    ]


def _lin_index_key(path: Path) -> tuple[int, int, str]:
    """Return a sort key that orders ``<case>.<index>.lin`` files by numeric index."""
    parts = path.name.rsplit(".", 2)
    index = parts[1] if len(parts) == 3 else ""
    # A plain string sort would put "10" before "2"; compare indices as integers.
    if index.isdecimal():
        return (0, int(index), path.name)
    # No usable numeric index: sort after the indexed files, by file name.
    return (1, 0, path.name)
[docs]
def run_study(
    operating_points: Sequence[Sequence[LinFile]],
    *,
    parameter_name: str = _RPM_PARAMETER,
    frequency_weight: float = 0.5,
    mac_threshold: float = 0.5,
    harmonics: Sequence[int] = DEFAULT_HARMONICS,
    timestamp: datetime | None = None,
) -> StudyResult:
    """Run the modal pipeline over a campaign and capture how the result was made.

    Parameters
    ----------
    operating_points : Sequence[Sequence[LinFile]]
        One azimuth sweep per operating point.
    parameter_name : str, optional
        Operating parameter to run against.
    frequency_weight, mac_threshold : float, optional
        Tracking tuning thresholds, forwarded to the pipeline and recorded.
    harmonics : Sequence[int], optional
        Excitation harmonics for resonance detection.
    timestamp : datetime or None, optional
        Timestamp to record (defaults to the current UTC time); injectable for
        reproducible tests.

    Returns
    -------
    StudyResult
        The pipeline result and its provenance.
    """

    def sort_value(sweep: Sequence[LinFile]) -> float:
        return _operating_parameter(sweep, parameter_name)

    # The pipeline sorts operating points by the operating parameter; apply the same
    # ordering here first so the provenance entries line up with the sorted Campbell
    # result.
    ordered = sorted(operating_points, key=sort_value)

    modal_pipeline = ModalPipeline(
        frequency_weight=frequency_weight,
        mac_threshold=mac_threshold,
        harmonics=harmonics,
    )
    pipeline_result = modal_pipeline.run(ordered, parameter_name=parameter_name)

    return StudyResult(
        pipeline=pipeline_result,
        provenance=_build_provenance(
            ordered,
            pipeline_result,
            parameter_name=parameter_name,
            frequency_weight=frequency_weight,
            mac_threshold=mac_threshold,
            # Record the harmonics as validated and normalized by the pipeline
            # (e.g. float -> int), not the raw caller input.
            harmonics=modal_pipeline.harmonics,
            timestamp=timestamp,
        ),
    )
def _operating_parameter(point: Sequence[LinFile], parameter_name: str) -> float:
"""Return an operating point's parameter value, matching the pipeline's average."""
if not point:
return 0.0
if parameter_name == _RPM_PARAMETER:
mean_rotor_speed = sum(lin.rotor_speed for lin in point) / len(point)
return mean_rotor_speed * _RPM_PER_RAD_S
return sum(lin.wind_speed for lin in point) / len(point)
def _build_provenance(
    operating_points: Sequence[Sequence[LinFile]],
    result: PipelineResult,
    *,
    parameter_name: str,
    frequency_weight: float,
    mac_threshold: float,
    harmonics: Sequence[int],
    timestamp: datetime | None,
) -> Provenance:
    """Build the :class:`Provenance` record for a finished pipeline run.

    For every operating point this records the azimuth coverage and mean rotor speed,
    and hashes each of its source files; the run-level settings and the track and
    resonance counts of ``result`` complete the record.
    """
    if timestamp is None:
        # timezone.utc (not datetime.UTC) to stay compatible with Python 3.10.
        moment = datetime.now(timezone.utc)  # noqa: UP017
    else:
        moment = timestamp

    coverage: list[OperatingPointProvenance] = []
    sources: list[SourceFile] = []
    for sweep in operating_points:
        azimuths_deg = [lin.azimuth * _DEG_PER_RAD for lin in sweep]
        # Use the rotor speed averaged over the sweep (in rpm), matching the
        # pipeline's azimuth average, instead of one arbitrary azimuth's value.
        if sweep:
            mean_rpm = sum(lin.rotor_speed for lin in sweep) / len(sweep) * _RPM_PER_RAD_S
        else:
            mean_rpm = 0.0
        coverage.append(
            OperatingPointProvenance(
                n_azimuths=len(sweep),
                # An empty sweep reports 0.0 for both azimuth bounds.
                azimuth_min_deg=min(azimuths_deg, default=0.0),
                azimuth_max_deg=max(azimuths_deg, default=0.0),
                rotor_speed_rpm=float(mean_rpm),
            )
        )
        for lin in sweep:
            sources.append(
                SourceFile(path=str(lin.path), sha256=_file_sha256(lin.path))
            )

    return Provenance(
        vane_version=_vane_version(),
        created_at=moment.isoformat(),
        parameter_name=parameter_name,
        frequency_weight=frequency_weight,
        mac_threshold=mac_threshold,
        harmonics=tuple(harmonics),
        source_files=tuple(sources),
        operating_points=tuple(coverage),
        n_tracks=len(result.tracks),
        n_resonances=len(result.resonances),
    )
def _file_sha256(path: Path) -> str:
    """Compute the hex-encoded SHA-256 digest of the file at ``path``.

    The file is read in binary mode in fixed-size chunks, so it is never held in
    memory in full.
    """
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        # An empty read marks end of file.
        while block := stream.read(_HASH_CHUNK_BYTES):
            hasher.update(block)
    return hasher.hexdigest()
def _vane_version() -> str:
"""Return the installed VANE version, or ``unknown`` if not installed."""
try:
return version("vane")
except PackageNotFoundError: # pragma: no cover - only in a non-installed tree
return "unknown"