fix: dict anomaly access, configurable fail mode, exception type logging

- Add get_anomaly_attr() helper that handles both dict and object
  anomalies. The SDK's send_message() returns dicts, so getattr()
  was silently returning defaults -- critical blocking never triggered.
- Fix field name: "detail" -> "details" (matches SDK schema).
- Make fail-open/fail-closed configurable via INSAITS_FAIL_MODE env var
  (defaults to "open" for backward compatibility).
- Include exception type name in fail-open log for diagnostics.
- Normalize severity comparison with .upper() for case-insensitive matching.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Nomadu27
2026-03-10 18:53:21 +01:00
parent 68fc85ea49
commit e30109829b

View File

@@ -40,6 +40,8 @@ Environment variables:
INSAITS_DEV_MODE Set to "true" to enable dev mode (no API key needed).
Defaults to "false" (strict mode).
INSAITS_MODEL LLM model identifier for fingerprinting. Default: claude-opus.
INSAITS_FAIL_MODE "open" (default) = continue on SDK errors.
"closed" = block tool execution on SDK errors.
INSAITS_VERBOSE Set to any value to enable debug logging.
Detections include:
@@ -141,6 +143,18 @@ def write_audit(event: Dict[str, Any]) -> None:
log.warning("Failed to write audit log %s: %s", AUDIT_FILE, exc)
def get_anomaly_attr(anomaly: Any, key: str, default: str = "") -> str:
"""Get a field from an anomaly that may be a dict or an object.
The SDK's ``send_message()`` returns anomalies as dicts, while
other code paths may return dataclass/object instances. This
helper handles both transparently.
"""
if isinstance(anomaly, dict):
return str(anomaly.get(key, default))
return str(getattr(anomaly, key, default))
def format_feedback(anomalies: List[Any]) -> str:
"""Format detected anomalies as feedback for Claude Code.
@@ -152,9 +166,9 @@ def format_feedback(anomalies: List[Any]) -> str:
"",
]
for i, a in enumerate(anomalies, 1):
sev: str = getattr(a, "severity", "MEDIUM")
atype: str = getattr(a, "type", "UNKNOWN")
detail: str = getattr(a, "detail", "")
sev: str = get_anomaly_attr(a, "severity", "MEDIUM")
atype: str = get_anomaly_attr(a, "type", "UNKNOWN")
detail: str = get_anomaly_attr(a, "details", "")
lines.extend([
f"{i}. [{sev}] {atype}",
f" {detail[:120]}",
@@ -203,7 +217,17 @@ def main() -> None:
llm_id=os.environ.get("INSAITS_MODEL", DEFAULT_MODEL),
)
except Exception as exc:
log.warning("SDK error, skipping security scan: %s", exc)
fail_mode: str = os.environ.get("INSAITS_FAIL_MODE", "open").lower()
if fail_mode == "closed":
sys.stdout.write(
f"InsAIts SDK error ({type(exc).__name__}); "
"blocking execution to avoid unscanned input.\n"
)
sys.exit(2)
log.warning(
"SDK error (%s), skipping security scan: %s",
type(exc).__name__, exc,
)
sys.exit(0)
anomalies: List[Any] = result.get("anomalies", [])
@@ -213,7 +237,7 @@ def main() -> None:
"tool": data.get("tool_name", "unknown"),
"context": context,
"anomaly_count": len(anomalies),
"anomaly_types": [getattr(a, "type", "") for a in anomalies],
"anomaly_types": [get_anomaly_attr(a, "type") for a in anomalies],
"text_length": len(text),
})
@@ -223,7 +247,7 @@ def main() -> None:
# Determine maximum severity
has_critical: bool = any(
getattr(a, "severity", "") in ("CRITICAL", "critical") for a in anomalies
get_anomaly_attr(a, "severity").upper() in ("CRITICAL",) for a in anomalies
)
feedback: str = format_feedback(anomalies)