Source code for vane.study

"""High-level study orchestration with reproducibility provenance.

A study discovers the linearization files of a campaign, runs the modal pipeline, and
records the **provenance** of the result — the exact source files (with content
hashes) grouped by operating point, each point's case name, full azimuth list, and
operating-parameter value, the tuning thresholds, and the library and environment
versions — so a Campbell diagram can be tied back to precisely the inputs and
assumptions that produced it. ``write_bundle`` serializes a reproducibility package (a
JSON manifest plus the result tables).
"""

from __future__ import annotations

import hashlib
import json
import platform
from collections import defaultdict
from collections.abc import Sequence
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from importlib.metadata import PackageNotFoundError, version
from pathlib import Path
from typing import TYPE_CHECKING, overload

from vane.campbell.excitation import DEFAULT_HARMONICS
from vane.export import campbell_table, write_table
from vane.io.lin_reader import read_lin_file
from vane.pipeline import ModalPipeline

if TYPE_CHECKING:
    from vane.io.lin_reader import LinFile
    from vane.pipeline import PipelineResult

__all__ = [
    "DiscoveredOperatingPoint",
    "Environment",
    "OperatingPointProvenance",
    "Provenance",
    "SourceFile",
    "StudyResult",
    "discover_operating_points",
    "run_study",
]

_RPM_PARAMETER = "rotor_speed_rpm"
_WIND_PARAMETER = "wind_speed"
_DEG_PER_RAD = 180.0 / 3.141592653589793
_RPM_PER_RAD_S = 30.0 / 3.141592653589793
_HASH_CHUNK_BYTES = 65536
# Dependencies whose versions are recorded for reproducibility.
_RECORDED_DEPENDENCIES = (
    "numpy",
    "scipy",
    "pandas",
    "scikit-learn",
    "matplotlib",
    "plotly",
)


