147 lines
5.5 KiB
Python
147 lines
5.5 KiB
Python
"""Deterministic code templates for ICS-SimLab logic generation (tank model)."""
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Iterable, Optional
|
|
|
|
|
|
@dataclass(frozen=True)
class TankParams:
    """Immutable numeric parameters of the generated tank model.

    ``render_hil_tank`` converts every field with ``float()`` and writes it as
    a literal into the generated HIL logic, whose level update is
    ``dt * (inflow - outflow - leak_rate) / area``.

    Values are validated at construction time so that a bad configuration
    fails here instead of inside the generated simulation code.

    Raises:
        ValueError: if a field is not a finite number, or if ``area`` is not
            strictly positive. Values that ``float()`` cannot convert raise
            whatever ``float()`` raises.
    """

    dt: float = 0.1  # factor applied to the net flow on each update
    area: float = 1.0  # divisor of the net flow; must be > 0
    max_level: float = 1.0  # upper clamp applied to the level
    inflow_rate: float = 0.25  # inflow contributed while the inlet command is on
    outflow_rate: float = 0.25  # outflow contributed while the outlet command is on
    leak_rate: float = 0.0  # always subtracted from the net flow

    def __post_init__(self) -> None:
        """Reject values that would make the generated code fail at runtime."""
        import math  # function-scope import: the file itself does not import math

        for name in (
            "dt",
            "area",
            "max_level",
            "inflow_rate",
            "outflow_rate",
            "leak_rate",
        ):
            value = float(getattr(self, name))
            # NaN/inf would be rendered as the bare names ``nan``/``inf``,
            # which are undefined in the generated module.
            if not math.isfinite(value):
                raise ValueError(
                    f"TankParams.{name} must be a finite number, got {value!r}"
                )
        # The generated update divides by ``area``.
        if float(self.area) <= 0.0:
            raise ValueError(
                f"TankParams.area must be strictly positive, got {self.area!r}"
            )
|
|
|
|
|
|
def _header(comment: str) -> str:
|
|
return (
|
|
'"""\n'
|
|
f"{comment}\n\n"
|
|
"Autogenerated by ics-simlab-config-gen (deterministic templates).\n"
|
|
'"""\n\n'
|
|
)
|
|
|
|
|
|
def render_plc_threshold(
    plc_name: str,
    level_id: str,
    inlet_valve_id: str,
    outlet_valve_id: str,
    low: float = 0.2,
    high: float = 0.8,
) -> str:
    """Render the source of a PLC logic module doing two-threshold level control.

    The generated module defines
    ``logic(input_registers, output_registers, state_update_callbacks)``.
    Each call reads ``level_id`` from the input registers (falling back to
    0.0 when the value cannot be read as a float) and then:

    * ``level <= low``: writes 1 to the inlet valve and 0 to the outlet valve;
    * ``level >= high``: writes 0 to the inlet valve and 1 to the outlet valve;
    * otherwise: leaves both outputs untouched.

    The ``low`` test runs first, so it wins if both conditions hold. A write
    is skipped when the register is absent or already holds the value; after
    an actual change the matching callback, if present, is invoked.

    Args:
        plc_name: Name shown in the generated module docstring.
        level_id: Input-register key holding the tank level.
        inlet_valve_id: Output-register key of the inlet valve command.
        outlet_valve_id: Output-register key of the outlet valve command.
        low: Threshold at or below which the tank is filled.
        high: Threshold at or above which the tank is drained.

    Returns:
        The complete Python source of the generated module.
    """
    # repr() produces a correctly escaped string literal, so ids containing
    # quotes or backslashes cannot break (or inject into) the generated code.
    level_key = repr(str(level_id))
    inlet_key = repr(str(inlet_valve_id))
    outlet_key = repr(str(outlet_valve_id))

    # Nested blocks are emitted with 4-space steps; every try/if body must be
    # indented deeper than its header for the generated module to compile.
    body = [
        "from typing import Any, Callable, Dict",
        "",
        "",
        "def _get_float(regs: Dict[str, Any], key: str, default: float = 0.0) -> float:",
        "    try:",
        "        return float(regs[key]['value'])",
        "    except Exception:",
        "        return default",
        "",
        "",
        "def _write(",
        "    out_regs: Dict[str, Any],",
        "    cbs: Dict[str, Callable[[], None]],",
        "    key: str,",
        "    value: int,",
        ") -> None:",
        "    if key not in out_regs:",
        "        return",
        "    cur = out_regs[key].get('value', None)",
        "    if cur == value:",
        "        return",
        "    out_regs[key]['value'] = value",
        "    if key in cbs:",
        "        cbs[key]()",
        "",
        "",
        "def logic(input_registers, output_registers, state_update_callbacks):",
        f"    level = _get_float(input_registers, {level_key}, default=0.0)",
        f"    low = {float(low)}",
        f"    high = {float(high)}",
        "",
        "    if level <= low:",
        f"        _write(output_registers, state_update_callbacks, {inlet_key}, 1)",
        f"        _write(output_registers, state_update_callbacks, {outlet_key}, 0)",
        "        return",
        "    if level >= high:",
        f"        _write(output_registers, state_update_callbacks, {inlet_key}, 0)",
        f"        _write(output_registers, state_update_callbacks, {outlet_key}, 1)",
        "        return",
        "    return",
    ]
    return (
        _header(f"PLC logic for {plc_name}: threshold control for tank level.")
        + "\n".join(body)
        + "\n"
    )
