mirror of
https://github.com/affaan-m/everything-claude-code.git
synced 2026-03-30 13:43:26 +08:00
* feat(skills): add skill-comply — automated behavioral compliance measurement Automated compliance measurement for skills, rules, and agent definitions. Generates behavioral specs, runs scenarios at 3 strictness levels, classifies tool calls via LLM, and produces self-contained reports. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(skill-comply): address bot review feedback - AGENTS.md: fix stale skill count (115 → 117) in project structure - run.py: replace remaining print() with logger, add zero-division guard, create parent dirs for --output path - runner.py: add returncode check for claude subprocess, clarify relative_to path traversal validation - parser.py: use is_file() instead of exists(), catch KeyError for missing trace fields, add file check in parse_spec - classifier.py: log warnings on malformed classification output, guard against non-dict JSON responses - grader.py: filter negative indices from LLM classification Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
162 lines
5.3 KiB
Python
162 lines
5.3 KiB
Python
"""Run scenarios via claude -p and parse tool calls from stream-json output."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
import shlex
|
|
import shutil
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from scripts.parser import ObservationEvent
|
|
from scripts.scenario_generator import Scenario
|
|
|
|
SANDBOX_BASE = Path("/tmp/skill-comply-sandbox")
|
|
ALLOWED_MODELS = frozenset({"haiku", "sonnet", "opus"})
|
|
|
|
|
|
@dataclass(frozen=True)
class ScenarioRun:
    """Immutable record of one scenario execution via `claude -p`."""

    # The scenario that was executed.
    scenario: Scenario
    # Tool-call events parsed from the stream-json output, in call order.
    observations: tuple[ObservationEvent, ...]
    # Directory under SANDBOX_BASE where the scenario ran.
    sandbox_dir: Path
|
|
|
|
|
|
def run_scenario(
    scenario: Scenario,
    model: str = "sonnet",
    max_turns: int = 30,
    timeout: int = 300,
) -> ScenarioRun:
    """Execute a scenario and extract tool calls from stream-json output.

    Args:
        scenario: Scenario to run; its prompt is passed to `claude -p`.
        model: Model alias; must be one of ALLOWED_MODELS.
        max_turns: Turn cap forwarded to the CLI.
        timeout: Seconds before the subprocess is killed
            (subprocess.TimeoutExpired propagates to the caller).

    Raises:
        ValueError: if `model` is not an allowed alias.
        RuntimeError: if the claude subprocess exits non-zero.
    """
    if model not in ALLOWED_MODELS:
        raise ValueError(f"Unknown model: {model!r}. Allowed: {ALLOWED_MODELS}")

    # Prepare an isolated working directory for this scenario.
    sandbox_dir = _safe_sandbox_dir(scenario.id)
    _setup_sandbox(sandbox_dir, scenario)

    # --verbose is required for stream-json to emit per-message events.
    cmd = [
        "claude", "-p", scenario.prompt,
        "--model", model,
        "--max-turns", str(max_turns),
        "--add-dir", str(sandbox_dir),
        "--allowedTools", "Read,Write,Edit,Bash,Glob,Grep",
        "--output-format", "stream-json",
        "--verbose",
    ]
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        timeout=timeout,
        cwd=sandbox_dir,
    )

    if result.returncode != 0:
        raise RuntimeError(
            f"claude -p failed (rc={result.returncode}): {result.stderr[:500]}"
        )

    return ScenarioRun(
        scenario=scenario,
        observations=tuple(_parse_stream_json(result.stdout)),
        sandbox_dir=sandbox_dir,
    )
|
|
|
|
|
|
def _safe_sandbox_dir(scenario_id: str) -> Path:
    """Sanitize scenario ID and ensure path stays within sandbox base.

    Args:
        scenario_id: Arbitrary scenario identifier; any character outside
            [a-zA-Z0-9-_] is replaced with "_".

    Returns:
        A path directly under SANDBOX_BASE.

    Raises:
        ValueError: if the ID sanitizes to an empty string, or if the
            resolved path escapes SANDBOX_BASE.
    """
    safe_id = re.sub(r"[^a-zA-Z0-9\-_]", "_", scenario_id)
    # Guard: an empty ID would make the sandbox dir equal to SANDBOX_BASE
    # itself, and _setup_sandbox rmtree()s its target — that would wipe
    # every other scenario's sandbox.
    if not safe_id:
        raise ValueError(f"Scenario ID sanitizes to empty string: {scenario_id!r}")
    path = SANDBOX_BASE / safe_id
    # Validate path stays within sandbox base (raises ValueError on traversal)
    path.resolve().relative_to(SANDBOX_BASE.resolve())
    return path
|
|
|
|
|
|
def _setup_sandbox(sandbox_dir: Path, scenario: Scenario) -> None:
    """Create sandbox directory and run setup commands.

    Recreates `sandbox_dir` from scratch, initializes a git repository in
    it, then runs each of the scenario's setup commands inside it.
    """
    # Start clean: remove any leftovers from a previous run of this scenario.
    if sandbox_dir.exists():
        shutil.rmtree(sandbox_dir)
    sandbox_dir.mkdir(parents=True)

    # Scenarios may exercise git-based tooling, so seed an empty repo.
    subprocess.run(["git", "init"], cwd=sandbox_dir, capture_output=True)

    # Setup commands are best-effort: output is captured and failures are
    # deliberately not checked.
    for command in scenario.setup_commands:
        subprocess.run(shlex.split(command), cwd=sandbox_dir, capture_output=True)
|
|
|
|
|
|
def _parse_stream_json(stdout: str) -> list[ObservationEvent]:
    """Parse claude -p stream-json output into ObservationEvents.

    Stream-json format:
    - type=assistant with content[].type=tool_use → tool call (name, input)
    - type=user with content[].type=tool_result → tool result (output)

    Non-JSON lines and structurally malformed messages (non-dict JSON
    values, string `content`, non-dict content blocks) are skipped rather
    than raising, since the stream may interleave diagnostic output.
    """
    events: list[ObservationEvent] = []
    # tool_use_id -> {tool, input, order} for calls awaiting their result.
    pending: dict[str, dict] = {}
    event_counter = 0

    for line in stdout.strip().splitlines():
        try:
            msg = json.loads(line)
        except json.JSONDecodeError:
            continue
        # Guard: a line can be valid JSON without being an object.
        if not isinstance(msg, dict):
            continue

        msg_type = msg.get("type")
        message = msg.get("message", {})
        # `content` may be a plain string in some message shapes; only a
        # list can carry tool_use / tool_result blocks.
        content = message.get("content", []) if isinstance(message, dict) else []
        if not isinstance(content, list):
            continue

        if msg_type == "assistant":
            for block in content:
                if not isinstance(block, dict) or block.get("type") != "tool_use":
                    continue
                tool_use_id = block.get("id", "")
                tool_input = block.get("input", {})
                # Truncate to keep individual events bounded in size.
                input_str = (
                    json.dumps(tool_input)[:5000]
                    if isinstance(tool_input, dict)
                    else str(tool_input)[:5000]
                )
                pending[tool_use_id] = {
                    "tool": block.get("name", "unknown"),
                    "input": input_str,
                    "order": event_counter,
                }
                event_counter += 1

        elif msg_type == "user":
            for block in content:
                if not isinstance(block, dict):
                    continue
                tool_use_id = block.get("tool_use_id", "")
                if tool_use_id in pending:
                    info = pending.pop(tool_use_id)
                    output_content = block.get("content", "")
                    if isinstance(output_content, list):
                        output_str = json.dumps(output_content)[:5000]
                    else:
                        output_str = str(output_content)[:5000]

                    events.append(ObservationEvent(
                        timestamp=f"T{info['order']:04d}",
                        event="tool_complete",
                        tool=info["tool"],
                        session=msg.get("session_id", "unknown"),
                        input=info["input"],
                        output=output_str,
                    ))

    # Calls that never received a result (e.g. the run was cut off) still
    # produce an event, with empty output, so they are counted.
    for _tool_use_id, info in pending.items():
        events.append(ObservationEvent(
            timestamp=f"T{info['order']:04d}",
            event="tool_complete",
            tool=info["tool"],
            session="unknown",
            input=info["input"],
            output="",
        ))

    # Timestamps are zero-padded order indices, so a lexicographic sort
    # restores the original call order.
    return sorted(events, key=lambda e: e.timestamp)
|