# ics-simlab-config-gen-claude/models/ics_simlab_config_v2.py
#
# 391 lines
# 12 KiB
# Python

"""
Complete Pydantic v2 models for ICS-SimLab configuration.
This module provides comprehensive validation and normalization of configuration.json files.
It handles type inconsistencies found in real configs (port/slave_id as string vs int).
Key Features:
- Safe type coercion: only coerce strictly numeric strings (^[0-9]+$) and floats with no fractional part
- Logging when coercion happens
- --strict mode support (disable coercion, fail on type mismatch)
- Discriminated unions for connection types (tcp vs rtu)
"""
from __future__ import annotations
import logging
import re
from typing import Annotated, Any, List, Literal, Optional, Union
from pydantic import (
BaseModel,
ConfigDict,
Field,
BeforeValidator,
model_validator,
)
# Module-level logger; used below to report strict-mode activation and
# every type coercion that is performed.
logger = logging.getLogger(__name__)
# Global strict mode flag - when True, coercion is disabled.
# Written by set_strict_mode() and read by is_strict_mode() and
# _safe_coerce_to_int(); it starts out disabled.
_STRICT_MODE = False
def set_strict_mode(strict: bool) -> None:
    """
    Switch the module-wide strict mode flag on or off.

    Args:
        strict: New value for the flag. When True, an info message is logged
            to record that type coercion is now disabled.
    """
    global _STRICT_MODE
    _STRICT_MODE = strict
    if not strict:
        # Nothing to announce when strict mode is being turned off.
        return
    logger.info("Strict mode enabled: type coercion disabled")
def is_strict_mode() -> bool:
    """
    Report the current state of the module-wide strict mode flag.

    Returns:
        The value last stored by set_strict_mode(), or the module default
        if it was never called.
    """
    return _STRICT_MODE
# Regex for strictly numeric strings: one or more ASCII digits, with no sign,
# whitespace or decimal point.
# NOTE(review): "$" also matches just before a single trailing newline, so
# with .match() a value such as "502\n" is accepted; use .fullmatch() when
# the entire string must consist of digits.
_NUMERIC_RE = re.compile(r"^[0-9]+$")
def _safe_coerce_to_int(v: Any, field_name: str = "field") -> int:
    """
    Safely coerce a value to int.

    Accepted inputs:
    - int (but not bool): returned unchanged.
    - str made up solely of ASCII digits: converted, and a warning is logged.
    - float with no fractional part: converted, and a warning is logged.

    In strict mode only a real int is accepted; str and float inputs are
    rejected instead of being coerced.

    Args:
        v: Raw value to validate.
        field_name: Label used in log and error messages.

    Returns:
        The value as an int.

    Raises:
        ValueError: If the value cannot be accepted under the rules above.
    """
    # bool is a subclass of int, so it has to be excluded explicitly.
    if isinstance(v, int) and not isinstance(v, bool):
        return v

    if isinstance(v, str):
        if _STRICT_MODE:
            raise ValueError(
                f"{field_name}: string '{v}' not allowed in strict mode, expected int"
            )
        # fullmatch() instead of match(): "$" also matches right before a
        # trailing newline, so match() would let "502\n" through.
        if _NUMERIC_RE.fullmatch(v):
            coerced = int(v)
            logger.warning(
                "Type coercion: %s '%s' (str) -> %d (int)", field_name, v, coerced
            )
            return coerced
        raise ValueError(
            f"{field_name}: cannot coerce '{v}' to int (not strictly numeric)"
        )

    if isinstance(v, float):
        # Strict mode promises "only accept int", so floats are rejected too.
        if _STRICT_MODE:
            raise ValueError(
                f"{field_name}: float {v} not allowed in strict mode, expected int"
            )
        # is_integer() is False for nan/inf, so those fall through to the error.
        if v.is_integer():
            coerced = int(v)
            logger.warning(
                "Type coercion: %s %s (float) -> %d (int)", field_name, v, coerced
            )
            return coerced
        raise ValueError(f"{field_name}: cannot coerce float {v} to int (has decimal)")

    raise ValueError(f"{field_name}: expected int, got {type(v).__name__}")
