feat(skills): add skill-comply — automated behavioral compliance measurement (#724)

* feat(skills): add skill-comply — automated behavioral compliance measurement

Automated compliance measurement for skills, rules, and agent definitions.
Generates behavioral specs, runs scenarios at 3 strictness levels,
classifies tool calls via LLM, and produces self-contained reports.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(skill-comply): address bot review feedback

- AGENTS.md: fix stale skill count (115 → 117) in project structure
- run.py: replace remaining print() with logger, add zero-division guard,
  create parent dirs for --output path
- runner.py: add returncode check for claude subprocess, clarify
  relative_to path traversal validation
- parser.py: use is_file() instead of exists(), catch KeyError for
  missing trace fields, add file check in parse_spec
- classifier.py: log warnings on malformed classification output,
  guard against non-dict JSON responses
- grader.py: filter negative indices from LLM classification

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Shimo
2026-03-23 13:51:49 +09:00
committed by GitHub
parent 0f22cb4450
commit a2e465c74d
23 changed files with 1418 additions and 4 deletions

View File

View File

@@ -0,0 +1,85 @@
"""Classify tool calls against compliance steps using LLM."""
from __future__ import annotations
import json
import logging
import subprocess
from pathlib import Path
logger = logging.getLogger(__name__)
from scripts.parser import ComplianceSpec, ObservationEvent
PROMPTS_DIR = Path(__file__).parent.parent / "prompts"
def classify_events(
    spec: ComplianceSpec,
    trace: list[ObservationEvent],
    model: str = "haiku",
) -> dict[str, list[int]]:
    """Ask the LLM which tool calls satisfy which compliance steps.

    Makes a single `claude -p` call covering the whole trace and returns
    the mapping {step_id: [event_indices]} parsed from its JSON answer.

    Args:
        spec: Compliance spec whose steps are matched.
        trace: Observed tool-call events, in order.
        model: Model name passed to claude -p.

    Raises:
        RuntimeError: if the classifier subprocess exits non-zero.
    """
    if not trace:
        # Nothing to classify; skip the LLM call entirely.
        return {}

    step_lines = [
        f"- {step.id}: {step.detector.description}" for step in spec.steps
    ]
    call_lines = [
        f"[{i}] {event.tool}: input={event.input[:500]} output={event.output[:200]}"
        for i, event in enumerate(trace)
    ]

    template = (PROMPTS_DIR / "classifier.md").read_text()
    filled = template.replace("{steps_description}", "\n".join(step_lines))
    filled = filled.replace("{tool_calls}", "\n".join(call_lines))

    proc = subprocess.run(
        ["claude", "-p", filled, "--model", model, "--output-format", "text"],
        capture_output=True,
        text=True,
        timeout=60,
    )
    if proc.returncode != 0:
        raise RuntimeError(
            f"classifier subprocess failed (rc={proc.returncode}): {proc.stderr[:500]}"
        )
    return _parse_classification(proc.stdout)
def _parse_classification(text: str) -> dict[str, list[int]]:
"""Parse LLM classification output into {step_id: [event_indices]}."""
text = text.strip()
# Strip markdown fences
lines = text.splitlines()
if lines and lines[0].startswith("```"):
lines = lines[1:]
if lines and lines[-1].startswith("```"):
lines = lines[:-1]
cleaned = "\n".join(lines)
try:
parsed = json.loads(cleaned)
if not isinstance(parsed, dict):
logger.warning("Classifier returned non-dict JSON: %s", type(parsed).__name__)
return {}
return {
k: [int(i) for i in v]
for k, v in parsed.items()
if isinstance(v, list)
}
except (json.JSONDecodeError, ValueError, TypeError) as e:
logger.warning("Failed to parse classification output: %s", e)
return {}

View File

@@ -0,0 +1,122 @@
"""Grade observation traces against compliance specs using LLM classification."""
from __future__ import annotations
from dataclasses import dataclass
from scripts.classifier import classify_events
from scripts.parser import ComplianceSpec, ObservationEvent, Step
@dataclass(frozen=True)
class StepResult:
    """Outcome of checking a single compliance step against a trace."""
    step_id: str  # ID of the spec step this result refers to
    detected: bool  # True if at least one ordering-consistent event matched
    evidence: tuple[ObservationEvent, ...]  # matching events (empty when not detected)
    failure_reason: str | None  # why the step failed; None when detected
@dataclass(frozen=True)
class ComplianceResult:
    """Aggregate grading result for one trace against one spec."""
    spec_id: str  # ID of the compliance spec that was graded
    steps: tuple[StepResult, ...]  # per-step outcomes, in spec order
    compliance_rate: float  # detected required steps / total required steps
    recommend_hook_promotion: bool  # True when rate fell below the spec threshold
    classification: dict[str, list[int]]  # raw LLM mapping {step_id: event indices}
def _check_temporal_order(
step: Step,
event: ObservationEvent,
resolved: dict[str, list[ObservationEvent]],
classified: dict[str, list[ObservationEvent]],
) -> str | None:
"""Check before_step/after_step constraints. Returns failure reason or None."""
if step.detector.after_step is not None:
after_events = resolved.get(step.detector.after_step, [])
if not after_events:
return f"after_step '{step.detector.after_step}' not yet detected"
latest_after = max(e.timestamp for e in after_events)
if event.timestamp <= latest_after:
return (
f"must occur after '{step.detector.after_step}' "
f"(last at {latest_after}), but found at {event.timestamp}"
)
if step.detector.before_step is not None:
# Look ahead using LLM classification results
before_events = resolved.get(step.detector.before_step)
if before_events is None:
before_events = classified.get(step.detector.before_step, [])
if before_events:
earliest_before = min(e.timestamp for e in before_events)
if event.timestamp >= earliest_before:
return (
f"must occur before '{step.detector.before_step}' "
f"(first at {earliest_before}), but found at {event.timestamp}"
)
return None
def grade(
    spec: ComplianceSpec,
    trace: list[ObservationEvent],
    classifier_model: str = "haiku",
) -> ComplianceResult:
    """Grade a trace against a compliance spec using LLM classification.

    Args:
        spec: Compliance spec whose steps are graded.
        trace: Observation events from a scenario run, in any order.
        classifier_model: Model passed to the batch LLM classifier.

    Returns:
        A ComplianceResult with per-step outcomes, the required-step
        compliance rate, and the raw classification mapping.
    """
    sorted_trace = sorted(trace, key=lambda e: e.timestamp)
    # Step 1: LLM classifies all events in one batch call
    classification = classify_events(spec, sorted_trace, model=classifier_model)
    # Convert indices to events
    # (out-of-range indices returned by the LLM are silently dropped)
    classified: dict[str, list[ObservationEvent]] = {
        step_id: [sorted_trace[i] for i in indices if 0 <= i < len(sorted_trace)]
        for step_id, indices in classification.items()
    }
    # Step 2: Check temporal ordering (deterministic)
    # `resolved` accumulates steps confirmed so far (processed in spec order),
    # so later steps' after_step constraints only see confirmed evidence.
    resolved: dict[str, list[ObservationEvent]] = {}
    step_results: list[StepResult] = []
    for step in spec.steps:
        candidates = classified.get(step.id, [])
        matched: list[ObservationEvent] = []
        failure_reason: str | None = None
        for event in candidates:
            temporal_fail = _check_temporal_order(step, event, resolved, classified)
            if temporal_fail is None:
                # First ordering-consistent candidate counts as evidence.
                matched.append(event)
                break
            else:
                # Remember the most recent failure reason for reporting.
                failure_reason = temporal_fail
        detected = len(matched) > 0
        if detected:
            resolved[step.id] = matched
        elif failure_reason is None:
            failure_reason = f"no matching event classified for step '{step.id}'"
        step_results.append(StepResult(
            step_id=step.id,
            detected=detected,
            evidence=tuple(matched),
            failure_reason=failure_reason if not detected else None,
        ))
    required_ids = {s.id for s in spec.steps if s.required}
    required_steps = [s for s in step_results if s.step_id in required_ids]
    detected_required = sum(1 for s in required_steps if s.detected)
    total_required = len(required_steps)
    # Guard against specs with zero required steps (avoid division by zero).
    compliance_rate = detected_required / total_required if total_required > 0 else 0.0
    return ComplianceResult(
        spec_id=spec.id,
        steps=tuple(step_results),
        compliance_rate=compliance_rate,
        recommend_hook_promotion=compliance_rate < spec.threshold_promote_to_hook,
        classification=classification,
    )

View File

@@ -0,0 +1,107 @@
"""Parse observation traces (JSONL) and compliance specs (YAML)."""
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
import yaml
@dataclass(frozen=True)
class ObservationEvent:
    """One tool call observed during a scenario run."""
    timestamp: str  # lexicographically sortable timestamp; used for ordering
    event: str  # event kind, e.g. "tool_complete"
    tool: str  # tool name, e.g. "Read", "Bash"
    session: str  # identifier of the session the event belongs to
    input: str  # serialized tool input (may be truncated by the producer)
    output: str  # serialized tool output (may be truncated by the producer)
@dataclass(frozen=True)
class Detector:
    """How to detect a step: an LLM-matchable description plus ordering constraints."""
    description: str  # natural-language description the classifier matches against
    after_step: str | None = None  # step ID this step must occur after, if any
    before_step: str | None = None  # step ID this step must occur before, if any
@dataclass(frozen=True)
class Step:
    """A single behavioral step in a compliance spec."""
    id: str  # unique step identifier within the spec
    description: str  # human-readable description of the expected behavior
    required: bool  # True if the step counts toward the compliance rate
    detector: Detector  # how to detect this step in an observation trace
@dataclass(frozen=True)
class ComplianceSpec:
    """A behavioral compliance spec derived from a skill/rule file."""
    id: str  # unique spec identifier
    name: str  # human-readable spec name
    source_rule: str  # the skill/rule the spec was generated from
    version: str  # spec version string
    steps: tuple[Step, ...]  # ordered behavioral steps
    threshold_promote_to_hook: float  # below this rate, recommend hook promotion
def parse_trace(path: Path) -> list[ObservationEvent]:
    """Parse a JSONL observation trace file into timestamp-sorted events.

    Args:
        path: Path to the JSONL trace file.

    Returns:
        Events sorted by timestamp; [] for an empty file.

    Raises:
        FileNotFoundError: if *path* is not an existing file.
        ValueError: on malformed JSON or a missing required field.
    """
    if not path.is_file():
        raise FileNotFoundError(f"Trace file not found: {path}")
    content = path.read_text().strip()
    if not content:
        return []
    parsed: list[ObservationEvent] = []
    for lineno, line in enumerate(content.splitlines(), 1):
        try:
            record = json.loads(line)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON at line {lineno}: {e}") from e
        try:
            parsed.append(ObservationEvent(
                timestamp=record["timestamp"],
                event=record["event"],
                tool=record["tool"],
                session=record["session"],
                input=record.get("input", ""),
                output=record.get("output", ""),
            ))
        except KeyError as e:
            raise ValueError(f"Missing required field {e} at line {lineno}") from e
    parsed.sort(key=lambda e: e.timestamp)
    return parsed
def parse_spec(path: Path) -> ComplianceSpec:
    """Parse a YAML compliance spec file.

    Args:
        path: Path to the spec YAML file.

    Returns:
        The parsed ComplianceSpec.

    Raises:
        FileNotFoundError: if *path* is not an existing file.
        TypeError: if the YAML root is not a mapping (e.g. an empty file).
        KeyError: if a required field (including 'scoring') is missing.
    """
    if not path.is_file():
        raise FileNotFoundError(f"Spec file not found: {path}")
    raw = yaml.safe_load(path.read_text())
    # yaml.safe_load returns None for an empty document, and scalars/lists for
    # non-mapping documents; fail early with a clear message instead of an
    # opaque "'NoneType' object is not subscriptable". TypeError (not
    # ValueError) keeps the exception type compatible with callers that
    # retry on (yaml.YAMLError, KeyError, TypeError).
    if not isinstance(raw, dict):
        raise TypeError(
            f"Compliance spec root must be a mapping, got {type(raw).__name__}"
        )
    steps: list[Step] = []
    for s in raw["steps"]:
        d = s["detector"]
        steps.append(Step(
            id=s["id"],
            description=s["description"],
            required=s["required"],
            detector=Detector(
                description=d["description"],
                after_step=d.get("after_step"),
                before_step=d.get("before_step"),
            ),
        ))
    if "scoring" not in raw:
        raise KeyError("Missing 'scoring' section in compliance spec")
    return ComplianceSpec(
        id=raw["id"],
        name=raw["name"],
        source_rule=raw["source_rule"],
        version=raw["version"],
        steps=tuple(steps),
        threshold_promote_to_hook=raw["scoring"]["threshold_promote_to_hook"],
    )

View File

@@ -0,0 +1,170 @@
"""Generate Markdown compliance reports."""
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from scripts.grader import ComplianceResult
from scripts.parser import ComplianceSpec, ObservationEvent
from scripts.scenario_generator import Scenario
def generate_report(
    skill_path: Path,
    spec: ComplianceSpec,
    results: list[tuple[str, ComplianceResult, list[ObservationEvent]]],
    scenarios: list[Scenario] | None = None,
) -> str:
    """Generate a Markdown compliance report.

    Args:
        skill_path: Path to the skill file that was tested.
        spec: The compliance spec used for grading.
        results: List of (scenario_level_name, ComplianceResult, observations) tuples.
        scenarios: Original scenario definitions with prompts.

    Returns:
        The complete report as a single Markdown string.
    """
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    overall = _overall_compliance(results)
    threshold = spec.threshold_promote_to_hook
    lines: list[str] = []
    lines.append(f"# skill-comply Report: {skill_path.name}")
    lines.append(f"Generated: {now}")
    lines.append("")
    # Summary
    lines.append("## Summary")
    lines.append("")
    lines.append(f"| Metric | Value |")
    lines.append(f"|--------|-------|")
    lines.append(f"| Skill | `{skill_path}` |")
    lines.append(f"| Spec | {spec.id} |")
    lines.append(f"| Scenarios | {len(results)} |")
    lines.append(f"| Overall Compliance | {overall:.0%} |")
    lines.append(f"| Threshold | {threshold:.0%} |")
    promote_steps = _steps_to_promote(spec, results, threshold)
    if promote_steps:
        step_names = ", ".join(promote_steps)
        lines.append(f"| Recommendation | **Promote {step_names} to hooks** |")
    else:
        lines.append(f"| Recommendation | All steps above threshold — no hook promotion needed |")
    lines.append("")
    # Expected Behavioral Sequence
    lines.append("## Expected Behavioral Sequence")
    lines.append("")
    lines.append("| # | Step | Required | Description |")
    lines.append("|---|------|----------|-------------|")
    for i, step in enumerate(spec.steps, 1):
        req = "Yes" if step.required else "No"
        lines.append(f"| {i} | {step.id} | {req} | {step.detector.description} |")
    lines.append("")
    # Scenario Results
    lines.append("## Scenario Results")
    lines.append("")
    lines.append("| Scenario | Compliance | Failed Steps |")
    lines.append("|----------|-----------|----------------|")
    for level_name, result, _obs in results:
        # Only required steps are listed as failures in this summary table.
        failed = [s.step_id for s in result.steps if not s.detected
                  and any(sp.id == s.step_id and sp.required for sp in spec.steps)]
        failed_str = ", ".join(failed) if failed else ""
        lines.append(f"| {level_name} | {result.compliance_rate:.0%} | {failed_str} |")
    lines.append("")
    # Scenario Prompts
    if scenarios:
        lines.append("## Scenario Prompts")
        lines.append("")
        for s in scenarios:
            lines.append(f"### {s.level_name} (Level {s.level})")
            lines.append("")
            # Render each prompt line as a Markdown blockquote.
            for prompt_line in s.prompt.splitlines():
                lines.append(f"> {prompt_line}")
            lines.append("")
    # Hook Promotion Recommendations (optional/advanced)
    if promote_steps:
        lines.append("## Advanced: Hook Promotion Recommendations (optional)")
        lines.append("")
        for step_id in promote_steps:
            rate = _step_compliance_rate(step_id, results)
            step = next(s for s in spec.steps if s.id == step_id)
            lines.append(
                f"- **{step_id}** (compliance {rate:.0%}): {step.description}"
            )
        lines.append("")
    # Per-scenario details with timeline
    lines.append("## Detail")
    lines.append("")
    for level_name, result, observations in results:
        lines.append(f"### {level_name} (Compliance: {result.compliance_rate:.0%})")
        lines.append("")
        lines.append("| Step | Required | Detected | Reason |")
        lines.append("|------|----------|----------|--------|")
        for sr in result.steps:
            req = "Yes" if any(
                sp.id == sr.step_id and sp.required for sp in spec.steps
            ) else "No"
            det = "YES" if sr.detected else "NO"
            reason = sr.failure_reason or ""
            lines.append(f"| {sr.step_id} | {req} | {det} | {reason} |")
        lines.append("")
        # Timeline: show what the agent actually did
        if observations:
            # Build reverse index: event_index → step_id
            index_to_step: dict[int, str] = {}
            for step_id, indices in result.classification.items():
                for idx in indices:
                    index_to_step[idx] = step_id
            lines.append(f"**Tool Call Timeline ({len(observations)} calls)**")
            lines.append("")
            lines.append("| # | Tool | Input | Output | Classified As |")
            lines.append("|---|------|-------|--------|------|")
            for i, obs in enumerate(observations):
                step_label = index_to_step.get(i, "")
                # Escape pipes and flatten newlines so values stay in one table cell.
                input_summary = obs.input[:100].replace("|", "\\|").replace("\n", " ")
                output_summary = obs.output[:50].replace("|", "\\|").replace("\n", " ")
                lines.append(
                    f"| {i} | {obs.tool} | {input_summary} | {output_summary} | {step_label} |"
                )
            lines.append("")
    return "\n".join(lines)
def _overall_compliance(results: list[tuple[str, ComplianceResult, list[ObservationEvent]]]) -> float:
if not results:
return 0.0
return sum(r.compliance_rate for _, r, _obs in results) / len(results)
def _step_compliance_rate(
step_id: str,
results: list[tuple[str, ComplianceResult, list[ObservationEvent]]],
) -> float:
detected = sum(
1 for _, r, _obs in results
for s in r.steps if s.step_id == step_id and s.detected
)
return detected / len(results) if results else 0.0
def _steps_to_promote(
spec: ComplianceSpec,
results: list[tuple[str, ComplianceResult, list[ObservationEvent]]],
threshold: float,
) -> list[str]:
promote = []
for step in spec.steps:
if not step.required:
continue
rate = _step_compliance_rate(step.id, results)
if rate < threshold:
promote.append(step.id)
return promote

View File

@@ -0,0 +1,127 @@
"""CLI entry point for skill-comply."""
from __future__ import annotations
import argparse
import logging
import sys
from pathlib import Path
from typing import Any
import yaml
from scripts.grader import grade
from scripts.report import generate_report
from scripts.runner import run_scenario
from scripts.scenario_generator import generate_scenarios
from scripts.spec_generator import generate_spec
logger = logging.getLogger(__name__)
def main() -> None:
    """CLI entry point: generate spec and scenarios, execute, grade, and report."""
    logging.basicConfig(level=logging.INFO, format="%(message)s")
    parser = argparse.ArgumentParser(
        description="skill-comply: Measure skill compliance rates",
    )
    parser.add_argument(
        "skill",
        type=Path,
        help="Path to skill/rule file to test",
    )
    parser.add_argument(
        "--model",
        default="sonnet",
        help="Model for scenario execution (default: sonnet)",
    )
    parser.add_argument(
        "--gen-model",
        default="haiku",
        help="Model for spec/scenario generation (default: haiku)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Generate spec and scenarios without executing",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=None,
        help="Output report path (default: results/<skill-name>.md)",
    )
    args = parser.parse_args()
    if not args.skill.is_file():
        logger.error("Error: Skill file not found: %s", args.skill)
        sys.exit(1)
    results_dir = Path(__file__).parent.parent / "results"
    results_dir.mkdir(exist_ok=True)
    # Step 1: Generate compliance spec
    logger.info("[1/4] Generating compliance spec from %s...", args.skill.name)
    spec = generate_spec(args.skill, model=args.gen_model)
    logger.info(" %d steps extracted", len(spec.steps))
    # Step 2: Generate scenarios
    # Re-serialize only the step fields scenario generation needs.
    spec_yaml = yaml.dump({
        "steps": [
            {"id": s.id, "description": s.description, "required": s.required}
            for s in spec.steps
        ]
    })
    logger.info("[2/4] Generating scenarios (3 prompt strictness levels)...")
    scenarios = generate_scenarios(args.skill, spec_yaml, model=args.gen_model)
    logger.info(" %d scenarios generated", len(scenarios))
    for s in scenarios:
        logger.info(" - %s: %s", s.level_name, s.description[:60])
    if args.dry_run:
        logger.info("\n[dry-run] Spec and scenarios generated. Skipping execution.")
        logger.info("\nSpec: %s (%d steps)", spec.id, len(spec.steps))
        for step in spec.steps:
            # "*" marks required steps in the dry-run listing.
            marker = "*" if step.required else " "
            logger.info(" [%s] %s: %s", marker, step.id, step.description)
        return
    # Step 3: Execute scenarios
    logger.info("[3/4] Executing scenarios (model=%s)...", args.model)
    graded_results: list[tuple[str, Any, list[Any]]] = []
    for scenario in scenarios:
        logger.info(" Running %s...", scenario.level_name)
        run = run_scenario(scenario, model=args.model)
        result = grade(spec, list(run.observations))
        graded_results.append((scenario.level_name, result, list(run.observations)))
        logger.info(" %s: %.0f%%", scenario.level_name, result.compliance_rate * 100)
    # Step 4: Generate report
    # Skills are typically stored as <skill-name>/SKILL.md, so for a file
    # whose stem is "SKILL" the parent directory carries the skill name.
    skill_name = args.skill.parent.name if args.skill.stem == "SKILL" else args.skill.stem
    output_path = args.output or results_dir / f"{skill_name}.md"
    logger.info("[4/4] Generating report...")
    report = generate_report(args.skill, spec, graded_results, scenarios=scenarios)
    # --output may point into a directory that doesn't exist yet.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(report)
    logger.info(" Report saved to %s", output_path)
    # Summary
    if not graded_results:
        logger.warning("No scenarios were executed.")
        return
    overall = sum(r.compliance_rate for _, r, _obs in graded_results) / len(graded_results)
    logger.info("\n%s", "=" * 50)
    logger.info("Overall Compliance: %.0f%%", overall * 100)
    if overall < spec.threshold_promote_to_hook:
        logger.info(
            "Recommendation: Some steps have low compliance. "
            "Consider promoting them to hooks. See the report for details."
        )


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,161 @@
"""Run scenarios via claude -p and parse tool calls from stream-json output."""
from __future__ import annotations
import json
import re
import shlex
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from scripts.parser import ObservationEvent
from scripts.scenario_generator import Scenario
SANDBOX_BASE = Path("/tmp/skill-comply-sandbox")
ALLOWED_MODELS = frozenset({"haiku", "sonnet", "opus"})
@dataclass(frozen=True)
class ScenarioRun:
    """Result of executing one scenario: the scenario, its events, and its sandbox."""
    scenario: Scenario  # the scenario that was executed
    observations: tuple[ObservationEvent, ...]  # tool calls extracted from the run
    sandbox_dir: Path  # sandbox directory the scenario ran in (left on disk)
def run_scenario(
    scenario: Scenario,
    model: str = "sonnet",
    max_turns: int = 30,
    timeout: int = 300,
) -> ScenarioRun:
    """Execute a scenario via `claude -p` and collect its tool calls.

    Args:
        scenario: Scenario to run (prompt + sandbox setup commands).
        model: Model name; must be one of ALLOWED_MODELS.
        max_turns: Maximum agent turns passed to claude.
        timeout: Subprocess timeout in seconds.

    Raises:
        ValueError: if *model* is not allowed.
        RuntimeError: if the claude subprocess exits non-zero.
    """
    if model not in ALLOWED_MODELS:
        raise ValueError(f"Unknown model: {model!r}. Allowed: {ALLOWED_MODELS}")

    workdir = _safe_sandbox_dir(scenario.id)
    _setup_sandbox(workdir, scenario)

    cmd = [
        "claude", "-p", scenario.prompt,
        "--model", model,
        "--max-turns", str(max_turns),
        "--add-dir", str(workdir),
        "--allowedTools", "Read,Write,Edit,Bash,Glob,Grep",
        "--output-format", "stream-json",
        "--verbose",
    ]
    proc = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        timeout=timeout,
        cwd=workdir,
    )
    if proc.returncode != 0:
        raise RuntimeError(
            f"claude -p failed (rc={proc.returncode}): {proc.stderr[:500]}"
        )
    return ScenarioRun(
        scenario=scenario,
        observations=tuple(_parse_stream_json(proc.stdout)),
        sandbox_dir=workdir,
    )
