Files
Shimo a2e465c74d feat(skills): add skill-comply — automated behavioral compliance measurement (#724)
* feat(skills): add skill-comply — automated behavioral compliance measurement

Automated compliance measurement for skills, rules, and agent definitions.
Generates behavioral specs, runs scenarios at 3 strictness levels,
classifies tool calls via LLM, and produces self-contained reports.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(skill-comply): address bot review feedback

- AGENTS.md: fix stale skill count (115 → 117) in project structure
- run.py: replace remaining print() with logger, add zero-division guard,
  create parent dirs for --output path
- runner.py: add returncode check for claude subprocess, clarify
  relative_to path traversal validation
- parser.py: use is_file() instead of exists(), catch KeyError for
  missing trace fields, add file check in parse_spec
- classifier.py: log warnings on malformed classification output,
  guard against non-dict JSON responses
- grader.py: filter negative indices from LLM classification

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 21:51:49 -07:00

123 lines
4.2 KiB
Python

"""Grade observation traces against compliance specs using LLM classification."""
from __future__ import annotations
from dataclasses import dataclass
from scripts.classifier import classify_events
from scripts.parser import ComplianceSpec, ObservationEvent, Step
@dataclass(frozen=True)
class StepResult:
    """Immutable outcome of grading one spec step against a trace."""

    # Identifier of the spec step this result describes.
    step_id: str
    # True when at least one event was accepted as evidence for the step.
    detected: bool
    # Events accepted as evidence; empty when the step was not detected.
    evidence: tuple[ObservationEvent, ...]
    # Explanation of why the step was not detected; None when it was.
    failure_reason: str | None
@dataclass(frozen=True)
class ComplianceResult:
    """Immutable summary of grading a whole trace against one spec."""

    # Identifier of the spec that was graded.
    spec_id: str
    # One StepResult per spec step, in spec order.
    steps: tuple[StepResult, ...]
    # Detected fraction of the required steps.
    compliance_rate: float
    # True when compliance_rate is below the spec's promotion threshold.
    recommend_hook_promotion: bool
    # Classifier output: step id -> indices into the timestamp-sorted trace.
    classification: dict[str, list[int]]
def _check_temporal_order(
    step: Step,
    event: ObservationEvent,
    resolved: dict[str, list[ObservationEvent]],
    classified: dict[str, list[ObservationEvent]],
) -> str | None:
    """Validate ``event`` against the ordering constraints of ``step``.

    Args:
        step: Step whose ``detector.after_step`` / ``detector.before_step``
            name the steps this event must follow / precede (``None`` means
            no constraint of that kind).
        event: Candidate event; only its ``timestamp`` is read.
        resolved: Events already accepted as evidence, keyed by step id.
        classified: All classifier-assigned events, keyed by step id; used
            as a look-ahead when the ``before_step`` has no ``resolved`` entry.

    Returns:
        A human-readable failure reason, or ``None`` when every constraint
        is satisfied.
    """
    constraints = step.detector

    # --- "after" constraint: event must be strictly later than every
    # already-accepted event of the referenced step.
    predecessor = constraints.after_step
    if predecessor is not None:
        earlier_events = resolved.get(predecessor, [])
        if not earlier_events:
            return f"after_step '{predecessor}' not yet detected"
        latest_after = max(ev.timestamp for ev in earlier_events)
        if event.timestamp <= latest_after:
            return (
                f"must occur after '{predecessor}' "
                f"(last at {latest_after}), but found at {event.timestamp}"
            )

    # --- "before" constraint: event must be strictly earlier than the
    # first event of the referenced step.
    successor = constraints.before_step
    if successor is None:
        return None

    # Prefer accepted evidence; only when the step has no entry at all do we
    # look ahead at the raw classification (an empty list does not fall back).
    later_events = resolved.get(successor)
    if later_events is None:
        later_events = classified.get(successor, [])
    if not later_events:
        return None

    earliest_before = min(ev.timestamp for ev in later_events)
    if event.timestamp >= earliest_before:
        return (
            f"must occur before '{successor}' "
            f"(first at {earliest_before}), but found at {event.timestamp}"
        )
    return None
def grade(
    spec: ComplianceSpec,
    trace: list[ObservationEvent],
    classifier_model: str = "haiku",
) -> ComplianceResult:
    """Grade a trace against a compliance spec using LLM classification.

    The trace is sorted by timestamp, ``classify_events`` is called once to
    map step ids to indices into that sorted trace, and each step in
    ``spec.steps`` is then checked in order: the first candidate event that
    satisfies the step's temporal constraints becomes its evidence.

    Args:
        spec: Spec whose ``steps`` are graded; ``id`` and
            ``threshold_promote_to_hook`` are also read.
        trace: Observed events in any order; the list is not mutated.
        classifier_model: Forwarded to ``classify_events`` as ``model``.

    Returns:
        A ``ComplianceResult`` with one ``StepResult`` per spec step.
        ``compliance_rate`` is the detected fraction of required steps
        (``0.0`` when no step is required) and ``recommend_hook_promotion``
        is true when that rate is below ``spec.threshold_promote_to_hook``.
        ``classification`` is the classifier's mapping, returned unfiltered.
    """
    sorted_trace = sorted(trace, key=lambda e: e.timestamp)
    trace_len = len(sorted_trace)

    # Step 1: LLM classifies all events in one batch call
    classification = classify_events(spec, sorted_trace, model=classifier_model)

    # Convert indices to events. The indices originate from model output, so
    # anything that is not a plain in-range integer is dropped instead of
    # crashing the run: strings/floats would raise TypeError on comparison or
    # indexing, negatives would wrap around, and bools would silently alias
    # indices 0/1.
    classified: dict[str, list[ObservationEvent]] = {
        step_id: [
            sorted_trace[i]
            for i in indices
            if isinstance(i, int)
            and not isinstance(i, bool)
            and 0 <= i < trace_len
        ]
        for step_id, indices in classification.items()
    }

    # Step 2: Check temporal ordering (deterministic)
    resolved: dict[str, list[ObservationEvent]] = {}
    step_results: list[StepResult] = []

    for step in spec.steps:
        candidates = classified.get(step.id, [])
        matched: list[ObservationEvent] = []
        failure_reason: str | None = None

        # Candidates are tried in the order the classifier listed them; the
        # first one that passes the ordering check is the step's evidence.
        for event in candidates:
            temporal_fail = _check_temporal_order(step, event, resolved, classified)
            if temporal_fail is None:
                matched.append(event)
                break
            # Remember the most recent rejection so it can be reported if no
            # candidate ends up qualifying.
            failure_reason = temporal_fail

        detected = bool(matched)
        if detected:
            # Only detected steps are recorded, so later after_step checks
            # see an undetected predecessor as missing.
            resolved[step.id] = matched
        elif failure_reason is None:
            failure_reason = f"no matching event classified for step '{step.id}'"

        step_results.append(StepResult(
            step_id=step.id,
            detected=detected,
            evidence=tuple(matched),
            failure_reason=None if detected else failure_reason,
        ))

    # Only steps flagged as required count towards the compliance rate.
    required_ids = {s.id for s in spec.steps if s.required}
    required_steps = [s for s in step_results if s.step_id in required_ids]
    detected_required = sum(1 for s in required_steps if s.detected)
    total_required = len(required_steps)
    # Guard against division by zero when the spec has no required steps.
    compliance_rate = detected_required / total_required if total_required > 0 else 0.0

    return ComplianceResult(
        spec_id=spec.id,
        steps=tuple(step_results),
        compliance_rate=compliance_rate,
        recommend_hook_promotion=compliance_rate < spec.threshold_promote_to_hook,
        classification=classification,
    )