"""High-level study orchestration with reproducibility provenance.
A study discovers the linearization files of a campaign, runs the modal pipeline, and
records the **provenance** of the result — the exact source files (with content
hashes) grouped by operating point, each point's case name, full azimuth list, and
operating-parameter value, the tuning thresholds, and the library and environment
versions — so a Campbell diagram can be tied back to precisely the inputs and
assumptions that produced it. ``write_bundle`` serializes a reproducibility package (a
JSON manifest plus the result tables).
"""
from __future__ import annotations
import hashlib
import json
import platform
from collections import defaultdict
from collections.abc import Sequence
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from importlib.metadata import PackageNotFoundError, version
from pathlib import Path
from typing import TYPE_CHECKING, overload
from vane.campbell.excitation import DEFAULT_HARMONICS
from vane.export import campbell_table, write_table
from vane.io.lin_reader import read_lin_file
from vane.pipeline import ModalPipeline
if TYPE_CHECKING:
from vane.io.lin_reader import LinFile
from vane.pipeline import PipelineResult
# Names exported as this module's public API.
__all__ = [
    "DiscoveredOperatingPoint",
    "Environment",
    "OperatingPointProvenance",
    "Provenance",
    "SourceFile",
    "StudyResult",
    "discover_operating_points",
    "run_study",
]

# The two ``parameter_name`` values accepted by ``run_study``.
_RPM_PARAMETER = "rotor_speed_rpm"
_WIND_PARAMETER = "wind_speed"

# Multiplicative unit conversions; pi is written out as a literal.
_DEG_PER_RAD = 180.0 / 3.141592653589793  # radians -> degrees
_RPM_PER_RAD_S = 30.0 / 3.141592653589793  # rad/s -> rev/min

# Number of bytes read per chunk when hashing a source file.
_HASH_CHUNK_BYTES = 65536

# Dependencies whose versions are recorded for reproducibility.
_RECORDED_DEPENDENCIES = (
    "numpy",
    "scipy",
    "pandas",
    "scikit-learn",
    "matplotlib",
    "plotly",
)
[docs]
@dataclass(frozen=True)
class SourceFile:
    """An input linearization file paired with a digest of its bytes.

    Instances are immutable, so a recorded path/hash pair cannot be altered
    after the provenance has been assembled.

    Parameters
    ----------
    path : str
        Location the file was read from, stored as a string.
    sha256 : str
        SHA-256 digest of the file's contents, hex-encoded.
    """

    path: str
    sha256: str
[docs]
@dataclass(frozen=True)
class DiscoveredOperatingPoint(Sequence["LinFile"]):
    """One operating point found on disk: a case name plus its azimuth files.

    The object is itself a read-only sequence over its
    :class:`~vane.io.lin_reader.LinFile` entries, so it can stand in wherever a
    plain sequence of linearization files is accepted while still remembering
    the ``<case>`` name the files were grouped under.

    Parameters
    ----------
    name : str
        The ``<case>`` root shared by the grouped files.
    files : tuple[LinFile, ...]
        The parsed azimuth files belonging to this operating point.
    """

    name: str
    files: tuple[LinFile, ...]

    def __len__(self) -> int:
        """Return how many azimuth files the operating point holds."""
        return len(self.files)

    @overload
    def __getitem__(self, index: int) -> LinFile: ...

    @overload
    def __getitem__(self, index: slice) -> tuple[LinFile, ...]: ...

    def __getitem__(self, index: int | slice) -> LinFile | tuple[LinFile, ...]:
        """Return one file for an integer index, or a tuple of files for a slice."""
        selected = self.files[index]
        return selected
[docs]
@dataclass(frozen=True)
class Environment:
    """Snapshot of the software stack in which a study was executed.

    Parameters
    ----------
    python_version : str
        Version string of the running Python interpreter (e.g. ``"3.11.9"``).
    platform : str
        String identifying the platform the study ran on.
    dependencies : dict[str, str]
        Mapping from each recorded dependency's name to its version string.
    """

    python_version: str
    platform: str
    dependencies: dict[str, str]
