from __future__ import annotations

import re
from collections import Counter
from typing import Any, Dict, List, Tuple

# More restrictive: only [a-z0-9_] to avoid docker/compose surprises
DOCKER_SAFE_RE = re.compile(r"^[a-z0-9_]+$")

# Sections whose entries carry a device "name".
_DEVICE_SECTIONS = ("hmis", "plcs", "sensors", "actuators", "hils")

# Register lists that every "registers" object must contain.
_REGISTER_KINDS = ("coil", "discrete_input", "holding_register", "input_register")

# Keys that must exist on each device, in the order they are filled in.
# "registers" is an object (handled by _ensure_registers); all others are lists.
_PLC_LIKE_KEYS = (
    "inbound_connections",
    "outbound_connections",
    "registers",
    "monitors",
    "controllers",  # critical for setup.py
)
_FIELD_DEVICE_KEYS = ("inbound_connections", "registers")
_REQUIRED_DEVICE_KEYS: Dict[str, Tuple[str, ...]] = {
    "plcs": _PLC_LIKE_KEYS,
    "hmis": _PLC_LIKE_KEYS,
    "sensors": _FIELD_DEVICE_KEYS,
    "actuators": _FIELD_DEVICE_KEYS,
}


def strip_nulls(obj: Any) -> Any:
    """
    Recursively remove keys with None values from dicts and None items from lists.

    This canonicalizes LLM output by removing noise like:
        {"id": null, "io": null, "physical_value": null}
    turning it into:
        {}

    Args:
        obj: Any JSON-serializable object (dict, list, scalar)

    Returns:
        A new dict/list with None values/items removed at every nesting
        level; scalars are returned unchanged.
    """
    if isinstance(obj, dict):
        return {key: strip_nulls(value) for key, value in obj.items() if value is not None}
    if isinstance(obj, list):
        return [strip_nulls(element) for element in obj if element is not None]
    return obj


def _ensure_list(container: Dict[str, Any], key: str) -> None:
    """Make sure container[key] is a list, replacing a missing or wrong-typed value with []."""
    if not isinstance(container.get(key), list):
        container[key] = []


def _ensure_registers(device: Dict[str, Any]) -> None:
    """Make sure device["registers"] is a dict holding a list for every register kind."""
    if not isinstance(device.get("registers"), dict):
        device["registers"] = {}
    registers = device["registers"]
    for kind in _REGISTER_KINDS:
        _ensure_list(registers, kind)


def patch_fill_required_keys(cfg: dict[str, Any]) -> Tuple[dict[str, Any], List[str]]:
    """
    Ensure keys that ICS-SimLab setup.py reads with direct indexing exist.
    Prevents KeyError like plc["controllers"] or ui["network"].

    The config is patched in place. Missing keys are created with empty
    defaults, and values of the wrong type are replaced by the empty default.
    This applies to the top-level sections, to "registers", and to the
    per-device list keys (so ``"controllers": null`` becomes ``[]`` instead
    of being left in place).

    Args:
        cfg: Parsed top-level configuration object.

    Returns:
        (patched_cfg, patch_errors). patch_errors gets one entry for every
        non-object item found in plcs / hmis / sensors / actuators.
    """
    if not isinstance(cfg, dict):
        return cfg, ["Top-level JSON is not an object"]

    patch_errors: List[str] = []

    # ui.network is required by setup.py; the port gets a safe default.
    if not isinstance(cfg.get("ui"), dict):
        cfg["ui"] = {}
    ui = cfg["ui"]
    if not isinstance(ui.get("network"), dict):
        ui["network"] = {}
    ui["network"].setdefault("port", 5000)

    for section in ("hmis", "plcs", "sensors", "actuators", "hils", "serial_networks", "ip_networks"):
        _ensure_list(cfg, section)

    for section, required_keys in _REQUIRED_DEVICE_KEYS.items():
        for item in cfg[section]:
            if not isinstance(item, dict):
                patch_errors.append(f"{section} contains non-object item")
                continue
            for key in required_keys:
                if key == "registers":
                    _ensure_registers(item)
                else:
                    _ensure_list(item, key)

    return cfg, patch_errors


