#!/usr/bin/env python3 """ InsAIts Security Monitor -- PreToolUse Hook for Claude Code ============================================================ Real-time security monitoring for Claude Code tool inputs. Detects credential exposure, prompt injection, behavioral anomalies, hallucination chains, and 20+ other anomaly types -- runs 100% locally. Writes audit events to .insaits_audit_session.jsonl for forensic tracing. Setup: pip install insa-its Add to .claude/settings.json: { "hooks": { "PreToolUse": [ { "matcher": ".*", "hooks": [ { "type": "command", "command": "python3 scripts/hooks/insaits-security-monitor.py" } ] } ] } } How it works: Claude Code passes tool input as JSON on stdin. This script runs InsAIts anomaly detection on the content. Exit code 0 = clean (pass through). Exit code 2 = critical issue found (blocks tool execution). Stderr output = non-blocking warning shown to Claude. Detections include: - Credential exposure (API keys, tokens, passwords) - Prompt injection patterns - Hallucination indicators (phantom citations, fact contradictions) - Behavioral anomalies (context loss, semantic drift) - Tool description divergence - Shorthand emergence / jargon drift All processing is local -- no data leaves your machine. Author: Cristi Bogdan -- YuyAI (https://github.com/Nomadu27/InsAIts) License: Apache 2.0 """ from __future__ import annotations import hashlib import json import logging import os import sys import time from typing import Any, Dict, List, Optional, Tuple # Configure logging to stderr so it does not interfere with stdout protocol logging.basicConfig( stream=sys.stderr, format="[InsAIts] %(message)s", level=logging.DEBUG if os.environ.get("INSAITS_VERBOSE") else logging.WARNING, ) log = logging.getLogger("insaits-hook") # Try importing InsAIts SDK try: from insa_its import insAItsMonitor INSAITS_AVAILABLE: bool = True except ImportError: INSAITS_AVAILABLE = False AUDIT_FILE: str = ".insaits_audit_session.jsonl" def extract_content(data: Dict[str, Any]) -> Tuple[str, str]: """Extract inspectable text from a Claude Code tool input payload. Returns: A (text, context) tuple where *text* is the content to scan and *context* is a short label for the audit log. """ tool_name: str = data.get("tool_name", "") tool_input: Dict[str, Any] = data.get("tool_input", {}) tool_result: Any = data.get("tool_response", {}) text: str = "" context: str = "" if tool_name in ("Write", "Edit", "MultiEdit"): text = tool_input.get("content", "") or tool_input.get("new_string", "") context = "file:" + str(tool_input.get("file_path", ""))[:80] elif tool_name == "Bash": command: str = str(tool_input.get("command", "")) # For PreToolUse we inspect the command itself text = command # Also check tool_response if present (for flexibility) if isinstance(tool_result, dict): output = tool_result.get("output", "") or tool_result.get("stdout", "") if output: text = text + "\n" + output elif isinstance(tool_result, str) and tool_result: text = text + "\n" + tool_result context = "bash:" + command[:80] elif "content" in data: content: Any = data["content"] if isinstance(content, list): text = "\n".join( b.get("text", "") for b in content if b.get("type") == "text" ) elif isinstance(content, str): text = content context = str(data.get("task", "")) return text, context def write_audit(event: Dict[str, Any]) -> None: """Append an audit event to the JSONL audit log.""" try: event["timestamp"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) event["hash"] = hashlib.sha256( json.dumps(event, sort_keys=True).encode() ).hexdigest()[:16] with open(AUDIT_FILE, "a", encoding="utf-8") as f: f.write(json.dumps(event) + "\n") except OSError as exc: log.warning("Failed to write audit log %s: %s", AUDIT_FILE, exc) def format_feedback(anomalies: List[Any]) -> str: """Format detected anomalies as feedback for Claude Code. Returns: A human-readable multi-line string describing each finding. """ lines: List[str] = [ "== InsAIts Security Monitor -- Issues Detected ==", "", ] for i, a in enumerate(anomalies, 1): sev: str = getattr(a, "severity", "MEDIUM") atype: str = getattr(a, "type", "UNKNOWN") detail: str = getattr(a, "detail", "") lines.extend([ f"{i}. [{sev}] {atype}", f" {detail[:120]}", "", ]) lines.extend([ "-" * 56, "Fix the issues above before continuing.", "Audit log: " + AUDIT_FILE, ]) return "\n".join(lines) def main() -> None: """Entry point for the Claude Code PreToolUse hook.""" raw: str = sys.stdin.read().strip() if not raw: sys.exit(0) try: data: Dict[str, Any] = json.loads(raw) except json.JSONDecodeError: data = {"content": raw} text, context = extract_content(data) # Skip very short content (e.g. "OK", empty bash results) if len(text.strip()) < 10: sys.exit(0) if not INSAITS_AVAILABLE: log.warning("Not installed. Run: pip install insa-its") sys.exit(0) monitor: insAItsMonitor = insAItsMonitor( session_name="claude-code-hook", dev_mode=True, ) result: Dict[str, Any] = monitor.send_message( text=text[:4000], sender_id="claude-code", llm_id=os.environ.get("INSAITS_MODEL", "claude-opus"), ) anomalies: List[Any] = result.get("anomalies", []) # Write audit event regardless of findings write_audit({ "tool": data.get("tool_name", "unknown"), "context": context, "anomaly_count": len(anomalies), "anomaly_types": [getattr(a, "type", "") for a in anomalies], "text_length": len(text), }) if not anomalies: log.debug("Clean -- no anomalies detected.") sys.exit(0) # Determine maximum severity has_critical: bool = any( getattr(a, "severity", "") in ("CRITICAL", "critical") for a in anomalies ) feedback: str = format_feedback(anomalies) if has_critical: # stdout feedback -> Claude Code shows to the model sys.stdout.write(feedback + "\n") sys.exit(2) # PreToolUse exit 2 = block tool execution else: # Non-critical: warn via stderr (non-blocking) log.warning("\n%s", feedback) sys.exit(0) if __name__ == "__main__": main()