[docs]
@dataclass(frozen=True)
class OperatingPointProvenance:
    """Reproducibility record for a single operating point of the sweep.

    Parameters
    ----------
    name : str
        ``<case>`` grouping name; empty when the point carries no name.
    n_azimuths : int
        How many azimuth samples make up the operating point.
    azimuths_deg : tuple[float, ...]
        All sampled azimuth angles, in degrees.
    azimuth_min_deg, azimuth_max_deg : float
        Lowest and highest sampled azimuth, in degrees.
    rotor_speed_rpm : float
        Rotor speed averaged over the azimuth samples, in rev/min.
    wind_speed : float
        Hub-height wind speed averaged over the azimuth samples, in m/s.
    parameter_value : float
        Value of the operating parameter by which the sweep is ordered.
    source_files : tuple[SourceFile, ...]
        The operating point's input files, each paired with its content hash.
    """

    name: str
    n_azimuths: int
    azimuths_deg: tuple[float, ...]
    azimuth_min_deg: float
    azimuth_max_deg: float
    rotor_speed_rpm: float
    wind_speed: float
    parameter_value: float
    source_files: tuple[SourceFile, ...]
[docs]
@dataclass(frozen=True)
class Provenance:
    """Immutable reproducibility record describing one study run.

    Parameters
    ----------
    vane_version : str
        Library version that produced the result.
    created_at : str
        ISO-8601 timestamp marking when the record was created.
    parameter_name : str
        Name of the operating parameter the sweep was run against.
    frequency_weight, mac_threshold : float
        Tuning thresholds applied during mode tracking.
    harmonics : tuple[int, ...]
        Excitation harmonics considered for resonance detection.
    environment : Environment
        Python, platform, and dependency versions of the run.
    operating_points : tuple[OperatingPointProvenance, ...]
        One entry per operating point, in the (sorted) order of the result.
    n_tracks, n_resonances : int
        Counts of identified mode tracks and detected resonance crossings.
    """

    vane_version: str
    created_at: str
    parameter_name: str
    frequency_weight: float
    mac_threshold: float
    harmonics: tuple[int, ...]
    environment: Environment
    operating_points: tuple[OperatingPointProvenance, ...]
    n_tracks: int
    n_resonances: int

    @property
    def source_files(self) -> tuple[SourceFile, ...]:
        """Return the source files of all operating points as one flat tuple."""
        flattened: list[SourceFile] = []
        # Preserve operating-point order, then per-point file order.
        for point in self.operating_points:
            flattened.extend(point.source_files)
        return tuple(flattened)

    def to_dict(self) -> dict[str, object]:
        """Return the record, including nested dataclasses, as a plain dictionary."""
        as_mapping = asdict(self)
        return as_mapping
[docs]
@dataclass
class StudyResult:
    """A study's pipeline output bundled with its reproducibility record.

    Parameters
    ----------
    pipeline : PipelineResult
        The result object produced by the modal pipeline run.
    provenance : Provenance
        The record of inputs and assumptions behind that result.
    """

    pipeline: PipelineResult
    provenance: Provenance

    def write_bundle(self, output_dir: str | Path) -> None:
        """Serialize a reproducibility bundle into ``output_dir``.

        Two files are written: ``provenance.json`` (the manifest of inputs and
        assumptions) and ``campbell.csv`` (the tracked-mode result table).

        Parameters
        ----------
        output_dir : str or pathlib.Path
            Target directory; it and any missing parents are created.
        """
        bundle_dir = Path(output_dir)
        bundle_dir.mkdir(parents=True, exist_ok=True)

        # Manifest first, then the result table.
        manifest_text = json.dumps(self.provenance.to_dict(), indent=2)
        manifest_path = bundle_dir / "provenance.json"
        manifest_path.write_text(manifest_text, encoding="utf-8")

        table = campbell_table(self.pipeline.campbell)
        write_table(table, bundle_dir / "campbell.csv")