def patch_lowercase_names(cfg: dict[str, Any]) -> Tuple[dict[str, Any], List[str]]:
    """
    Force all device names to lowercase.
    Updates references that depend on device names (sensor/actuator 'hil').

    The config is patched in place. Names are lowercased even when that
    produces duplicates; the duplicates are reported in patch_errors.

    Args:
        cfg: Parsed top-level configuration object.

    Returns:
        (patched_cfg, patch_errors)
    """
    if not isinstance(cfg, dict):
        return cfg, ["Top-level JSON is not an object"]

    patch_errors: List[str] = []

    named_devices = [
        dev
        for section in _DEVICE_SECTIONS
        for dev in cfg.get(section, []) or []
        if isinstance(dev, dict) and isinstance(dev.get("name"), str)
    ]

    # Count lowercase names once instead of calling list.count() per name.
    name_counts = Counter(dev["name"].lower() for dev in named_devices)
    collisions = sorted(name for name, count in name_counts.items() if count > 1)
    if collisions:
        patch_errors.append(f"Lowercase patch would create duplicate device names: {collisions}")

    for dev in named_devices:
        dev["name"] = dev["name"].lower()

    # A 'hil' reference gets the same transformation as the names it points to,
    # so it stays consistent whether or not the referenced device exists.
    for section in ("sensors", "actuators"):
        for dev in cfg.get(section, []) or []:
            if isinstance(dev, dict) and isinstance(dev.get("hil"), str):
                dev["hil"] = dev["hil"].lower()

    return cfg, patch_errors


def _sanitize_identifier(name: str, fallback: str, *, hyphens_to_underscores: bool = False) -> str:
    """
    Reduce name to lowercase [a-z0-9_] with single, non-edge underscores.

    Whitespace runs become "_"; hyphens become "_" only when requested and are
    dropped otherwise. If nothing survives, fallback is returned. Because edge
    underscores are stripped, a non-empty result always starts with [a-z0-9].
    """
    text = (name or "").strip().lower()
    text = re.sub(r"\s+", "_", text)
    if hyphens_to_underscores:
        text = text.replace("-", "_")
    text = re.sub(r"[^a-z0-9_]", "", text)
    text = re.sub(r"_+", "_", text).strip("_")
    return text or fallback


def sanitize_docker_name(name: str) -> str:
    """
    Very safe docker name: [a-z0-9_] only, lowercase.

    Args:
        name: Original network name (None or "" is treated as empty).

    Returns:
        Sanitized name, or "network" when no allowed character remains.
        Applying the function to its own output returns it unchanged.
    """
    return _sanitize_identifier(name, "network")


def sanitize_connection_id(name: str) -> str:
    """
    Sanitize outbound_connection id to docker-safe format: [a-z0-9_] only.

    This keeps connection IDs consistent and safe for use in docker
    container networking.
    NOTE(review): an id that starts with a digit is returned as-is, so the
    result is not always a valid Python identifier.

    Args:
        name: Original connection id (e.g., "To-Sensor1", "TO_ACTUATOR")

    Returns:
        Sanitized id (e.g., "to_sensor1", "to_actuator"), or "connection"
        when no allowed character remains.
    """
    return _sanitize_identifier(name, "connection", hyphens_to_underscores=True)