|
|
|
|
|
|
def render_plc_stub(plc_name: str) -> str:
    """Render a placeholder PLC logic module whose ``logic`` does nothing.

    Args:
        plc_name: Name shown in the generated module docstring.

    Returns:
        Source text: the standard header followed by a ``logic`` function
        that returns immediately.
    """
    banner = _header(f"PLC logic for {plc_name}: stub (does nothing).")
    # NOTE(review): the emitted body is indented by a single space. That is
    # valid Python and is reproduced verbatim; confirm against the upstream
    # file whether a 4-space indent was intended.
    stub_source = (
        "def logic(input_registers, output_registers, state_update_callbacks):\n",
        " return\n",
    )
    return banner + "".join(stub_source)
|
|
|
|
|
|
def render_hil_tank(
    hil_name: str,
    level_out_id: str,
    inlet_cmd_in_id: str,
    outlet_cmd_in_id: str,
    required_output_ids: Iterable[str],
    params: Optional[TankParams] = None,
    initial_level: Optional[float] = None,
) -> str:
    """Render the source of a HIL logic module simulating a single tank.

    The generated module defines ``logic(physical_values)``. Each call:

    1. seeds every id in ``required_output_ids`` with ``setdefault`` (the
       level with the initial level, everything else with 0.0);
    2. reads the inlet/outlet commands, treating values above 0.5 as on;
    3. advances the level by ``dt * (inflow - outflow - leak_rate) / area``;
    4. clamps the level to ``[0.0, max_level]`` and stores it back under
       ``level_out_id``.

    Args:
        hil_name: Name shown in the generated module docstring.
        level_out_id: Key in ``physical_values`` holding the tank level.
        inlet_cmd_in_id: Key of the inlet command.
        outlet_cmd_in_id: Key of the outlet command.
        required_output_ids: Keys that must exist after the first call.
        params: Model parameters; defaults to ``TankParams()``.
        initial_level: Starting level; defaults to half of ``max_level``.

    Returns:
        The complete Python source of the generated module.
    """
    p = params if params is not None else TankParams()
    if initial_level is not None:
        init_level = float(initial_level)
    else:
        init_level = 0.5 * p.max_level

    # repr() produces a correctly escaped string literal, so ids containing
    # quotes or backslashes cannot break (or inject into) the generated code.
    level_key = repr(str(level_out_id))
    inlet_key = repr(str(inlet_cmd_in_id))
    outlet_key = repr(str(outlet_cmd_in_id))

    init_lines = []
    level_seeded = False
    for oid in required_output_ids:
        if oid == level_out_id:
            init_lines.append(f"    physical_values.setdefault({level_key}, {init_level})")
            level_seeded = True
        else:
            init_lines.append(f"    physical_values.setdefault({str(oid)!r}, 0.0)")
    if not level_seeded:
        # The level key is written on every call anyway; seeding it here makes
        # sure the initial level is honoured even when the caller did not list
        # it among the required outputs (it would otherwise start from 0.0).
        init_lines.append(f"    physical_values.setdefault({level_key}, {init_level})")

    # Nested blocks are emitted with 4-space steps; every try/if body must be
    # indented deeper than its header for the generated module to compile.
    body = [
        "def _as_float(x, default=0.0):",
        "    try:",
        "        return float(x)",
        "    except Exception:",
        "        return float(default)",
        "",
        "",
        "def _as_cmd01(x) -> float:",
        "    v = _as_float(x, default=0.0)",
        "    return 1.0 if v > 0.5 else 0.0",
        "",
        "",
        "def logic(physical_values):",
        "    # Initialize required output physical values (robust defaults)",
        *init_lines,
        "",
        f"    inlet_cmd = _as_cmd01(physical_values.get({inlet_key}, 0.0))",
        f"    outlet_cmd = _as_cmd01(physical_values.get({outlet_key}, 0.0))",
        "",
        f"    dt = {float(p.dt)}",
        f"    area = {float(p.area)}",
        f"    max_level = {float(p.max_level)}",
        f"    inflow_rate = {float(p.inflow_rate)}",
        f"    outflow_rate = {float(p.outflow_rate)}",
        f"    leak_rate = {float(p.leak_rate)}",
        "",
        f"    level = _as_float(physical_values.get({level_key}, 0.0), default=0.0)",
        "    inflow = inlet_cmd * inflow_rate",
        "    outflow = outlet_cmd * outflow_rate",
        "    dlevel = dt * (inflow - outflow - leak_rate) / area",
        "    level = level + dlevel",
        "    if level < 0.0:",
        "        level = 0.0",
        "    if level > max_level:",
        "        level = max_level",
        f"    physical_values[{level_key}] = level",
        "    return",
    ]
    return (
        _header(f"HIL logic for {hil_name}: tank physical model (discrete-time).")
        + "\n".join(body)
        + "\n"
    )
|
|
|
|
|
|
def render_hil_stub(hil_name: str, required_output_ids: Iterable[str]) -> str:
    """Render a placeholder HIL logic module that only seeds its outputs.

    The generated ``logic(physical_values)`` calls ``setdefault(id, 0.0)``
    for every id in ``required_output_ids`` and then returns.

    Args:
        hil_name: Name shown in the generated module docstring.
        required_output_ids: Keys that must exist in ``physical_values``.

    Returns:
        The complete Python source of the generated module.
    """
    # NOTE(review): the emitted body is indented by a single space. That is
    # valid Python and is kept unchanged; confirm against the upstream file
    # whether a 4-space indent was intended.
    lines = [
        _header(f"HIL logic for {hil_name}: stub (only init outputs)."),
        "def logic(physical_values):\n",
    ]
    # str(...)!r emits a properly escaped string literal, so ids containing
    # quotes or backslashes cannot break the generated source.
    lines.extend(
        f" physical_values.setdefault({str(oid)!r}, 0.0)\n"
        for oid in required_output_ids
    )
    # The trailing return keeps the function body non-empty even when no
    # output ids are given.
    lines.append(" return\n")
    return "".join(lines)
|