[docs]
def discover_operating_points(directory: str | Path) -> list[DiscoveredOperatingPoint]:
    """Discover and group a directory's ``.lin`` files into operating points.

    Files are named ``<case>.<index>.lin`` by OpenFAST; those sharing a ``<case>``
    root are one operating point's azimuth sweep, and the case name is retained on
    the returned :class:`DiscoveredOperatingPoint`.

    Within each operating point the files are ordered by their numeric
    ``<index>`` (so ``case.2.lin`` precedes ``case.10.lin``) rather than by plain
    string comparison of the file names.

    Parameters
    ----------
    directory : str or pathlib.Path
        Directory containing ``.lin`` linearization files.

    Returns
    -------
    list[DiscoveredOperatingPoint]
        One named operating point per ``<case>`` root, ordered by case name, each
        holding its azimuth files in ascending index order.

    Raises
    ------
    FileNotFoundError
        If the directory contains no ``.lin`` files.
    """
    root_dir = Path(directory)
    groups: dict[str, list[Path]] = defaultdict(list)
    for path in root_dir.glob("*.lin"):
        # Strip the trailing ``.<index>.lin`` to recover the case root.
        root = path.name.rsplit(".", 2)[0]
        groups[root].append(path)
    if not groups:
        msg = f"No .lin files found in {directory}"
        raise FileNotFoundError(msg)
    return [
        DiscoveredOperatingPoint(
            name=root,
            files=tuple(
                read_lin_file(path)
                for path in sorted(groups[root], key=_azimuth_file_order)
            ),
        )
        for root in sorted(groups)
    ]


def _azimuth_file_order(path: Path) -> tuple[int, int, str]:
    """Return a sort key that orders ``<case>.<index>.lin`` files by numeric index."""
    parts = path.name.rsplit(".", 2)
    index = parts[1] if len(parts) == 3 else ""
    if index.isascii() and index.isdigit():
        # The file name breaks ties such as ``case.1.lin`` versus ``case.01.lin``.
        return (0, int(index), path.name)
    # Files without a numeric index come last, in name order, so the result
    # stays deterministic.
    return (1, 0, path.name)
[docs]
def run_study(
    operating_points: Sequence[Sequence[LinFile]],
    *,
    parameter_name: str = _RPM_PARAMETER,
    frequency_weight: float = 0.5,
    mac_threshold: float = 0.5,
    harmonics: Sequence[int] = DEFAULT_HARMONICS,
    timestamp: datetime | None = None,
) -> StudyResult:
    """Execute the modal pipeline and capture the provenance of its result.

    Parameters
    ----------
    operating_points : Sequence[Sequence[LinFile]]
        One azimuth sweep per operating point (for instance the output of
        :func:`discover_operating_points`).
    parameter_name : str, optional
        Operating parameter the sweep is run against: ``"rotor_speed_rpm"`` or
        ``"wind_speed"``.
    frequency_weight, mac_threshold : float, optional
        Tuning thresholds forwarded to the pipeline's mode tracking.
    harmonics : Sequence[int], optional
        Excitation harmonics used for resonance detection.
    timestamp : datetime or None, optional
        Timestamp stored in the provenance; the current UTC time is used when
        omitted. Injectable so tests can be reproducible.

    Returns
    -------
    StudyResult
        The pipeline result together with its provenance.

    Raises
    ------
    ValueError
        If ``parameter_name`` is not one of the known operating parameters.
    """
    known_parameters = (_RPM_PARAMETER, _WIND_PARAMETER)
    if parameter_name not in known_parameters:
        raise ValueError(
            f"parameter_name must be '{_RPM_PARAMETER}' or '{_WIND_PARAMETER}', "
            f"got '{parameter_name}'"
        )

    def sweep_key(point: Sequence[LinFile]) -> float:
        return _operating_parameter(point, parameter_name)

    # Pre-sort by the operating parameter, as the pipeline itself does, so the
    # provenance entries line up with the sorted Campbell result.
    ordered = list(operating_points)
    ordered.sort(key=sweep_key)

    modal_pipeline = ModalPipeline(
        frequency_weight=frequency_weight,
        mac_threshold=mac_threshold,
        harmonics=harmonics,
    )
    pipeline_result = modal_pipeline.run(ordered, parameter_name=parameter_name)

    # Record the harmonics as validated/normalized by the pipeline (e.g.
    # float -> int) rather than the caller's raw input.
    record = _build_provenance(
        ordered,
        pipeline_result,
        parameter_name=parameter_name,
        frequency_weight=frequency_weight,
        mac_threshold=mac_threshold,
        harmonics=modal_pipeline.harmonics,
        timestamp=timestamp,
    )
    return StudyResult(pipeline=pipeline_result, provenance=record)