def patch_sanitize_network_names(cfg: dict[str, Any]) -> Tuple[dict[str, Any], List[str]]:
    """
    Make ip_networks names docker-safe and align ip_networks[].name == ip_networks[].docker_name.
    Update references to docker_network fields.

    The config is patched in place. A network without a string docker_name
    derives one from its name; a network with neither is left untouched.
    References (ui.network.docker_network and the network.docker_network of
    hmis / plcs / sensors / actuators) are passed through the same sanitizer,
    so they keep matching the network they named before the patch.

    Args:
        cfg: Parsed top-level configuration object.

    Returns:
        (patched_cfg, patch_errors). Errors are reported for names that are
        still not docker-safe and for docker names shared by more than one
        network after sanitization.
    """
    if not isinstance(cfg, dict):
        return cfg, ["Top-level JSON is not an object"]

    patch_errors: List[str] = []
    networks = [net for net in cfg.get("ip_networks", []) or [] if isinstance(net, dict)]

    for net in networks:
        source = net.get("docker_name")
        if not isinstance(source, str):
            source = net.get("name")
            if not isinstance(source, str):
                continue
        safe_name = sanitize_docker_name(source)
        net["docker_name"] = safe_name
        net["name"] = safe_name  # force aligned name

    # Collect every object that may carry a docker_network reference.
    ref_holders: List[Any] = []
    ui = cfg.get("ui")
    if isinstance(ui, dict):
        ref_holders.append(ui.get("network"))
    for section in ("hmis", "plcs", "sensors", "actuators"):
        for dev in cfg.get(section, []) or []:
            if isinstance(dev, dict):
                ref_holders.append(dev.get("network"))

    for holder in ref_holders:
        if isinstance(holder, dict) and isinstance(holder.get("docker_network"), str):
            holder["docker_network"] = sanitize_docker_name(holder["docker_network"])

    # validate docker-safety
    for net in networks:
        dn = net.get("docker_name")
        nm = net.get("name")
        if isinstance(dn, str) and not DOCKER_SAFE_RE.match(dn):
            patch_errors.append(f"ip_networks.docker_name not docker-safe after patch: {dn}")
        if isinstance(nm, str) and not DOCKER_SAFE_RE.match(nm):
            patch_errors.append(f"ip_networks.name not docker-safe after patch: {nm}")

    # Distinct networks must not end up with the same docker name, otherwise
    # references to them become ambiguous.
    dn_counts = Counter(
        net["docker_name"] for net in networks if isinstance(net.get("docker_name"), str)
    )
    for dn in sorted(name for name, count in dn_counts.items() if count > 1):
        patch_errors.append(f"ip_networks.docker_name duplicated after patch: {dn}")

    return cfg, patch_errors


def patch_sanitize_connection_ids(cfg: dict[str, Any]) -> Tuple[dict[str, Any], List[str]]:
    """
    Sanitize all outbound_connection IDs to docker-safe format [a-z0-9_].
    Update all monitor/controller outbound_connection_id references consistently.

    This ensures connection IDs are:
    - Lowercase
    - Only contain [a-z0-9_]
    - Consistent between outbound_connections and monitors/controllers

    The config is patched in place. Only PLCs and HMIs are processed. An
    empty-string connection id is left unchanged and reported as not
    docker-safe.

    Args:
        cfg: Parsed top-level configuration object.

    Returns:
        (patched_cfg, patch_errors). Errors are reported for ids that are not
        docker-safe after the patch and for ids that occur more than once
        within one device after sanitization.
    """
    if not isinstance(cfg, dict):
        return cfg, ["Top-level JSON is not an object"]

    patch_errors: List[str] = []

    for section in ("plcs", "hmis"):
        for dev in cfg.get(section, []) or []:
            if not isinstance(dev, dict):
                continue
            dev_name = dev.get("name", "unknown")

            sanitized_ids: List[str] = []
            for conn in dev.get("outbound_connections", []) or []:
                if not isinstance(conn, dict):
                    continue
                conn_id = conn.get("id")
                if not isinstance(conn_id, str):
                    continue
                if conn_id:
                    conn_id = sanitize_connection_id(conn_id)
                    conn["id"] = conn_id
                    sanitized_ids.append(conn_id)
                if not DOCKER_SAFE_RE.match(conn_id):
                    patch_errors.append(
                        f"{section}['{dev_name}'].outbound_connections[].id "
                        f"not docker-safe after patch: {conn_id}"
                    )

            # Two connections sharing an id would make references ambiguous.
            id_counts = Counter(sanitized_ids)
            for dup in sorted(cid for cid, count in id_counts.items() if count > 1):
                patch_errors.append(
                    f"{section}['{dev_name}'].outbound_connections[].id "
                    f"duplicated after patch: {dup}"
                )

            # References go through the same sanitizer as the ids themselves,
            # so a reference keeps matching the connection it named before.
            for ref_section in ("monitors", "controllers"):
                for entry in dev.get(ref_section, []) or []:
                    if not isinstance(entry, dict):
                        continue
                    ref = entry.get("outbound_connection_id")
                    if isinstance(ref, str):
                        entry["outbound_connection_id"] = sanitize_connection_id(ref)

    return cfg, patch_errors