def _make_int_coercer(field_name: str):
    """
    Build a one-argument int coercer bound to a field label.

    Args:
        field_name: Label forwarded to _safe_coerce_to_int for its log and
            error messages.

    Returns:
        A callable taking the raw value and returning it as an int.
    """

    def coerce_value(value: Any) -> int:
        # Delegate to the shared helper, tagging messages with the label.
        return _safe_coerce_to_int(value, field_name)

    return coerce_value
# Type aliases with safe coercion.
# Each alias is an int whose raw input is first passed through
# _safe_coerce_to_int (via BeforeValidator); the string given to
# _make_int_coercer is the label shown in log and error messages.
PortInt = Annotated[int, BeforeValidator(_make_int_coercer("port"))]
SlaveIdInt = Annotated[int, BeforeValidator(_make_int_coercer("slave_id"))]
AddressInt = Annotated[int, BeforeValidator(_make_int_coercer("address"))]
CountInt = Annotated[int, BeforeValidator(_make_int_coercer("count"))]
# ============================================================================
# Network Configuration
# ============================================================================
class NetworkConfig(BaseModel):
    """
    Network settings attached to a device.

    Only ``ip`` is mandatory; ``port`` and ``docker_network`` default to
    None. Keys other than the ones declared here are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    ip: str
    # Validated as PortInt when a value is given.
    port: Optional[PortInt] = None
    docker_network: Optional[str] = None
class UIConfig(BaseModel):
    """
    Settings for the UI service.

    Holds a single required ``network`` section; any other key is rejected.
    """

    model_config = ConfigDict(extra="forbid")

    network: NetworkConfig
# ============================================================================
# Connection Types (Discriminated Union)
# ============================================================================
class TCPConnection(BaseModel):
    """
    Connection entry for TCP/IP.

    Selected when the entry's ``type`` is "tcp". Undeclared keys are
    rejected.
    """

    model_config = ConfigDict(extra="forbid")

    type: Literal["tcp"]
    ip: str
    port: PortInt
    # Needed on outbound connections, optional on inbound ones; this model
    # itself does not enforce that distinction.
    id: Optional[str] = None
class RTUConnection(BaseModel):
    """
    Connection entry for Modbus RTU (serial).

    Selected when the entry's ``type`` is "rtu". Undeclared keys are
    rejected.
    """

    model_config = ConfigDict(extra="forbid")

    type: Literal["rtu"]
    comm_port: str
    # Both of the following may be omitted and then default to None.
    slave_id: Optional[SlaveIdInt] = None
    id: Optional[str] = None
# Tagged union of the two connection models: the value of the "type" key
# ("tcp" or "rtu") selects which model validates the entry.
Connection = Annotated[
    Union[TCPConnection, RTUConnection],
    Field(discriminator="type")
]
# ============================================================================
# Register Definitions
# ============================================================================
class BaseRegister(BaseModel):
    """
    Single register entry shared by PLCs, sensors, actuators and HMIs.

    Which optional fields are present depends on the owning device:
    - PLC registers carry 'id' and 'io'
    - Sensor/actuator registers carry 'physical_value'
    - HMI registers carry 'id' but no 'io'

    The model does not enforce these combinations; every field other than
    ``address`` has a default. Undeclared keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    address: AddressInt
    count: CountInt = 1
    id: Optional[str] = None
    io: Optional[Literal["input", "output"]] = None
    physical_value: Optional[str] = None
    # Plural variant; uncommon, observed in some actuator configs.
    physical_values: Optional[List[str]] = None