def _operating_parameter(point: Sequence[LinFile], parameter_name: str) -> float:
    """Return the azimuth-averaged operating-parameter value of ``point``.

    The rotor-speed parameter yields the mean rotor speed converted to rev/min;
    any other parameter name yields the mean wind speed. An empty operating
    point yields ``0.0``.
    """
    if not point:
        return 0.0
    n_files = len(point)
    if parameter_name != _RPM_PARAMETER:
        return sum(lin.wind_speed for lin in point) / n_files
    return sum(lin.rotor_speed for lin in point) / n_files * _RPM_PER_RAD_S
def _build_provenance(
    operating_points: Sequence[Sequence[LinFile]],
    result: PipelineResult,
    *,
    parameter_name: str,
    frequency_weight: float,
    mac_threshold: float,
    harmonics: Sequence[int],
    timestamp: datetime | None,
) -> Provenance:
    """Assemble a :class:`Provenance` from the inputs and the pipeline result.

    Parameters
    ----------
    operating_points : Sequence[Sequence[LinFile]]
        The operating points, already in the order of the result.
    result : PipelineResult
        Pipeline output; only the lengths of ``tracks`` and ``resonances`` are
        read from it.
    parameter_name : str
        Operating parameter the sweep was run against.
    frequency_weight, mac_threshold : float
        Tracking tuning thresholds to record.
    harmonics : Sequence[int]
        Excitation harmonics to record.
    timestamp : datetime or None
        Timestamp to record; the current UTC time is used when ``None``.

    Returns
    -------
    Provenance
        The assembled reproducibility record. Every source file is re-opened
        and hashed while the record is built.
    """
    moment = timestamp if timestamp is not None else datetime.now(timezone.utc)
    coverage: list[OperatingPointProvenance] = []
    for point in operating_points:
        files = list(point)
        # Coerce to built-in floats so the record (and its JSON manifest) never
        # carries array-library scalar types.
        azimuths_deg = tuple(float(lin.azimuth * _DEG_PER_RAD) for lin in files)
        coverage.append(
            OperatingPointProvenance(
                # Plain sequences carry no case name; fall back to "".
                name=getattr(point, "name", ""),
                n_azimuths=len(files),
                azimuths_deg=azimuths_deg,
                azimuth_min_deg=min(azimuths_deg, default=0.0),
                azimuth_max_deg=max(azimuths_deg, default=0.0),
                # Use the same averaging helper as the sort key so the recorded
                # means and ``parameter_value`` cannot drift apart.
                rotor_speed_rpm=float(_operating_parameter(files, _RPM_PARAMETER)),
                wind_speed=float(_operating_parameter(files, _WIND_PARAMETER)),
                parameter_value=float(_operating_parameter(files, parameter_name)),
                source_files=tuple(
                    SourceFile(path=str(lin.path), sha256=_file_sha256(lin.path))
                    for lin in files
                ),
            )
        )
    return Provenance(
        vane_version=_vane_version(),
        created_at=moment.isoformat(),
        parameter_name=parameter_name,
        frequency_weight=frequency_weight,
        mac_threshold=mac_threshold,
        harmonics=tuple(harmonics),
        environment=_environment(),
        operating_points=tuple(coverage),
        n_tracks=len(result.tracks),
        n_resonances=len(result.resonances),
    )
def _environment() -> Environment:
    """Collect the Python version, platform string, and dependency versions."""

    def installed_version(package: str) -> str:
        try:
            return version(package)
        except PackageNotFoundError:  # pragma: no cover - package always installed
            return "unknown"

    recorded = {
        package: installed_version(package) for package in _RECORDED_DEPENDENCIES
    }
    return Environment(
        python_version=platform.python_version(),
        platform=platform.platform(),
        dependencies=recorded,
    )
def _file_sha256(path: Path) -> str:
    """Compute the hex-encoded SHA-256 digest of the file at ``path``.

    The file is read in binary mode in fixed-size chunks, so its full contents
    are never held in memory at once.
    """
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        # An empty read marks end-of-file and ends the loop.
        while piece := stream.read(_HASH_CHUNK_BYTES):
            hasher.update(piece)
    return hasher.hexdigest()
def _vane_version() -> str:
    """Look up the installed VANE version, falling back to ``unknown``."""
    try:
        installed = version("vane")
    except PackageNotFoundError:  # pragma: no cover - only in a non-installed tree
        installed = "unknown"
    return installed