[docs] @dataclass(frozen=True) class SourceFile: """A source linearization file and the hash of its contents. Parameters ---------- path : str Path the file was read from. sha256 : str Hex-encoded SHA-256 digest of the file's bytes. """ path: str sha256: str
[docs] @dataclass(frozen=True) class DiscoveredOperatingPoint(Sequence["LinFile"]): """A discovered operating point: its case name and its azimuth files. Behaves as a sequence of its :class:`~vane.io.lin_reader.LinFile` objects, so it can be passed wherever a sequence of linearization files is expected, while also carrying the ``<case>`` grouping name that discovery would otherwise lose. Parameters ---------- name : str The ``<case>`` root the files were grouped under. files : tuple[LinFile, ...] The parsed azimuth files of this operating point. """ name: str files: tuple[LinFile, ...] @overload def __getitem__(self, index: int) -> LinFile: ... @overload def __getitem__(self, index: slice) -> tuple[LinFile, ...]: ... def __getitem__(self, index: int | slice) -> LinFile | tuple[LinFile, ...]: """Return the file(s) at ``index``.""" return self.files[index] def __len__(self) -> int: """Return the number of azimuth files.""" return len(self.files)
[docs] @dataclass(frozen=True) class Environment: """The software environment a study ran in. Parameters ---------- python_version : str The CPython version (e.g. ``"3.11.9"``). platform : str The platform identification string. dependencies : dict[str, str] Version of each recorded scientific dependency. """ python_version: str platform: str dependencies: dict[str, str]
[docs] @dataclass(frozen=True) class OperatingPointProvenance: """Provenance of one operating point. Parameters ---------- name : str The ``<case>`` grouping name (empty if the point was not discovered by name). n_azimuths : int Number of azimuth samples in the operating point. azimuths_deg : tuple[float, ...] Every sampled azimuth, in degrees. azimuth_min_deg, azimuth_max_deg : float Smallest and largest sampled azimuth, in degrees. rotor_speed_rpm : float Azimuth-averaged rotor speed, in rev/min. wind_speed : float Azimuth-averaged hub-height wind speed, in m/s. parameter_value : float The operating-parameter value used to order the sweep. source_files : tuple[SourceFile, ...] The files of this operating point, each with its content hash. """ name: str n_azimuths: int azimuths_deg: tuple[float, ...] azimuth_min_deg: float azimuth_max_deg: float rotor_speed_rpm: float wind_speed: float parameter_value: float source_files: tuple[SourceFile, ...]
[docs] @dataclass(frozen=True) class Provenance: """A reproducibility record for a study run. Parameters ---------- vane_version : str Version of the library that produced the result. created_at : str ISO-8601 timestamp of when the provenance was recorded. parameter_name : str Operating parameter the sweep was run against. frequency_weight, mac_threshold : float Tracking tuning thresholds used. harmonics : tuple[int, ...] Excitation harmonics used for resonance detection. environment : Environment The Python, platform, and dependency versions. operating_points : tuple[OperatingPointProvenance, ...] Per-operating-point provenance, in the (sorted) order of the result. n_tracks, n_resonances : int Number of identified mode tracks and detected resonance crossings. """ vane_version: str created_at: str parameter_name: str frequency_weight: float mac_threshold: float harmonics: tuple[int, ...] environment: Environment operating_points: tuple[OperatingPointProvenance, ...] n_tracks: int n_resonances: int @property def source_files(self) -> tuple[SourceFile, ...]: """Return every source file across all operating points (flattened).""" return tuple( source for point in self.operating_points for source in point.source_files )
[docs] def to_dict(self) -> dict[str, object]: """Return the provenance as a JSON-serializable dictionary.""" return asdict(self)
[docs] @dataclass class StudyResult: """The pipeline result of a study together with its provenance. Parameters ---------- pipeline : PipelineResult Every intermediate product of the analysis. provenance : Provenance The reproducibility record. """ pipeline: PipelineResult provenance: Provenance
[docs] def write_bundle(self, output_dir: str | Path) -> None: """Write a reproducibility bundle to ``output_dir``. The bundle contains ``provenance.json`` (the manifest of inputs and assumptions) and ``campbell.csv`` (the tracked-mode result table). Parameters ---------- output_dir : str or pathlib.Path Destination directory; created if it does not exist. """ destination = Path(output_dir) destination.mkdir(parents=True, exist_ok=True) manifest = json.dumps(self.provenance.to_dict(), indent=2) (destination / "provenance.json").write_text(manifest, encoding="utf-8") write_table( campbell_table(self.pipeline.campbell), destination / "campbell.csv" )
[docs] def discover_operating_points(directory: str | Path) -> list[DiscoveredOperatingPoint]: """Discover and group a directory's ``.lin`` files into operating points. Files are named ``<case>.<index>.lin`` by OpenFAST; those sharing a ``<case>`` root are one operating point's azimuth sweep, and the case name is retained on the returned :class:`DiscoveredOperatingPoint`. Parameters ---------- directory : str or pathlib.Path Directory containing ``.lin`` linearization files. Returns ------- list[DiscoveredOperatingPoint] One named operating point per ``<case>`` root, ordered by case. Raises ------ FileNotFoundError If the directory contains no ``.lin`` files. """ root_dir = Path(directory) groups: dict[str, list[Path]] = defaultdict(list) for path in sorted(root_dir.glob("*.lin")): root = path.name.rsplit(".", 2)[0] groups[root].append(path) if not groups: msg = f"No .lin files found in {directory}" raise FileNotFoundError(msg) return [ DiscoveredOperatingPoint( name=root, files=tuple(read_lin_file(path) for path in groups[root]) ) for root in sorted(groups) ]
[docs] def run_study( operating_points: Sequence[Sequence[LinFile]], *, parameter_name: str = _RPM_PARAMETER, frequency_weight: float = 0.5, mac_threshold: float = 0.5, harmonics: Sequence[int] = DEFAULT_HARMONICS, timestamp: datetime | None = None, ) -> StudyResult: """Run the modal pipeline and record the provenance of the result. Parameters ---------- operating_points : Sequence[Sequence[LinFile]] One azimuth sweep per operating point (e.g. from :func:`discover_operating_points`). parameter_name : str, optional Operating parameter to run against (``"rotor_speed_rpm"`` or ``"wind_speed"``). frequency_weight, mac_threshold : float, optional Tracking tuning thresholds. harmonics : Sequence[int], optional Excitation harmonics for resonance detection. timestamp : datetime or None, optional Timestamp to record (defaults to the current UTC time); injectable for reproducible tests. Returns ------- StudyResult The pipeline result and its provenance. Raises ------ ValueError If ``parameter_name`` is not a known operating parameter. """ if parameter_name not in (_RPM_PARAMETER, _WIND_PARAMETER): msg = ( f"parameter_name must be '{_RPM_PARAMETER}' or '{_WIND_PARAMETER}', " f"got '{parameter_name}'" ) raise ValueError(msg) # The pipeline sorts operating points by the operating parameter; sort the same # way up front so the provenance order matches the sorted Campbell result. ordered = sorted( operating_points, key=lambda point: _operating_parameter(point, parameter_name), ) pipeline = ModalPipeline( frequency_weight=frequency_weight, mac_threshold=mac_threshold, harmonics=harmonics, ) result = pipeline.run(ordered, parameter_name=parameter_name) provenance = _build_provenance( ordered, result, parameter_name=parameter_name, frequency_weight=frequency_weight, mac_threshold=mac_threshold, # The pipeline validates and normalizes the harmonics (e.g. float -> int); # record the normalized values, not the raw input. harmonics=pipeline.harmonics, timestamp=timestamp, ) return StudyResult(pipeline=result, provenance=provenance)
def _operating_parameter(point: Sequence[LinFile], parameter_name: str) -> float: """Return an operating point's parameter value, matching the pipeline's average.""" if not point: return 0.0 if parameter_name == _RPM_PARAMETER: mean_rotor_speed = sum(lin.rotor_speed for lin in point) / len(point) return mean_rotor_speed * _RPM_PER_RAD_S return sum(lin.wind_speed for lin in point) / len(point) def _build_provenance( operating_points: Sequence[Sequence[LinFile]], result: PipelineResult, *, parameter_name: str, frequency_weight: float, mac_threshold: float, harmonics: Sequence[int], timestamp: datetime | None, ) -> Provenance: """Assemble a :class:`Provenance` from the inputs and the pipeline result.""" moment = timestamp if timestamp is not None else datetime.now(timezone.utc) coverage: list[OperatingPointProvenance] = [] for point in operating_points: files = list(point) azimuths_deg = tuple(lin.azimuth * _DEG_PER_RAD for lin in files) mean_rpm = ( sum(lin.rotor_speed for lin in files) / len(files) * _RPM_PER_RAD_S if files else 0.0 ) mean_wind = sum(lin.wind_speed for lin in files) / len(files) if files else 0.0 coverage.append( OperatingPointProvenance( name=getattr(point, "name", ""), n_azimuths=len(files), azimuths_deg=azimuths_deg, azimuth_min_deg=min(azimuths_deg) if azimuths_deg else 0.0, azimuth_max_deg=max(azimuths_deg) if azimuths_deg else 0.0, rotor_speed_rpm=float(mean_rpm), wind_speed=float(mean_wind), parameter_value=_operating_parameter(files, parameter_name), source_files=tuple( SourceFile(path=str(lin.path), sha256=_file_sha256(lin.path)) for lin in files ), ) ) return Provenance( vane_version=_vane_version(), created_at=moment.isoformat(), parameter_name=parameter_name, frequency_weight=frequency_weight, mac_threshold=mac_threshold, harmonics=tuple(harmonics), environment=_environment(), operating_points=tuple(coverage), n_tracks=len(result.tracks), n_resonances=len(result.resonances), ) def _environment() -> Environment: """Capture the Python, platform, and dependency versions.""" dependencies: dict[str, str] = {} for package in _RECORDED_DEPENDENCIES: try: dependencies[package] = version(package) except PackageNotFoundError: # pragma: no cover - package always installed dependencies[package] = "unknown" return Environment( python_version=platform.python_version(), platform=platform.platform(), dependencies=dependencies, ) def _file_sha256(path: Path) -> str: """Return the hex SHA-256 digest of a file's contents.""" digest = hashlib.sha256() with path.open("rb") as handle: for chunk in iter(lambda: handle.read(_HASH_CHUNK_BYTES), b""): digest.update(chunk) return digest.hexdigest() def _vane_version() -> str: """Return the installed VANE version, or ``unknown`` if not installed.""" try: return version("vane") except PackageNotFoundError: # pragma: no cover - only in a non-installed tree return "unknown"