from __future__ import annotations import ast from dataclasses import dataclass from pathlib import Path from typing import List, Optional @dataclass class CallbackIssue: file: str key: str message: str def _has_write_helper(tree: ast.AST) -> bool: """ Check if the file defines a _write() function that handles callbacks internally. This is our generated pattern: _write(out_regs, cbs, key, value) does both write+callback. """ for node in ast.walk(tree): if isinstance(node, ast.FunctionDef) and node.name == "_write": return True return False def _is_write_helper_call(stmt: ast.stmt) -> Optional[str]: """ Recognize calls like: _write(output_registers, state_update_callbacks, 'key', value) Returns the output key if recognized, None otherwise. """ if not isinstance(stmt, ast.Expr): return None call = stmt.value if not isinstance(call, ast.Call): return None func = call.func if not (isinstance(func, ast.Name) and func.id == "_write"): return None # _write(out_regs, cbs, key, value) - key is the 3rd argument if len(call.args) >= 3: key_arg = call.args[2] if isinstance(key_arg, ast.Constant) and isinstance(key_arg.value, str): return key_arg.value return None def _extract_output_key_from_assign(stmt: ast.stmt) -> Optional[str]: """ Riconosce assegnazioni tipo: output_registers["X"]["value"] = ... output_registers['X']['value'] += ... Restituisce "X" se è una stringa letterale, altrimenti None. """ target = None if isinstance(stmt, ast.Assign) and stmt.targets: target = stmt.targets[0] elif isinstance(stmt, ast.AugAssign): target = stmt.target else: return None # target deve essere Subscript(...)[...]["value"] if not isinstance(target, ast.Subscript): return None # output_registers["X"]["value"] è un Subscript su un Subscript inner = target.value if not isinstance(inner, ast.Subscript): return None # outer slice deve essere "value" outer_slice = target.slice if isinstance(outer_slice, ast.Constant) and outer_slice.value != "value": return None if not (isinstance(outer_slice, ast.Constant) and outer_slice.value == "value"): return None # inner deve essere output_registers["X"] base = inner.value if not (isinstance(base, ast.Name) and base.id == "output_registers"): return None inner_slice = inner.slice if isinstance(inner_slice, ast.Constant) and isinstance(inner_slice.value, str): return inner_slice.value return None def _is_callback_call(stmt: ast.stmt, key: str) -> bool: """ Riconosce: state_update_callbacks["X"]() come statement singolo. """ if not isinstance(stmt, ast.Expr): return False call = stmt.value if not isinstance(call, ast.Call): return False func = call.func if not isinstance(func, ast.Subscript): return False base = func.value if not (isinstance(base, ast.Name) and base.id == "state_update_callbacks"): return False sl = func.slice return isinstance(sl, ast.Constant) and sl.value == key def _validate_block(stmts: List[ast.stmt], file_path: str, window: int = 3) -> List[CallbackIssue]: issues: List[CallbackIssue] = [] i = 0 while i < len(stmts): s = stmts[i] key = _extract_output_key_from_assign(s) if key is not None: # cerca callback nelle prossime "window" istruzioni dello stesso blocco found = False for j in range(i + 1, min(len(stmts), i + 1 + window)): if _is_callback_call(stmts[j], key): found = True break if not found: issues.append( CallbackIssue( file=file_path, key=key, message=f"Write su output_registers['{key}']['value'] senza callback state_update_callbacks['{key}']() nelle prossime {window} istruzioni dello stesso blocco." ) ) # ricorsione su blocchi annidati if isinstance(s, (ast.If, ast.For, ast.While, ast.With, ast.Try)): for child in getattr(s, "body", []) or []: pass issues += _validate_block(getattr(s, "body", []) or [], file_path, window=window) issues += _validate_block(getattr(s, "orelse", []) or [], file_path, window=window) issues += _validate_block(getattr(s, "finalbody", []) or [], file_path, window=window) for h in getattr(s, "handlers", []) or []: issues += _validate_block(getattr(h, "body", []) or [], file_path, window=window) i += 1 return issues def validate_plc_callbacks(plc_logic_file: str, window: int = 3) -> List[CallbackIssue]: """ Cerca nel def logic(...) del file PLC: output_registers["X"]["value"] = ... e verifica che subito dopo (entro window) ci sia: state_update_callbacks["X"]() OPPURE se il file definisce una funzione _write() che gestisce internamente sia la scrittura che il callback, quella è considerata valida. """ path = Path(plc_logic_file) text = path.read_text(encoding="utf-8", errors="replace") tree = ast.parse(text) # Check if this file uses the _write() helper pattern # If so, skip strict callback validation - _write() handles it internally if _has_write_helper(tree): return [] # Pattern is valid by design # trova def logic(...) logic_fn = None for node in tree.body: if isinstance(node, ast.FunctionDef) and node.name == "logic": logic_fn = node break if logic_fn is None: # non è per forza un errore, ma per noi sì: i PLC devono avere logic() return [CallbackIssue(str(path), "", "Funzione def logic(...) non trovata nel file PLC.")] return _validate_block(logic_fn.body, str(path), window=window)