def _safe_sandbox_dir(scenario_id: str) -> Path:
"""Sanitize scenario ID and ensure path stays within sandbox base."""
safe_id = re.sub(r"[^a-zA-Z0-9\-_]", "_", scenario_id)
path = SANDBOX_BASE / safe_id
# Validate path stays within sandbox base (raises ValueError on traversal)
path.resolve().relative_to(SANDBOX_BASE.resolve())
return path
def _setup_sandbox(sandbox_dir: Path, scenario: Scenario) -> None:
    """Create a fresh sandbox directory, init git, and run setup commands."""
    # Start from a clean slate: wipe any leftovers from a previous run.
    if sandbox_dir.exists():
        shutil.rmtree(sandbox_dir)
    sandbox_dir.mkdir(parents=True)
    subprocess.run(["git", "init"], cwd=sandbox_dir, capture_output=True)
    # Setup commands run best-effort; their exit codes are intentionally ignored.
    for command in scenario.setup_commands:
        subprocess.run(shlex.split(command), cwd=sandbox_dir, capture_output=True)
def _parse_stream_json(stdout: str) -> list[ObservationEvent]:
    """Parse claude -p stream-json output into ObservationEvents.

    Stream-json format:
    - type=assistant with content[].type=tool_use → tool call (name, input)
    - type=user with content[].type=tool_result → tool result (output)
    """
    events: list[ObservationEvent] = []
    # Tool calls awaiting their result, keyed by tool_use_id.
    pending: dict[str, dict] = {}
    # Monotonic counter used to synthesize sortable timestamps ("T0000", ...).
    event_counter = 0
    for line in stdout.strip().splitlines():
        try:
            msg = json.loads(line)
        except json.JSONDecodeError:
            # Skip any non-JSON lines interleaved in the stream.
            continue
        msg_type = msg.get("type")
        if msg_type == "assistant":
            content = msg.get("message", {}).get("content", [])
            for block in content:
                if block.get("type") == "tool_use":
                    tool_use_id = block.get("id", "")
                    tool_input = block.get("input", {})
                    # Serialize and truncate the input to keep events bounded.
                    input_str = (
                        json.dumps(tool_input)[:5000]
                        if isinstance(tool_input, dict)
                        else str(tool_input)[:5000]
                    )
                    pending[tool_use_id] = {
                        "tool": block.get("name", "unknown"),
                        "input": input_str,
                        "order": event_counter,
                    }
                    event_counter += 1
        elif msg_type == "user":
            content = msg.get("message", {}).get("content", [])
            if isinstance(content, list):
                for block in content:
                    tool_use_id = block.get("tool_use_id", "")
                    if tool_use_id in pending:
                        # Pair this result with its pending tool call.
                        info = pending.pop(tool_use_id)
                        output_content = block.get("content", "")
                        if isinstance(output_content, list):
                            output_str = json.dumps(output_content)[:5000]
                        else:
                            output_str = str(output_content)[:5000]
                        events.append(ObservationEvent(
                            timestamp=f"T{info['order']:04d}",
                            event="tool_complete",
                            tool=info["tool"],
                            session=msg.get("session_id", "unknown"),
                            input=info["input"],
                            output=output_str,
                        ))
    # Tool calls that never received a result still become events (empty output).
    for _tool_use_id, info in pending.items():
        events.append(ObservationEvent(
            timestamp=f"T{info['order']:04d}",
            event="tool_complete",
            tool=info["tool"],
            session="unknown",
            input=info["input"],
            output="",
        ))
    return sorted(events, key=lambda e: e.timestamp)

