"""Reader for OpenFAST linearization (``.lin``) files.
This module parses the *modern* linearization output produced by the OpenFAST
ModGlue engine into structured, in-memory objects. The modern format is
identified by the presence of a ``Derivative Order`` column in the
operating-point tables; the legacy format (which lacks that column and instead
emits ``StateRotation`` / mass-matrix blocks) is intentionally not supported and
raises :class:`LinFileFormatError`.
The on-disk layout consumed here is, in order:
1. A header block beginning at ``Simulation information:`` holding scalar
metadata (simulation time, rotor speed in rad/s, azimuth in rad, wind speed,
and channel counts) and terminated by ``Jacobians included in this file?``.
2. Operating-point tables (``Order of continuous states:`` and the analogous
blocks for state derivatives, inputs, and outputs). Each row carries an
index, an operating-point value, a rotating-frame flag, a derivative order,
and a free-text description.
3. Matrix blocks (``A``, ``B``, ``C``, ``D`` under ``Linearized state
matrices:`` and, when Jacobians are written, ``dUdu``/``dUdy``/``dXdy``/``J``
under ``Jacobian matrices:``).
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING
import numpy as np
if TYPE_CHECKING:
from pathlib import Path
import numpy.typing as npt
__all__ = [
"LinFile",
"LinFileFormatError",
"OperatingPointTable",
"read_lin_file",
]
logger = logging.getLogger(__name__)
# Header markers and table labels used by the OpenFAST ModGlue writer.
_SIM_INFO_MARKER = "Simulation information:"
_JACOBIANS_MARKER = "Jacobians included in this file?"
_DERIV_ORDER_COLUMN = "Derivative Order"
# Maps an operating-point table label to the attribute name on :class:`LinFile`.
_OP_TABLE_LABELS: dict[str, str] = {
"Order of continuous states:": "x",
"Order of continuous state derivatives:": "xdot",
"Order of inputs": "u",
"Order of outputs": "y",
}
# Matrix header, e.g. ``A: 138 x 138``.
_MATRIX_HEADER_RE = re.compile(r"^([A-Za-z]+):\s+(\d+)\s+x\s+(\d+)\s*$")
# Module prefix at the start of a channel description, e.g. ``ED`` or ``BD_1``.
_MODULE_PREFIX_RE = re.compile(r"^([A-Za-z]+(?:_\d+)?)\s")
[docs]
@dataclass
class OperatingPointTable:
"""Operating-point metadata for one channel group of a ``.lin`` file.
A channel group is the set of continuous states, continuous-state
derivatives, inputs, or outputs. All array attributes are parallel and share
the channel ordering written in the file.
Parameters
----------
values : npt.NDArray[np.float64]
Operating-point value of each channel, shape ``(n_channels,)``. For
orientation channels that are written with multiple comma-separated
components, only the first component is retained here.
rotating_frame : npt.NDArray[np.bool_]
Per-channel flag, shape ``(n_channels,)``. ``True`` marks a channel
expressed in the blade-rotating frame (the hook used by the MBC3
transform).
derivative_order : npt.NDArray[np.int_]
Per-channel derivative order (0, 1, or 2), shape ``(n_channels,)``.
descriptions : list[str]
Per-channel free-text description, length ``n_channels``. Each begins
with the originating module prefix (e.g. ``"ED ..."``).
"""
values: npt.NDArray[np.float64]
rotating_frame: npt.NDArray[np.bool_]
derivative_order: npt.NDArray[np.int_]
descriptions: list[str]
def __len__(self) -> int:
"""Return the number of channels in the table."""
return int(self.values.shape[0])
@property
def modules(self) -> list[str]:
"""Return the originating module prefix of each channel.
Returns
-------
list[str]
The leading module token of every description (e.g. ``"ED"`` or
``"BD_1"``); an empty string is used when no prefix is present.
"""
out: list[str] = []
for desc in self.descriptions:
match = _MODULE_PREFIX_RE.match(desc)
out.append(match.group(1) if match else "")
return out
[docs]
@dataclass
class LinFile:
"""Parsed contents of a single OpenFAST ``.lin`` linearization file.
Parameters
----------
path : pathlib.Path
Source file the data was read from.
sim_time : float
Simulation time of the linearization, in seconds.
rotor_speed : float
Rotor speed at the operating point, in rad/s (as written by OpenFAST).
azimuth : float
Rotor azimuth at the operating point, in rad.
wind_speed : float
Mean hub-height wind speed, in m/s. Zero when InflowWind is absent.
jacobians_included : bool
Whether the file contains the ``Jacobian matrices`` block.
x, xdot, u, y : OperatingPointTable
Operating-point tables for the continuous states, continuous-state
derivatives, inputs, and outputs, respectively.
a, b, c, d : npt.NDArray[np.float64] or None
State-space matrices :math:`A` (n_x, n_x), :math:`B` (n_x, n_u),
:math:`C` (n_y, n_x), and :math:`D` (n_y, n_u). ``None`` when the
corresponding block is absent (e.g. ``B``/``D`` with zero inputs).
jacobians : dict[str, npt.NDArray[np.float64]]
Any additional matrices from the ``Jacobian matrices`` block, keyed by
their label (e.g. ``"dUdu"``, ``"dUdy"``, ``"dXdy"``).
"""
path: Path
sim_time: float
rotor_speed: float
azimuth: float
wind_speed: float
jacobians_included: bool
x: OperatingPointTable
xdot: OperatingPointTable
u: OperatingPointTable
y: OperatingPointTable
a: npt.NDArray[np.float64] | None
b: npt.NDArray[np.float64] | None
c: npt.NDArray[np.float64] | None
d: npt.NDArray[np.float64] | None
jacobians: dict[str, npt.NDArray[np.float64]]
@property
def n_x(self) -> int:
"""Return the number of continuous states."""
return len(self.x)
@property
def n_u(self) -> int:
"""Return the number of inputs."""
return len(self.u)
@property
def n_y(self) -> int:
"""Return the number of outputs."""
return len(self.y)
[docs]
def read_lin_file(path: Path) -> LinFile:
"""Read and parse a modern OpenFAST ``.lin`` file.
Parameters
----------
path : pathlib.Path
Path to the ``.lin`` file to read.
Returns
-------
LinFile
The parsed linearization data.
Raises
------
LinFileFormatError
If the file is empty, truncated, in the unsupported legacy format, or
otherwise does not conform to the modern ModGlue layout.
Notes
-----
Numeric fields containing Fortran overflow markers (runs of ``*``) are
parsed as :data:`numpy.nan` and a warning is logged; the parse does not
fail. Empty matrix blocks are not written by OpenFAST, so the corresponding
:class:`LinFile` matrix attribute is ``None``.
"""
lines = path.read_text(encoding="utf-8").splitlines()
if not lines:
msg = f"Linearization file is empty: {path}"
raise LinFileFormatError(msg)
cursor = _Cursor(lines)
scalars, jacobians_included = _parse_header(cursor, path)
tables = _parse_operating_point_tables(cursor, path)
matrices = _parse_matrices(cursor, path)
lin = LinFile(
path=path,
sim_time=scalars["sim_time"],
rotor_speed=scalars["rotor_speed"],
azimuth=scalars["azimuth"],
wind_speed=scalars["wind_speed"],
jacobians_included=jacobians_included,
x=tables["x"],
xdot=tables["xdot"],
u=tables["u"],
y=tables["y"],
a=matrices.pop("A", None),
b=matrices.pop("B", None),
c=matrices.pop("C", None),
d=matrices.pop("D", None),
jacobians=matrices,
)
_validate_lin_file(lin)
return lin
def _validate_lin_file(lin: LinFile) -> None:
"""Validate operating metadata and state-space matrix shapes after parsing.
Overflow markers in matrix *data* are tolerated as ``nan`` (see
:func:`read_lin_file`), but a non-finite operating-point scalar denotes a
corrupt linearization, and a state-space matrix whose shape disagrees with the
channel-table dimensions cannot be used downstream. Both fail loudly here rather
than surfacing as an opaque error deep in the analysis.
Parameters
----------
lin : LinFile
The freshly parsed linearization file.
Raises
------
LinFileFormatError
If an operating-point scalar is non-finite, or a present ``A``/``B``/``C``/
``D`` matrix does not match ``(n_x, n_x)``/``(n_x, n_u)``/``(n_y, n_x)``/
``(n_y, n_u)``.
"""
for field_name, value in (
("rotor speed", lin.rotor_speed),
("azimuth", lin.azimuth),
("wind speed", lin.wind_speed),
("simulation time", lin.sim_time),
):
if not np.isfinite(value):
msg = f"{lin.path}: non-finite {field_name} ({value})"
raise LinFileFormatError(msg)
expected = {
"A": (lin.a, (lin.n_x, lin.n_x)),
"B": (lin.b, (lin.n_x, lin.n_u)),
"C": (lin.c, (lin.n_y, lin.n_x)),
"D": (lin.d, (lin.n_y, lin.n_u)),
}
for name, (matrix, shape) in expected.items():
if matrix is not None and matrix.shape != shape:
msg = (
f"{lin.path}: matrix {name} has shape {matrix.shape}, "
f"expected {shape} from the channel tables"
)
raise LinFileFormatError(msg)
class _Cursor:
"""A forward-only line cursor over the contents of a ``.lin`` file."""
def __init__(self, lines: list[str]) -> None:
self._lines = lines
self._index = 0
@property
def exhausted(self) -> bool:
"""Return whether every line has been consumed."""
return self._index >= len(self._lines)
def peek(self) -> str | None:
"""Return the current line without advancing, or ``None`` at the end."""
if self.exhausted:
return None
return self._lines[self._index]
def next(self) -> str:
"""Return the current line and advance the cursor."""
line = self._lines[self._index]
self._index += 1
return line
def advance_to(self, marker: str) -> bool:
"""Advance until a line containing ``marker``; consume that line.
Parameters
----------
marker : str
Substring to search for.
Returns
-------
bool
``True`` if the marker was found (cursor now sits just past it),
``False`` if the end of file was reached first.
"""
while not self.exhausted:
if marker in self.next():
return True
return False
def _parse_float(token: str) -> float:
"""Parse a Fortran-formatted float, mapping overflow markers to NaN.
Parameters
----------
token : str
A single whitespace-delimited numeric token.
Returns
-------
float
The parsed value, or :data:`numpy.nan` if the token is a Fortran
overflow marker (contains ``*``).
"""
if "*" in token:
logger.warning("Overflow marker %r parsed as NaN", token)
return float("nan")
return float(token)
def _normalize_scalar(key: str, value: float, unit: str) -> float:
"""Normalize a header scalar to rad/s (rotor speed) and rad (azimuth).
OpenFAST writes rotor speed in rad/s and azimuth in rad, but its documentation
describes rpm/deg. If a file's unit token reports those alternative units, the
value is converted so downstream code always receives rad/s and rad.
Parameters
----------
key : str
The scalar's attribute name (e.g. ``"rotor_speed"``).
value : float
The parsed numeric value.
unit : str
The unit token that followed the value in the header, if any.
Returns
-------
float
The value in internal units.
"""
unit_lower = unit.lower()
if key == "rotor_speed" and unit_lower == "rpm":
return value * float(np.pi) / 30.0
if key == "azimuth" and unit_lower.startswith("deg"):
return value * float(np.pi) / 180.0
return value
def _parse_header(cursor: _Cursor, path: Path) -> tuple[dict[str, float], bool]:
"""Parse the ``Simulation information`` header block.
Parameters
----------
cursor : _Cursor
Cursor positioned anywhere before the header.
path : pathlib.Path
Source path, used only for error messages.
Returns
-------
tuple[dict[str, float], bool]
A mapping with keys ``sim_time``, ``rotor_speed``, ``azimuth``, and
``wind_speed``, plus a flag indicating whether Jacobians are included.
Raises
------
LinFileFormatError
If the header marker or a required field is missing.
"""
if not cursor.advance_to(_SIM_INFO_MARKER):
msg = f"Missing {_SIM_INFO_MARKER!r} header in {path}"
raise LinFileFormatError(msg)
scalars: dict[str, float] = {}
# Both the space-separated header label and the underscore form used in the
# OpenFAST documentation are accepted for rotor speed.
field_map = {
"Simulation time:": "sim_time",
"Rotor Speed:": "rotor_speed",
"Rotor_Speed:": "rotor_speed",
"Azimuth:": "azimuth",
"Wind Speed:": "wind_speed",
}
jacobians_included = False
while not cursor.exhausted:
line = cursor.next()
for label, key in field_map.items():
if label in line:
fields = line.split(label)[1].split()
unit = fields[1] if len(fields) > 1 else ""
scalars[key] = _normalize_scalar(key, _parse_float(fields[0]), unit)
break
if _JACOBIANS_MARKER in line:
jacobians_included = line.rsplit(maxsplit=1)[-1].strip().lower() == "yes"
break
missing = {"sim_time", "rotor_speed", "azimuth", "wind_speed"} - scalars.keys()
if missing:
msg = f"Header in {path} is missing fields: {sorted(missing)}"
raise LinFileFormatError(msg)
return scalars, jacobians_included
def _parse_operating_point_tables(
cursor: _Cursor, path: Path
) -> dict[str, OperatingPointTable]:
"""Parse the four operating-point tables that follow the header.
Parameters
----------
cursor : _Cursor
Cursor positioned just after the header block.
path : pathlib.Path
Source path, used only for error messages.
Returns
-------
dict[str, OperatingPointTable]
Tables keyed by ``"x"``, ``"xdot"``, ``"u"``, and ``"y"``. Missing
tables (e.g. no inputs) yield empty tables.
Raises
------
LinFileFormatError
If a table is present but in the unsupported legacy format.
"""
tables: dict[str, OperatingPointTable] = {
key: _empty_table() for key in ("x", "xdot", "u", "y")
}
while not cursor.exhausted:
line = cursor.peek()
if line is None:
break
label = _match_table_label(line)
if label is None:
if "Linearized state matrices:" in line or "Jacobian matrices:" in line:
break
cursor.next()
continue
cursor.next() # consume the label line
tables[_OP_TABLE_LABELS[label]] = _parse_one_table(cursor, path)
return tables
def _match_table_label(line: str) -> str | None:
"""Return the operating-point table label contained in ``line``, if any."""
for label in _OP_TABLE_LABELS:
if line.startswith(label):
return label
return None
def _parse_one_table(cursor: _Cursor, path: Path) -> OperatingPointTable:
"""Parse a single operating-point table starting at its column header.
Parameters
----------
cursor : _Cursor
Cursor positioned on the column-header line of the table.
path : pathlib.Path
Source path, used only for error messages.
Returns
-------
OperatingPointTable
The parsed table; empty when no data rows are present.
Raises
------
LinFileFormatError
If the table lacks the modern ``Derivative Order`` column.
"""
header = cursor.next()
if _DERIV_ORDER_COLUMN not in header:
msg = (
f"{path} uses the unsupported legacy .lin format "
f"(no {_DERIV_ORDER_COLUMN!r} column). Only the modern ModGlue "
f"format is supported."
)
raise LinFileFormatError(msg)
if not cursor.exhausted:
cursor.next() # consume the dashed separator line
values: list[float] = []
rotating: list[bool] = []
deriv_order: list[int] = []
descriptions: list[str] = []
while not cursor.exhausted:
line = cursor.peek()
if line is None or not line.strip():
break
cursor.next()
value, is_rot, order, desc = _parse_table_row(line)
values.append(value)
rotating.append(is_rot)
deriv_order.append(order)
descriptions.append(desc)
return OperatingPointTable(
values=np.asarray(values, dtype=np.float64),
rotating_frame=np.asarray(rotating, dtype=np.bool_),
derivative_order=np.asarray(deriv_order, dtype=np.int_),
descriptions=descriptions,
)
def _parse_table_row(line: str) -> tuple[float, bool, int, str]:
"""Parse one operating-point table row.
The row layout is ``index value[, value, value] {T|F} order description``,
where orientation channels carry three comma-separated operating-point
values (only the first is returned).
Parameters
----------
line : str
A single data row from an operating-point table.
Returns
-------
tuple[float, bool, int, str]
The first operating-point value, the rotating-frame flag, the
derivative order, and the description.
Raises
------
LinFileFormatError
If the rotating-frame column cannot be located in the row.
"""
tokens = line.split()
rot_idx = next((i for i, tok in enumerate(tokens) if tok in ("T", "F")), -1)
if rot_idx < 1:
msg = f"Could not locate rotating-frame flag in row: {line!r}"
raise LinFileFormatError(msg)
value = _parse_float(tokens[1].rstrip(","))
is_rot = tokens[rot_idx] == "T"
order = int(tokens[rot_idx + 1])
description = " ".join(tokens[rot_idx + 2 :])
return value, is_rot, order, description
def _parse_matrices(cursor: _Cursor, path: Path) -> dict[str, npt.NDArray[np.float64]]:
"""Parse all matrix blocks remaining in the file.
Parameters
----------
cursor : _Cursor
Cursor positioned after the operating-point tables.
path : pathlib.Path
Source path, used only for error messages.
Returns
-------
dict[str, npt.NDArray[np.float64]]
Matrices keyed by their label (``"A"``, ``"B"``, ``"dUdu"``, ...).
Raises
------
LinFileFormatError
If a declared matrix has fewer data rows than its header specifies.
"""
matrices: dict[str, npt.NDArray[np.float64]] = {}
while not cursor.exhausted:
line = cursor.next()
match = _MATRIX_HEADER_RE.match(line)
if match is None:
continue
name, n_rows, n_cols = match.group(1), int(match.group(2)), int(match.group(3))
matrices[name] = _parse_matrix_body(cursor, name, n_rows, n_cols, path)
return matrices
def _parse_matrix_body(
cursor: _Cursor, name: str, n_rows: int, n_cols: int, path: Path
) -> npt.NDArray[np.float64]:
"""Read ``n_rows`` numeric rows of a matrix into an array.
Parameters
----------
cursor : _Cursor
Cursor positioned just after the matrix header line.
name : str
Matrix label, used only for error messages.
n_rows, n_cols : int
Expected matrix dimensions from the header.
path : pathlib.Path
Source path, used only for error messages.
Returns
-------
npt.NDArray[np.float64]
The parsed matrix of shape ``(n_rows, n_cols)``.
Raises
------
LinFileFormatError
If the block is truncated or a row has the wrong number of columns.
"""
matrix = np.empty((n_rows, n_cols), dtype=np.float64)
for row in range(n_rows):
if cursor.exhausted:
msg = f"Matrix {name!r} in {path} is truncated at row {row}"
raise LinFileFormatError(msg)
tokens = cursor.next().split()
if len(tokens) != n_cols:
msg = (
f"Matrix {name!r} in {path}: row {row} has {len(tokens)} values, "
f"expected {n_cols}"
)
raise LinFileFormatError(msg)
matrix[row] = [_parse_float(tok) for tok in tokens]
return matrix
def _empty_table() -> OperatingPointTable:
"""Return an :class:`OperatingPointTable` with no channels."""
return OperatingPointTable(
values=np.empty(0, dtype=np.float64),
rotating_frame=np.empty(0, dtype=np.bool_),
derivative_order=np.empty(0, dtype=np.int_),
descriptions=[],
)