# Source code for vane.study

"""High-level study orchestration with reproducibility provenance.

A study discovers the linearization files of a campaign, runs the modal
pipeline, and records the **provenance** of the result — the exact source files (with
content hashes), the azimuth coverage of each operating point, the tuning thresholds,
and the library version — so a Campbell diagram can be tied back to precisely the
inputs and assumptions that produced it. ``write_bundle`` serializes a reproducibility
package (a JSON manifest plus the result tables).
"""

from __future__ import annotations

import hashlib
import json
from collections import defaultdict
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from importlib.metadata import PackageNotFoundError, version
from pathlib import Path
from typing import TYPE_CHECKING

from vane.campbell.excitation import DEFAULT_HARMONICS
from vane.export import campbell_table, write_table
from vane.io.lin_reader import read_lin_file
from vane.pipeline import ModalPipeline

if TYPE_CHECKING:
    from collections.abc import Sequence

    from vane.io.lin_reader import LinFile
    from vane.pipeline import PipelineResult

# Names exported by ``from vane.study import *``.
__all__ = [
    "OperatingPointProvenance",
    "Provenance",
    "SourceFile",
    "StudyResult",
    "discover_operating_points",
    "run_study",
]

# Default ``parameter_name`` of ``run_study``; ``_operating_parameter`` compares
# against it to decide whether to average rotor speed (this name) or wind speed.
_RPM_PARAMETER = "rotor_speed_rpm"
# Degrees per radian (180 / pi); multiplied onto ``lin.azimuth`` when recording the
# azimuth coverage in degrees.
_DEG_PER_RAD = 180.0 / 3.141592653589793
# Rev/min per rad/s (60 / (2 * pi) = 30 / pi); multiplied onto the averaged
# ``lin.rotor_speed`` to report it in rpm.
_RPM_PER_RAD_S = 30.0 / 3.141592653589793
# Number of bytes read per iteration while hashing a source file (64 KiB).
_HASH_CHUNK_BYTES = 65536


@dataclass(frozen=True)
class SourceFile:
    """One input linearization file, identified by location and content digest.

    Parameters
    ----------
    path : str
        Location the file was read from.
    sha256 : str
        SHA-256 digest of the file's bytes, hex-encoded.
    """

    path: str
    sha256: str
@dataclass(frozen=True)
class OperatingPointProvenance:
    """Summary of the azimuth sweep behind a single operating point.

    Parameters
    ----------
    n_azimuths : int
        How many azimuth samples the operating point contains.
    azimuth_min_deg, azimuth_max_deg : float
        Lowest and highest sampled azimuth, in degrees.
    rotor_speed_rpm : float
        Rotor speed of the operating point, in rev/min.
    """

    n_azimuths: int
    azimuth_min_deg: float
    azimuth_max_deg: float
    rotor_speed_rpm: float
@dataclass(frozen=True)
class Provenance:
    """Immutable record of the inputs and settings behind one study run.

    Parameters
    ----------
    vane_version : str
        Library version that produced the result.
    created_at : str
        ISO-8601 timestamp marking when the record was made.
    parameter_name : str
        Name of the operating parameter the sweep was run against.
    frequency_weight, mac_threshold : float
        Tuning thresholds applied during mode tracking.
    harmonics : tuple[int, ...]
        Excitation harmonics considered for resonance detection.
    source_files : tuple[SourceFile, ...]
        All input files, each paired with its content hash.
    operating_points : tuple[OperatingPointProvenance, ...]
        Azimuth coverage, one entry per operating point.
    n_tracks, n_resonances : int
        Count of identified mode tracks and of detected resonance crossings.
    """

    vane_version: str
    created_at: str
    parameter_name: str
    frequency_weight: float
    mac_threshold: float
    harmonics: tuple[int, ...]
    source_files: tuple[SourceFile, ...]
    operating_points: tuple[OperatingPointProvenance, ...]
    n_tracks: int
    n_resonances: int

    def to_dict(self) -> dict[str, object]:
        """Convert the record to a dictionary that ``json.dumps`` can serialize."""
        # ``asdict`` recurses into the nested dataclasses held in the tuple fields.
        manifest: dict[str, object] = asdict(self)
        return manifest