View File

@@ -0,0 +1,70 @@
"""Generate pressure scenarios from skill + spec using LLM."""
from __future__ import annotations
import subprocess
from dataclasses import dataclass
from pathlib import Path
import yaml
from scripts.utils import extract_yaml
PROMPTS_DIR = Path(__file__).parent.parent / "prompts"
@dataclass(frozen=True)
class Scenario:
    """A generated pressure scenario at one prompt-strictness level."""
    id: str  # unique scenario identifier
    level: int  # prompt strictness level (scenarios are returned sorted ascending)
    level_name: str  # human-readable level name
    description: str  # short description of the scenario
    prompt: str  # prompt given to the agent under test
    setup_commands: tuple[str, ...]  # shell commands run in the sandbox beforehand
def generate_scenarios(
    skill_path: Path,
    spec_yaml: str,
    model: str = "haiku",
) -> list[Scenario]:
    """Generate 3 scenarios with decreasing prompt strictness.

    Calls claude -p with the scenario_generator prompt, parses YAML output.

    Args:
        skill_path: Skill/rule file whose content seeds the prompt.
        spec_yaml: YAML rendering of the spec steps to include in the prompt.
        model: Model name passed to claude -p.

    Returns:
        Scenarios sorted by ascending level.

    Raises:
        RuntimeError: if the subprocess fails, returns empty output, or
            returns YAML without a 'scenarios' list.
    """
    skill_content = skill_path.read_text()
    prompt_template = (PROMPTS_DIR / "scenario_generator.md").read_text()
    prompt = (
        prompt_template
        .replace("{skill_content}", skill_content)
        .replace("{spec_yaml}", spec_yaml)
    )
    result = subprocess.run(
        ["claude", "-p", prompt, "--model", model, "--output-format", "text"],
        capture_output=True,
        text=True,
        timeout=120,
    )
    if result.returncode != 0:
        raise RuntimeError(f"claude -p failed: {result.stderr}")
    if not result.stdout.strip():
        raise RuntimeError("claude -p returned empty output")
    raw_yaml = extract_yaml(result.stdout)
    parsed = yaml.safe_load(raw_yaml)
    # Guard against malformed LLM output: safe_load may yield None or a
    # non-mapping, and the mapping may lack a 'scenarios' list. Fail with
    # context instead of an opaque TypeError/KeyError.
    if not isinstance(parsed, dict) or not isinstance(parsed.get("scenarios"), list):
        raise RuntimeError(
            f"Scenario generator returned malformed YAML (no 'scenarios' list): "
            f"{raw_yaml[:200]}"
        )
    scenarios: list[Scenario] = []
    for s in parsed["scenarios"]:
        scenarios.append(Scenario(
            id=s["id"],
            level=s["level"],
            level_name=s["level_name"],
            description=s["description"],
            prompt=s["prompt"].strip(),
            setup_commands=tuple(s.get("setup_commands", [])),
        ))
    return sorted(scenarios, key=lambda s: s.level)

