ics-simlab-config-gen-claude/tools/semantic_validation.py

356 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Semantic validation for ICS-SimLab configuration.
Validates that HMI monitors and controllers correctly reference:
1. Valid outbound_connection_id in HMI's outbound_connections
2. Reachable target device (by IP)
3. Existing register on target device (by id)
4. Matching value_type and address
This is deterministic validation - no guessing or heuristics.
If something cannot be verified, it fails with a clear error.
"""
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Union
from models.ics_simlab_config_v2 import (
Config,
HMI,
PLC,
Sensor,
Actuator,
RegisterBlock,
TCPConnection,
)
@dataclass
class SemanticError:
"""A semantic validation error."""
entity: str # e.g., "hmi1.monitors[0]"
message: str
def __str__(self) -> str:
return f"{self.entity}: {self.message}"
Device = Union[PLC, Sensor, Actuator]
def _build_device_by_ip(config: Config) -> Dict[str, Tuple[str, Device]]:
"""
Build mapping from IP address to (device_type, device_object).
Only TCP-connected devices are indexed (RTU devices use serial ports).
"""
mapping: Dict[str, Tuple[str, Device]] = {}
for plc in config.plcs:
if plc.network and plc.network.ip:
mapping[plc.network.ip] = ("plc", plc)
for sensor in config.sensors:
if sensor.network and sensor.network.ip:
mapping[sensor.network.ip] = ("sensor", sensor)
for actuator in config.actuators:
if actuator.network and actuator.network.ip:
mapping[actuator.network.ip] = ("actuator", actuator)
return mapping
def _find_register_in_block(
registers: RegisterBlock,
register_id: str,
) -> Optional[Tuple[str, int, int]]:
"""
Find a register by id in a RegisterBlock.
Args:
registers: The RegisterBlock to search
register_id: The register id to find
Returns:
(value_type, address, count) if found, None otherwise
"""
for reg_type, reg_list in [
("coil", registers.coil),
("discrete_input", registers.discrete_input),
("holding_register", registers.holding_register),
("input_register", registers.input_register),
]:
for reg in reg_list:
# Match by id or physical_value (sensors use physical_value)
if reg.id == register_id or reg.physical_value == register_id:
return (reg_type, reg.address, reg.count)
return None
def validate_hmi_semantics(config: Config) -> List[SemanticError]:
"""
Validate HMI monitors and controllers semantically.
For each monitor/controller:
1. Verify outbound_connection_id exists in HMI's outbound_connections
2. Verify target device (by IP) exists
3. Verify register exists on target device
4. Verify value_type and address match target register
Args:
config: Validated Config object
Returns:
List of SemanticError objects (empty if all valid)
"""
errors: List[SemanticError] = []
device_by_ip = _build_device_by_ip(config)
for hmi in config.hmis:
hmi_name = hmi.name
# Build connection_id -> target_ip mapping (TCP connections only)
conn_to_ip: Dict[str, str] = {}
for conn in hmi.outbound_connections:
if isinstance(conn, TCPConnection) and conn.id:
conn_to_ip[conn.id] = conn.ip
# Validate monitors
for i, monitor in enumerate(hmi.monitors):
entity = f"{hmi_name}.monitors[{i}] (id='{monitor.id}')"
# Check outbound_connection exists
if monitor.outbound_connection_id not in conn_to_ip:
errors.append(SemanticError(
entity=entity,
message=(
f"outbound_connection_id '{monitor.outbound_connection_id}' "
f"not found in HMI outbound_connections. "
f"Available: {sorted(conn_to_ip.keys())}"
)
))
continue
target_ip = conn_to_ip[monitor.outbound_connection_id]
# Check target device exists
if target_ip not in device_by_ip:
errors.append(SemanticError(
entity=entity,
message=(
f"Target IP '{target_ip}' not found in any device. "
f"Available IPs: {sorted(device_by_ip.keys())}"
)
))
continue
device_type, device = device_by_ip[target_ip]
# Check register exists on target
reg_info = _find_register_in_block(device.registers, monitor.id)
if reg_info is None:
errors.append(SemanticError(
entity=entity,
message=(
f"Register '{monitor.id}' not found on {device_type} "
f"'{device.name}' (IP: {target_ip})"
)
))
continue
expected_type, expected_addr, expected_count = reg_info
# Verify value_type matches (no guessing - must match exactly)
if monitor.value_type != expected_type:
errors.append(SemanticError(
entity=entity,
message=(
f"value_type mismatch: monitor has '{monitor.value_type}' "
f"but {device.name}.{monitor.id} is '{expected_type}'"
)
))
# Verify address matches
if monitor.address != expected_addr:
errors.append(SemanticError(
entity=entity,
message=(
f"address mismatch: monitor has {monitor.address} "
f"but {device.name}.{monitor.id} is at address {expected_addr}"
)
))
# Validate controllers (same logic as monitors)
for i, controller in enumerate(hmi.controllers):
entity = f"{hmi_name}.controllers[{i}] (id='{controller.id}')"
# Check outbound_connection exists
if controller.outbound_connection_id not in conn_to_ip:
errors.append(SemanticError(
entity=entity,
message=(
f"outbound_connection_id '{controller.outbound_connection_id}' "
f"not found in HMI outbound_connections. "
f"Available: {sorted(conn_to_ip.keys())}"
)
))
continue
target_ip = conn_to_ip[controller.outbound_connection_id]
# Check target device exists
if target_ip not in device_by_ip:
errors.append(SemanticError(
entity=entity,
message=(
f"Target IP '{target_ip}' not found in any device. "
f"Available IPs: {sorted(device_by_ip.keys())}"
)
))
continue
device_type, device = device_by_ip[target_ip]
# Check register exists on target
reg_info = _find_register_in_block(device.registers, controller.id)
if reg_info is None:
errors.append(SemanticError(
entity=entity,
message=(
f"Register '{controller.id}' not found on {device_type} "
f"'{device.name}' (IP: {target_ip})"
)
))
continue
expected_type, expected_addr, expected_count = reg_info
# Verify value_type matches
if controller.value_type != expected_type:
errors.append(SemanticError(
entity=entity,
message=(
f"value_type mismatch: controller has '{controller.value_type}' "
f"but {device.name}.{controller.id} is '{expected_type}'"
)
))
# Verify address matches
if controller.address != expected_addr:
errors.append(SemanticError(
entity=entity,
message=(
f"address mismatch: controller has {controller.address} "
f"but {device.name}.{controller.id} is at address {expected_addr}"
)
))
return errors
def validate_plc_semantics(config: Config) -> List[SemanticError]:
"""
Validate PLC monitors and controllers semantically.
Similar to HMI validation but for PLC-to-sensor/actuator connections.
Args:
config: Validated Config object
Returns:
List of SemanticError objects (empty if all valid)
"""
errors: List[SemanticError] = []
device_by_ip = _build_device_by_ip(config)
for plc in config.plcs:
plc_name = plc.name
# Build connection_id -> target_ip mapping (TCP connections only)
conn_to_ip: Dict[str, str] = {}
for conn in plc.outbound_connections:
if isinstance(conn, TCPConnection) and conn.id:
conn_to_ip[conn.id] = conn.ip
# Validate monitors (skip RTU connections - they don't have IP lookup)
for i, monitor in enumerate(plc.monitors):
# Skip if connection is RTU (not TCP)
if monitor.outbound_connection_id not in conn_to_ip:
# Could be RTU connection - skip silently for PLCs
continue
entity = f"{plc_name}.monitors[{i}] (id='{monitor.id}')"
target_ip = conn_to_ip[monitor.outbound_connection_id]
if target_ip not in device_by_ip:
errors.append(SemanticError(
entity=entity,
message=(
f"Target IP '{target_ip}' not found in any device. "
f"Available IPs: {sorted(device_by_ip.keys())}"
)
))
continue
device_type, device = device_by_ip[target_ip]
reg_info = _find_register_in_block(device.registers, monitor.id)
if reg_info is None:
errors.append(SemanticError(
entity=entity,
message=(
f"Register '{monitor.id}' not found on {device_type} "
f"'{device.name}' (IP: {target_ip})"
)
))
# Validate controllers (skip RTU connections)
for i, controller in enumerate(plc.controllers):
if controller.outbound_connection_id not in conn_to_ip:
continue
entity = f"{plc_name}.controllers[{i}] (id='{controller.id}')"
target_ip = conn_to_ip[controller.outbound_connection_id]
if target_ip not in device_by_ip:
errors.append(SemanticError(
entity=entity,
message=(
f"Target IP '{target_ip}' not found in any device. "
f"Available IPs: {sorted(device_by_ip.keys())}"
)
))
continue
device_type, device = device_by_ip[target_ip]
reg_info = _find_register_in_block(device.registers, controller.id)
if reg_info is None:
errors.append(SemanticError(
entity=entity,
message=(
f"Register '{controller.id}' not found on {device_type} "
f"'{device.name}' (IP: {target_ip})"
)
))
return errors
def validate_all_semantics(config: Config) -> List[SemanticError]:
"""
Run all semantic validations.
Args:
config: Validated Config object
Returns:
List of all SemanticError objects
"""
errors: List[SemanticError] = []
errors.extend(validate_hmi_semantics(config))
errors.extend(validate_plc_semantics(config))
return errors