From e30109829b3070a276ff0abdf33339b996237238 Mon Sep 17 00:00:00 2001 From: Nomadu27 Date: Tue, 10 Mar 2026 18:53:21 +0100 Subject: [PATCH] fix: dict anomaly access, configurable fail mode, exception type logging - Add get_anomaly_attr() helper that handles both dict and object anomalies. The SDK's send_message() returns dicts, so getattr() was silently returning defaults -- critical blocking never triggered. - Fix field name: "detail" -> "details" (matches SDK schema). - Make fail-open/fail-closed configurable via INSAITS_FAIL_MODE env var (defaults to "open" for backward compatibility). - Include exception type name in fail-open log for diagnostics. - Normalize severity comparison with .upper() for case-insensitive matching. Co-Authored-By: Claude Opus 4.6 --- scripts/hooks/insaits-security-monitor.py | 36 +++++++++++++++++++---- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/scripts/hooks/insaits-security-monitor.py b/scripts/hooks/insaits-security-monitor.py index 43e5be74..16afbfbf 100644 --- a/scripts/hooks/insaits-security-monitor.py +++ b/scripts/hooks/insaits-security-monitor.py @@ -40,6 +40,8 @@ Environment variables: INSAITS_DEV_MODE Set to "true" to enable dev mode (no API key needed). Defaults to "false" (strict mode). INSAITS_MODEL LLM model identifier for fingerprinting. Default: claude-opus. + INSAITS_FAIL_MODE "open" (default) = continue on SDK errors. + "closed" = block tool execution on SDK errors. INSAITS_VERBOSE Set to any value to enable debug logging. Detections include: @@ -141,6 +143,18 @@ def write_audit(event: Dict[str, Any]) -> None: log.warning("Failed to write audit log %s: %s", AUDIT_FILE, exc) +def get_anomaly_attr(anomaly: Any, key: str, default: str = "") -> str: + """Get a field from an anomaly that may be a dict or an object. + + The SDK's ``send_message()`` returns anomalies as dicts, while + other code paths may return dataclass/object instances. This + helper handles both transparently. + """ + if isinstance(anomaly, dict): + return str(anomaly.get(key, default)) + return str(getattr(anomaly, key, default)) + + def format_feedback(anomalies: List[Any]) -> str: """Format detected anomalies as feedback for Claude Code. @@ -152,9 +166,9 @@ def format_feedback(anomalies: List[Any]) -> str: "", ] for i, a in enumerate(anomalies, 1): - sev: str = getattr(a, "severity", "MEDIUM") - atype: str = getattr(a, "type", "UNKNOWN") - detail: str = getattr(a, "detail", "") + sev: str = get_anomaly_attr(a, "severity", "MEDIUM") + atype: str = get_anomaly_attr(a, "type", "UNKNOWN") + detail: str = get_anomaly_attr(a, "details", "") lines.extend([ f"{i}. [{sev}] {atype}", f" {detail[:120]}", @@ -203,7 +217,17 @@ def main() -> None: llm_id=os.environ.get("INSAITS_MODEL", DEFAULT_MODEL), ) except Exception as exc: - log.warning("SDK error, skipping security scan: %s", exc) + fail_mode: str = os.environ.get("INSAITS_FAIL_MODE", "open").lower() + if fail_mode == "closed": + sys.stdout.write( + f"InsAIts SDK error ({type(exc).__name__}); " + "blocking execution to avoid unscanned input.\n" + ) + sys.exit(2) + log.warning( + "SDK error (%s), skipping security scan: %s", + type(exc).__name__, exc, + ) sys.exit(0) anomalies: List[Any] = result.get("anomalies", []) @@ -213,7 +237,7 @@ def main() -> None: "tool": data.get("tool_name", "unknown"), "context": context, "anomaly_count": len(anomalies), - "anomaly_types": [getattr(a, "type", "") for a in anomalies], + "anomaly_types": [get_anomaly_attr(a, "type") for a in anomalies], "text_length": len(text), }) @@ -223,7 +247,7 @@ def main() -> None: # Determine maximum severity has_critical: bool = any( - getattr(a, "severity", "") in ("CRITICAL", "critical") for a in anomalies + get_anomaly_attr(a, "severity").upper() in ("CRITICAL",) for a in anomalies ) feedback: str = format_feedback(anomalies)