@dataclass
class StudyResult:
    """Outcome of a study: the pipeline products plus the record of how they arose.

    Parameters
    ----------
    pipeline : PipelineResult
        All intermediate products of the analysis.
    provenance : Provenance
        The accompanying reproducibility record.
    """

    pipeline: PipelineResult
    provenance: Provenance

    def write_bundle(self, output_dir: str | Path) -> None:
        """Serialize the reproducibility bundle into ``output_dir``.

        Two files are written: ``provenance.json``, the manifest of inputs and
        assumptions, and ``campbell.csv``, the tracked-mode result table.

        Parameters
        ----------
        output_dir : str or pathlib.Path
            Target directory; it and any missing parents are created on demand.
        """
        bundle_dir = Path(output_dir)
        bundle_dir.mkdir(parents=True, exist_ok=True)

        manifest_text = json.dumps(self.provenance.to_dict(), indent=2)
        manifest_path = bundle_dir / "provenance.json"
        manifest_path.write_text(manifest_text, encoding="utf-8")

        table = campbell_table(self.pipeline.campbell)
        write_table(table, bundle_dir / "campbell.csv")
def _lin_index_key(path: Path) -> tuple[int, str]:
    """Return a sort key that orders ``<case>.<index>.lin`` files by numeric index."""
    pieces = path.name.rsplit(".", 2)
    # Three pieces means ``<case>.<index>.lin``; anything else has no index field.
    index_text = pieces[1] if len(pieces) == 3 else ""
    # Files without a decimal index sort first; the name breaks ties so the order
    # stays deterministic.
    index = int(index_text) if index_text.isdecimal() else -1
    return index, path.name


def discover_operating_points(directory: str | Path) -> list[list[LinFile]]:
    """Discover and group a directory's ``.lin`` files into operating points.

    Files are named ``<case>.<index>.lin`` by OpenFAST; those sharing a ``<case>``
    root are one operating point's azimuth sweep. Within a sweep the files are
    ordered by their numeric ``<index>`` (so ``case.2.lin`` precedes
    ``case.10.lin``), not lexicographically.

    Parameters
    ----------
    directory : str or pathlib.Path
        Directory containing ``.lin`` linearization files.

    Returns
    -------
    list[list[LinFile]]
        One list of parsed linearization files per operating point, ordered by
        case root; each inner list is ordered by file index.

    Raises
    ------
    FileNotFoundError
        If the directory contains no ``.lin`` files.
    """
    root_dir = Path(directory)
    groups: dict[str, list[Path]] = defaultdict(list)
    for path in root_dir.glob("*.lin"):
        # Everything before the trailing ``.<index>.lin`` identifies the case.
        groups[path.name.rsplit(".", 2)[0]].append(path)
    if not groups:
        msg = f"No .lin files found in {directory}"
        raise FileNotFoundError(msg)
    return [
        [read_lin_file(path) for path in sorted(groups[root], key=_lin_index_key)]
        for root in sorted(groups)
    ]