class RegisterBlock(BaseModel):
    """
    Registers of one device, grouped by Modbus register type.

    Each of the four groups defaults to an empty list when the key is
    missing. Undeclared keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    coil: List[BaseRegister] = Field(default_factory=list)
    discrete_input: List[BaseRegister] = Field(default_factory=list)
    holding_register: List[BaseRegister] = Field(default_factory=list)
    input_register: List[BaseRegister] = Field(default_factory=list)
# ============================================================================
# Monitor / Controller Definitions
# ============================================================================
class Monitor(BaseModel):
    """
    Polling definition for reading registers on a remote device.

    PLCs and HMIs use monitors to fetch values from other devices.
    ``interval`` is required; ``slave_id`` and ``count`` default to 1.
    Undeclared keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    outbound_connection_id: str
    id: str
    value_type: Literal["coil", "discrete_input", "holding_register", "input_register"]
    slave_id: SlaveIdInt = 1
    address: AddressInt
    count: CountInt = 1
    interval: float
class Controller(BaseModel):
    """
    Write definition for setting registers on a remote device.

    PLCs and HMIs use controllers to push values to other devices.
    Controllers write on demand rather than polling, so ``interval`` is not
    expected; it is still accepted as an optional field. ``slave_id`` and
    ``count`` default to 1. Undeclared keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    outbound_connection_id: str
    id: str
    value_type: Literal["coil", "discrete_input", "holding_register", "input_register"]
    slave_id: SlaveIdInt = 1
    address: AddressInt
    count: CountInt = 1
    # Present in some configs and absent in others, hence optional.
    interval: Optional[float] = None
# ============================================================================
# Physical Values (HIL)
# ============================================================================
class PhysicalValue(BaseModel):
    """
    Named physical value used by the HIL simulation.

    ``io`` may be "input", "output" or omitted. Undeclared keys are
    rejected.
    """

    model_config = ConfigDict(extra="forbid")

    name: str
    io: Optional[Literal["input", "output"]] = None
# ============================================================================
# PLC Identity (IED only)
# ============================================================================
class PLCIdentity(BaseModel):
    """
    Identity details of a PLC (used in IED scenarios).

    All declared fields are optional strings. Unlike the other models,
    extra keys are kept rather than rejected.
    """

    # extra="allow" so vendor-specific fields pass validation.
    model_config = ConfigDict(extra="allow")

    vendor_name: Optional[str] = None
    product_name: Optional[str] = None
    vendor_url: Optional[str] = None
    product_code: Optional[str] = None
    major_minor_revision: Optional[str] = None
    model_name: Optional[str] = None
# ============================================================================
# Main Device Types
# ============================================================================
class HMI(BaseModel):
    """
    Configuration of a Human-Machine Interface.

    ``name``, ``network`` and ``registers`` are required; the connection,
    monitor and controller lists default to empty. Undeclared keys are
    rejected.
    """

    model_config = ConfigDict(extra="forbid")

    name: str
    network: NetworkConfig
    inbound_connections: List[Connection] = Field(default_factory=list)
    outbound_connections: List[Connection] = Field(default_factory=list)
    registers: RegisterBlock
    monitors: List[Monitor] = Field(default_factory=list)
    controllers: List[Controller] = Field(default_factory=list)
class PLC(BaseModel):
    """
    Configuration of a Programmable Logic Controller.

    ``name``, ``logic`` and ``registers`` are required. ``network`` and
    ``identity`` may be omitted; the connection, monitor and controller
    lists default to empty. Undeclared keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    name: str
    # File name of the logic script, e.g. "plc1.py".
    logic: str
    network: Optional[NetworkConfig] = None
    identity: Optional[PLCIdentity] = None
    inbound_connections: List[Connection] = Field(default_factory=list)
    outbound_connections: List[Connection] = Field(default_factory=list)
    registers: RegisterBlock
    monitors: List[Monitor] = Field(default_factory=list)
    controllers: List[Controller] = Field(default_factory=list)