View File

@@ -0,0 +1,72 @@
"""Generate compliance specs from skill files using LLM."""
from __future__ import annotations
import subprocess
import tempfile
from pathlib import Path
import yaml
from scripts.parser import ComplianceSpec, parse_spec
from scripts.utils import extract_yaml
PROMPTS_DIR = Path(__file__).parent.parent / "prompts"
def generate_spec(
    skill_path: Path,
    model: str = "haiku",
    max_retries: int = 2,
) -> ComplianceSpec:
    """Generate a compliance spec from a skill/rule file.

    Calls claude -p with the spec_generator prompt, parses YAML output.
    Retries on YAML parse errors with error feedback.

    Args:
        skill_path: Skill/rule file whose content seeds the prompt.
        model: Model name passed to claude -p.
        max_retries: Number of retries after the first attempt.

    Raises:
        RuntimeError: if the claude subprocess exits non-zero.
        yaml.YAMLError | KeyError | TypeError: the last parse error once
            retries are exhausted.
    """
    skill_content = skill_path.read_text()
    prompt_template = (PROMPTS_DIR / "spec_generator.md").read_text()
    base_prompt = prompt_template.replace("{skill_content}", skill_content)
    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        prompt = base_prompt
        if attempt > 0 and last_error is not None:
            # Feed the previous parse error back to the model so it can
            # correct its own YAML on the retry.
            prompt += (
                f"\n\nPREVIOUS ATTEMPT FAILED with YAML parse error:\n"
                f"{last_error}\n\n"
                f"Please fix the YAML. Remember to quote all string values "
                f"that contain colons, e.g.: description: \"Use type: description format\""
            )
        result = subprocess.run(
            ["claude", "-p", prompt, "--model", model, "--output-format", "text"],
            capture_output=True,
            text=True,
            timeout=120,
        )
        if result.returncode != 0:
            raise RuntimeError(f"claude -p failed: {result.stderr}")
        raw_yaml = extract_yaml(result.stdout)
        tmp_path = None
        # parse_spec reads from a file, so round-trip the YAML through a temp file.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".yaml", delete=False,
        ) as f:
            f.write(raw_yaml)
            tmp_path = Path(f.name)
        try:
            return parse_spec(tmp_path)
        except (yaml.YAMLError, KeyError, TypeError) as e:
            last_error = e
            if attempt == max_retries:
                # Out of retries: surface the final parse error to the caller.
                raise
        finally:
            # Always clean up the temp file, whether parsing succeeded or not.
            if tmp_path is not None:
                tmp_path.unlink(missing_ok=True)
    raise RuntimeError("unreachable")

View File

@@ -0,0 +1,13 @@
"""Shared utilities for skill-comply scripts."""
from __future__ import annotations
def extract_yaml(text: str) -> str:
    """Return YAML content from LLM output, dropping surrounding ``` fences."""
    body = text.strip().splitlines()
    # Remove an opening fence line (e.g. ``` or ```yaml) if present.
    if body and body[0].startswith("```"):
        del body[0]
    # Remove a closing fence line if present.
    if body and body[-1].startswith("```"):
        del body[-1]
    return "\n".join(body)