def run_study(
    operating_points: Sequence[Sequence[LinFile]],
    *,
    parameter_name: str = _RPM_PARAMETER,
    frequency_weight: float = 0.5,
    mac_threshold: float = 0.5,
    harmonics: Sequence[int] = DEFAULT_HARMONICS,
    timestamp: datetime | None = None,
) -> StudyResult:
    """Execute the modal pipeline and capture the provenance of its output.

    Parameters
    ----------
    operating_points : Sequence[Sequence[LinFile]]
        The azimuth sweeps, one per operating point.
    parameter_name : str, optional
        Name of the operating parameter to run against.
    frequency_weight, mac_threshold : float, optional
        Tuning thresholds for mode tracking.
    harmonics : Sequence[int], optional
        Excitation harmonics used for resonance detection.
    timestamp : datetime or None, optional
        Moment to record in the provenance; when omitted the current UTC time is
        used. Injectable so tests can be reproducible.

    Returns
    -------
    StudyResult
        The pipeline result bundled with its provenance.
    """

    def by_parameter(point: Sequence[LinFile]) -> float:
        return _operating_parameter(point, parameter_name)

    # The pipeline sorts operating points by the operating parameter; apply the same
    # ordering first so the provenance lines up with the sorted Campbell result.
    sorted_points = sorted(operating_points, key=by_parameter)

    modal_pipeline = ModalPipeline(
        frequency_weight=frequency_weight,
        mac_threshold=mac_threshold,
        harmonics=harmonics,
    )
    pipeline_result = modal_pipeline.run(sorted_points, parameter_name=parameter_name)

    record = _build_provenance(
        sorted_points,
        pipeline_result,
        parameter_name=parameter_name,
        frequency_weight=frequency_weight,
        mac_threshold=mac_threshold,
        # The pipeline validates and normalizes the harmonics (e.g. float -> int);
        # store those normalized values instead of the caller's raw input.
        harmonics=modal_pipeline.harmonics,
        timestamp=timestamp,
    )
    return StudyResult(pipeline=pipeline_result, provenance=record)
def _operating_parameter(point: Sequence[LinFile], parameter_name: str) -> float:
    """Average an operating point's parameter over its sweep, as the pipeline does.

    An empty operating point yields ``0.0``. For the rotor-speed parameter the mean
    ``rotor_speed`` is scaled to rpm; any other parameter name yields the mean
    ``wind_speed``.
    """
    if not point:
        return 0.0
    if parameter_name != _RPM_PARAMETER:
        return sum(lin.wind_speed for lin in point) / len(point)
    return sum(lin.rotor_speed for lin in point) / len(point) * _RPM_PER_RAD_S


def _build_provenance(
    operating_points: Sequence[Sequence[LinFile]],
    result: PipelineResult,
    *,
    parameter_name: str,
    frequency_weight: float,
    mac_threshold: float,
    harmonics: Sequence[int],
    timestamp: datetime | None,
) -> Provenance:
    """Build the :class:`Provenance` record from the inputs and the pipeline result."""
    if timestamp is None:
        # timezone.utc (not datetime.UTC) to stay compatible with Python 3.10.
        recorded_at = datetime.now(timezone.utc)  # noqa: UP017
    else:
        recorded_at = timestamp

    file_records: list[SourceFile] = []
    point_records: list[OperatingPointProvenance] = []
    for sweep in operating_points:
        sampled_deg = [lin.azimuth * _DEG_PER_RAD for lin in sweep]
        # Rotor speed averaged over the sweep (rpm), matching the pipeline's azimuth
        # average, instead of whatever a single azimuth happens to report.
        sweep_rpm = _operating_parameter(sweep, _RPM_PARAMETER)
        point_records.append(
            OperatingPointProvenance(
                n_azimuths=len(sweep),
                azimuth_min_deg=min(sampled_deg, default=0.0),
                azimuth_max_deg=max(sampled_deg, default=0.0),
                rotor_speed_rpm=float(sweep_rpm),
            )
        )
        for lin in sweep:
            digest = _file_sha256(lin.path)
            file_records.append(SourceFile(path=str(lin.path), sha256=digest))

    return Provenance(
        vane_version=_vane_version(),
        created_at=recorded_at.isoformat(),
        parameter_name=parameter_name,
        frequency_weight=frequency_weight,
        mac_threshold=mac_threshold,
        harmonics=tuple(harmonics),
        source_files=tuple(file_records),
        operating_points=tuple(point_records),
        n_tracks=len(result.tracks),
        n_resonances=len(result.resonances),
    )


def _file_sha256(path: Path) -> str:
    """Hash a file's contents with SHA-256 and return the hex digest."""
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        # Read in fixed-size chunks so the whole file is never held in memory.
        while chunk := stream.read(_HASH_CHUNK_BYTES):
            hasher.update(chunk)
    return hasher.hexdigest()


def _vane_version() -> str:
    """Look up the installed VANE version, falling back to ``unknown``."""
    try:
        installed = version("vane")
    except PackageNotFoundError:  # pragma: no cover - only in a non-installed tree
        return "unknown"
    return installed