class Sensor(BaseModel):
    """
    Configuration of a sensor device.

    ``inbound_connections`` defaults to an empty list; every other field is
    required. Undeclared keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    name: str
    # Name of the HIL this sensor refers to.
    hil: str
    network: NetworkConfig
    inbound_connections: List[Connection] = Field(default_factory=list)
    registers: RegisterBlock
class Actuator(BaseModel):
    """
    Configuration of an actuator device.

    ``name``, ``hil``, ``network`` and ``registers`` are required;
    ``logic`` is optional and the two list fields default to empty.
    Undeclared keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    name: str
    # Name of the HIL this actuator refers to.
    hil: str
    # Only some actuators ship custom logic, so this may be omitted.
    logic: Optional[str] = None
    network: NetworkConfig
    inbound_connections: List[Connection] = Field(default_factory=list)
    registers: RegisterBlock
    physical_values: List[PhysicalValue] = Field(default_factory=list)
class HIL(BaseModel):
    """
    Configuration of a Hardware-in-the-Loop simulation.

    ``physical_values`` defaults to an empty list. Undeclared keys are
    rejected.
    """

    model_config = ConfigDict(extra="forbid")

    name: str
    # File name of the logic script, e.g. "hil_1.py".
    logic: str
    physical_values: List[PhysicalValue] = Field(default_factory=list)
# ============================================================================
# Network Definitions
# ============================================================================
class SerialNetwork(BaseModel):
    """
    Pair of serial ports acting as a virtual null-modem cable.

    Both endpoints are required strings. Undeclared keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    src: str
    dest: str
class IPNetwork(BaseModel):
    """
    Definition of a Docker network.

    All three fields are required strings. Undeclared keys are rejected.
    """

    model_config = ConfigDict(extra="forbid")

    docker_name: str
    name: str
    subnet: str
# ============================================================================
# Top-Level Configuration
# ============================================================================
class Config(BaseModel):
    """
    Complete ICS-SimLab configuration.

    This is the root model for configuration.json files. ``ui`` is the only
    required section; every device and network list defaults to empty.

    After field validation two cross-checks run:
    - device names must be unique across HMIs, PLCs, sensors, actuators and
      HILs;
    - every sensor and actuator must reference the name of a declared HIL.
    """

    # Unknown top-level keys are ignored instead of being rejected.
    model_config = ConfigDict(extra="ignore")

    ui: UIConfig
    hmis: List[HMI] = Field(default_factory=list)
    plcs: List[PLC] = Field(default_factory=list)
    sensors: List[Sensor] = Field(default_factory=list)
    actuators: List[Actuator] = Field(default_factory=list)
    hils: List[HIL] = Field(default_factory=list)
    serial_networks: List[SerialNetwork] = Field(default_factory=list)
    ip_networks: List[IPNetwork] = Field(default_factory=list)

    @model_validator(mode="after")
    def validate_unique_names(self) -> "Config":
        """
        Ensure all device names are unique across all device types.

        Raises:
            ValueError: If any name occurs more than once; the message lists
                the offending names in sorted order.
        """
        seen = set()
        repeated = set()
        # Single pass with two sets instead of calling list.count() per name.
        for section in (self.hmis, self.plcs, self.sensors, self.actuators, self.hils):
            for item in section:
                if item.name in seen:
                    repeated.add(item.name)
                else:
                    seen.add(item.name)
        if repeated:
            # Sorted so the error text does not depend on set iteration order.
            raise ValueError(f"Duplicate device names found: {sorted(repeated)}")
        return self

    @model_validator(mode="after")
    def validate_hil_references(self) -> "Config":
        """
        Ensure sensors and actuators reference existing HILs.

        Raises:
            ValueError: On the first sensor (checked first) or actuator whose
                ``hil`` is not the name of a declared HIL.
        """
        hil_names = {h.name for h in self.hils}
        # Sensors and actuators share the same check; only the label differs.
        for kind, devices in (("Sensor", self.sensors), ("Actuator", self.actuators)):
            for device in devices:
                if device.hil not in hil_names:
                    raise ValueError(
                        f"{kind} '{device.name}' references unknown HIL '{device.hil}'. "
                        f"Available HILs: {sorted(hil_names)}"
                    )
        return self