From a2e465c74dfb1887eb52a0ba3f4fe894bec96edb Mon Sep 17 00:00:00 2001 From: Shimo <54734315+shimo4228@users.noreply.github.com> Date: Mon, 23 Mar 2026 13:51:49 +0900 Subject: [PATCH] =?UTF-8?q?feat(skills):=20add=20skill-comply=20=E2=80=94?= =?UTF-8?q?=20automated=20behavioral=20compliance=20measurement=20(#724)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(skills): add skill-comply — automated behavioral compliance measurement Automated compliance measurement for skills, rules, and agent definitions. Generates behavioral specs, runs scenarios at 3 strictness levels, classifies tool calls via LLM, and produces self-contained reports. Co-Authored-By: Claude Opus 4.6 (1M context) * fix(skill-comply): address bot review feedback - AGENTS.md: fix stale skill count (115 → 117) in project structure - run.py: replace remaining print() with logger, add zero-division guard, create parent dirs for --output path - runner.py: add returncode check for claude subprocess, clarify relative_to path traversal validation - parser.py: use is_file() instead of exists(), catch KeyError for missing trace fields, add file check in parse_spec - classifier.py: log warnings on malformed classification output, guard against non-dict JSON responses - grader.py: filter negative indices from LLM classification Co-Authored-By: Claude Opus 4.6 (1M context) --------- Co-authored-by: Claude Opus 4.6 (1M context) --- AGENTS.md | 4 +- README.md | 4 +- skills/skill-comply/.gitignore | 7 + skills/skill-comply/SKILL.md | 58 ++++++ .../fixtures/compliant_trace.jsonl | 5 + .../fixtures/noncompliant_trace.jsonl | 3 + skills/skill-comply/fixtures/tdd_spec.yaml | 44 +++++ skills/skill-comply/prompts/classifier.md | 24 +++ .../prompts/scenario_generator.md | 62 +++++++ skills/skill-comply/prompts/spec_generator.md | 42 +++++ skills/skill-comply/pyproject.toml | 15 ++ skills/skill-comply/scripts/__init__.py | 0 skills/skill-comply/scripts/classifier.py | 85 
+++++++++ skills/skill-comply/scripts/grader.py | 122 +++++++++++++ skills/skill-comply/scripts/parser.py | 107 +++++++++++ skills/skill-comply/scripts/report.py | 170 ++++++++++++++++++ skills/skill-comply/scripts/run.py | 127 +++++++++++++ skills/skill-comply/scripts/runner.py | 161 +++++++++++++++++ .../scripts/scenario_generator.py | 70 ++++++++ skills/skill-comply/scripts/spec_generator.py | 72 ++++++++ skills/skill-comply/scripts/utils.py | 13 ++ skills/skill-comply/tests/test_grader.py | 137 ++++++++++++++ skills/skill-comply/tests/test_parser.py | 90 ++++++++++ 23 files changed, 1418 insertions(+), 4 deletions(-) create mode 100644 skills/skill-comply/.gitignore create mode 100644 skills/skill-comply/SKILL.md create mode 100644 skills/skill-comply/fixtures/compliant_trace.jsonl create mode 100644 skills/skill-comply/fixtures/noncompliant_trace.jsonl create mode 100644 skills/skill-comply/fixtures/tdd_spec.yaml create mode 100644 skills/skill-comply/prompts/classifier.md create mode 100644 skills/skill-comply/prompts/scenario_generator.md create mode 100644 skills/skill-comply/prompts/spec_generator.md create mode 100644 skills/skill-comply/pyproject.toml create mode 100644 skills/skill-comply/scripts/__init__.py create mode 100644 skills/skill-comply/scripts/classifier.py create mode 100644 skills/skill-comply/scripts/grader.py create mode 100644 skills/skill-comply/scripts/parser.py create mode 100644 skills/skill-comply/scripts/report.py create mode 100644 skills/skill-comply/scripts/run.py create mode 100644 skills/skill-comply/scripts/runner.py create mode 100644 skills/skill-comply/scripts/scenario_generator.py create mode 100644 skills/skill-comply/scripts/spec_generator.py create mode 100644 skills/skill-comply/scripts/utils.py create mode 100644 skills/skill-comply/tests/test_grader.py create mode 100644 skills/skill-comply/tests/test_parser.py diff --git a/AGENTS.md b/AGENTS.md index 646abd37..cfcd3a8f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,6 
+1,6 @@ # Everything Claude Code (ECC) — Agent Instructions -This is a **production-ready AI coding plugin** providing 28 specialized agents, 116 skills, 60 commands, and automated hook workflows for software development. +This is a **production-ready AI coding plugin** providing 28 specialized agents, 119 skills, 60 commands, and automated hook workflows for software development. **Version:** 1.9.0 @@ -142,7 +142,7 @@ Troubleshoot failures: check test isolation → verify mocks → fix implementat ``` agents/ — 28 specialized subagents -skills/ — 115 workflow skills and domain knowledge +skills/ — 117 workflow skills and domain knowledge commands/ — 60 slash commands hooks/ — Trigger-based automations rules/ — Always-follow guidelines (common + per-language) diff --git a/README.md b/README.md index 7e367717..b1161a5a 100644 --- a/README.md +++ b/README.md @@ -212,7 +212,7 @@ For manual install instructions see the README in the `rules/` folder. /plugin list everything-claude-code@everything-claude-code ``` -✨ **That's it!** You now have access to 28 agents, 116 skills, and 60 commands. +✨ **That's it!** You now have access to 28 agents, 119 skills, and 60 commands. --- @@ -1085,7 +1085,7 @@ The configuration is automatically detected from `.opencode/opencode.json`. 
|---------|-------------|----------|--------| | Agents | ✅ 28 agents | ✅ 12 agents | **Claude Code leads** | | Commands | ✅ 60 commands | ✅ 31 commands | **Claude Code leads** | -| Skills | ✅ 116 skills | ✅ 37 skills | **Claude Code leads** | +| Skills | ✅ 119 skills | ✅ 37 skills | **Claude Code leads** | | Hooks | ✅ 8 event types | ✅ 11 events | **OpenCode has more!** | | Rules | ✅ 29 rules | ✅ 13 instructions | **Claude Code leads** | | MCP Servers | ✅ 14 servers | ✅ Full | **Full parity** | diff --git a/skills/skill-comply/.gitignore b/skills/skill-comply/.gitignore new file mode 100644 index 00000000..ae484fb9 --- /dev/null +++ b/skills/skill-comply/.gitignore @@ -0,0 +1,7 @@ +.venv/ +__pycache__/ +*.py[cod] +results/*.md +.pytest_cache/ +.coverage +uv.lock diff --git a/skills/skill-comply/SKILL.md b/skills/skill-comply/SKILL.md new file mode 100644 index 00000000..ea4b4a57 --- /dev/null +++ b/skills/skill-comply/SKILL.md @@ -0,0 +1,58 @@ +--- +name: skill-comply +description: Visualize whether skills, rules, and agent definitions are actually followed — auto-generates scenarios at 3 prompt strictness levels, runs agents, classifies behavioral sequences, and reports compliance rates with full tool call timelines +origin: ECC +tools: Read, Bash +--- + +# skill-comply: Automated Compliance Measurement + +Measures whether coding agents actually follow skills, rules, or agent definitions by: +1. Auto-generating expected behavioral sequences (specs) from any .md file +2. Auto-generating scenarios with decreasing prompt strictness (supportive → neutral → competing) +3. Running `claude -p` and capturing tool call traces via stream-json +4. Classifying tool calls against spec steps using LLM (not regex) +5. Checking temporal ordering deterministically +6. 
Generating self-contained reports with spec, prompts, and timelines + +## Supported Targets + +- **Skills** (`skills/*/SKILL.md`): Workflow skills like search-first, TDD guides +- **Rules** (`rules/common/*.md`): Mandatory rules like testing.md, security.md, git-workflow.md +- **Agent definitions** (`agents/*.md`): Whether an agent gets invoked when expected (internal workflow verification not yet supported) + +## When to Activate + +- User runs `/skill-comply ` +- User asks "is this rule actually being followed?" +- After adding new rules/skills, to verify agent compliance +- Periodically as part of quality maintenance + +## Usage + +```bash +# Full run +uv run python -m scripts.run ~/.claude/rules/common/testing.md + +# Dry run (no cost, spec + scenarios only) +uv run python -m scripts.run --dry-run ~/.claude/skills/search-first/SKILL.md + +# Custom models +uv run python -m scripts.run --gen-model haiku --model sonnet +``` + +## Key Concept: Prompt Independence + +Measures whether a skill/rule is followed even when the prompt doesn't explicitly support it. + +## Report Contents + +Reports are self-contained and include: +1. Expected behavioral sequence (auto-generated spec) +2. Scenario prompts (what was asked at each strictness level) +3. Compliance scores per scenario +4. Tool call timelines with LLM classification labels + +### Advanced (optional) + +For users familiar with hooks, reports also include hook promotion recommendations for steps with low compliance. This is informational — the main value is the compliance visibility itself. 
diff --git a/skills/skill-comply/fixtures/compliant_trace.jsonl b/skills/skill-comply/fixtures/compliant_trace.jsonl new file mode 100644 index 00000000..6e315f1b --- /dev/null +++ b/skills/skill-comply/fixtures/compliant_trace.jsonl @@ -0,0 +1,5 @@ +{"timestamp":"2026-03-20T10:00:01Z","event":"tool_complete","tool":"Write","session":"sess-001","input":"{\"file_path\":\"tests/test_fib.py\",\"content\":\"def test_fib(): assert fib(0) == 0\"}","output":"File created"} +{"timestamp":"2026-03-20T10:00:10Z","event":"tool_complete","tool":"Bash","session":"sess-001","input":"{\"command\":\"cd /tmp/sandbox && pytest tests/\"}","output":"FAILED - 1 failed"} +{"timestamp":"2026-03-20T10:00:20Z","event":"tool_complete","tool":"Write","session":"sess-001","input":"{\"file_path\":\"src/fib.py\",\"content\":\"def fib(n): return n if n <= 1 else fib(n-1)+fib(n-2)\"}","output":"File created"} +{"timestamp":"2026-03-20T10:00:30Z","event":"tool_complete","tool":"Bash","session":"sess-001","input":"{\"command\":\"cd /tmp/sandbox && pytest tests/\"}","output":"1 passed"} +{"timestamp":"2026-03-20T10:00:40Z","event":"tool_complete","tool":"Edit","session":"sess-001","input":"{\"file_path\":\"src/fib.py\",\"old_string\":\"return n if\",\"new_string\":\"if n < 0: raise ValueError\\n return n if\"}","output":"File edited"} diff --git a/skills/skill-comply/fixtures/noncompliant_trace.jsonl b/skills/skill-comply/fixtures/noncompliant_trace.jsonl new file mode 100644 index 00000000..a0c69269 --- /dev/null +++ b/skills/skill-comply/fixtures/noncompliant_trace.jsonl @@ -0,0 +1,3 @@ +{"timestamp":"2026-03-20T10:00:01Z","event":"tool_complete","tool":"Write","session":"sess-002","input":"{\"file_path\":\"src/fib.py\",\"content\":\"def fib(n): return n if n <= 1 else fib(n-1)+fib(n-2)\"}","output":"File created"} +{"timestamp":"2026-03-20T10:00:10Z","event":"tool_complete","tool":"Write","session":"sess-002","input":"{\"file_path\":\"tests/test_fib.py\",\"content\":\"def test_fib(): assert 
fib(0) == 0\"}","output":"File created"} +{"timestamp":"2026-03-20T10:00:20Z","event":"tool_complete","tool":"Bash","session":"sess-002","input":"{\"command\":\"cd /tmp/sandbox && pytest tests/\"}","output":"1 passed"} diff --git a/skills/skill-comply/fixtures/tdd_spec.yaml b/skills/skill-comply/fixtures/tdd_spec.yaml new file mode 100644 index 00000000..c1274979 --- /dev/null +++ b/skills/skill-comply/fixtures/tdd_spec.yaml @@ -0,0 +1,44 @@ +id: tdd-workflow +name: TDD Workflow Compliance +source_rule: rules/common/testing.md +version: "2.0" + +steps: + - id: write_test + description: "Write test file BEFORE implementation" + required: true + detector: + description: "A Write or Edit to a test file (filename contains 'test')" + before_step: write_impl + + - id: run_test_red + description: "Run test and confirm FAIL (RED phase)" + required: true + detector: + description: "Run pytest or test command that produces a FAIL/ERROR result" + after_step: write_test + before_step: write_impl + + - id: write_impl + description: "Write minimal implementation (GREEN phase)" + required: true + detector: + description: "Write or Edit an implementation file (not a test file)" + after_step: run_test_red + + - id: run_test_green + description: "Run test and confirm PASS (GREEN phase)" + required: true + detector: + description: "Run pytest or test command that produces a PASS result" + after_step: write_impl + + - id: refactor + description: "Refactor (IMPROVE phase)" + required: false + detector: + description: "Edit a source file for refactoring after tests pass" + after_step: run_test_green + +scoring: + threshold_promote_to_hook: 0.6 diff --git a/skills/skill-comply/prompts/classifier.md b/skills/skill-comply/prompts/classifier.md new file mode 100644 index 00000000..7a706c91 --- /dev/null +++ b/skills/skill-comply/prompts/classifier.md @@ -0,0 +1,24 @@ +You are classifying tool calls from a coding agent session against expected behavioral steps. 
+ +For each tool call, determine which step (if any) it belongs to. A tool call can match at most one step. + +Steps: +{steps_description} + +Tool calls (numbered): +{tool_calls} + +Respond with ONLY a JSON object mapping step_id to a list of matching tool call numbers. +Include only steps that have at least one match. If no tool calls match a step, omit it. + +Example response: +{"write_test": [0, 1], "run_test_red": [2], "write_impl": [3, 4]} + +Rules: +- Match based on the MEANING of the tool call, not just keywords +- A Write to "test_calculator.py" is a test file write, even if the content is implementation-like +- A Write to "calculator.py" is an implementation write, even if it contains test helpers +- A Bash running "pytest" that outputs "FAILED" is a RED phase test run +- A Bash running "pytest" that outputs "passed" is a GREEN phase test run +- Each tool call should match at most one step (pick the best match) +- If a tool call doesn't match any step, don't include it diff --git a/skills/skill-comply/prompts/scenario_generator.md b/skills/skill-comply/prompts/scenario_generator.md new file mode 100644 index 00000000..2cd14d9a --- /dev/null +++ b/skills/skill-comply/prompts/scenario_generator.md @@ -0,0 +1,62 @@ + +You are generating test scenarios for a coding agent skill compliance tool. +Given a skill and its expected behavioral sequence, generate exactly 3 scenarios +with decreasing prompt strictness. + +Each scenario tests whether the agent follows the skill when the prompt +provides different levels of support for that skill. 
+ +Output ONLY valid YAML (no markdown fences, no commentary): + +scenarios: + - id: + level: 1 + level_name: supportive + description: + prompt: | + + setup_commands: + - "mkdir -p /tmp/skill-comply-sandbox/{id}/src /tmp/skill-comply-sandbox/{id}/tests" + - + + - id: + level: 2 + level_name: neutral + description: + prompt: | + + setup_commands: + - + + - id: + level: 3 + level_name: competing + description: + prompt: | + + setup_commands: + - + +Rules: +- Level 1 (supportive): Prompt explicitly instructs the agent to follow the skill + e.g. "Use TDD to implement..." +- Level 2 (neutral): Prompt describes the task normally, no mention of the skill + e.g. "Implement a function that..." +- Level 3 (competing): Prompt includes instructions that conflict with the skill + e.g. "Quickly implement... tests are optional..." +- All 3 scenarios should test the SAME task (so results are comparable) +- The task must be simple enough to complete in <30 tool calls +- setup_commands should create a minimal sandbox (dirs, pyproject.toml, etc.) +- Prompts should be realistic — something a developer would actually ask + +Skill content: + +--- +{skill_content} +--- + +Expected behavioral sequence: + +--- +{spec_yaml} +--- diff --git a/skills/skill-comply/prompts/spec_generator.md b/skills/skill-comply/prompts/spec_generator.md new file mode 100644 index 00000000..d9fabb7c --- /dev/null +++ b/skills/skill-comply/prompts/spec_generator.md @@ -0,0 +1,42 @@ + +You are analyzing a skill/rule file for a coding agent (Claude Code). +Your task: extract the **observable behavioral sequence** that an agent should follow when this skill is active. + +Each step should be described in natural language. Do NOT use regex patterns. 
+ +Output ONLY valid YAML in this exact format (no markdown fences, no commentary): + +id: +name: +source_rule: +version: "1.0" + +steps: + - id: + description: + required: true|false + detector: + description: + after_step: + before_step: + +scoring: + threshold_promote_to_hook: 0.6 + +Rules: +- detector.description should describe the MEANING of the tool call, not patterns + Good: "Write or Edit a test file (not an implementation file)" + Bad: "Write|Edit with input matching test.*\\.py" +- Use before_step/after_step for skills where ORDER matters (e.g. TDD: test before impl) +- Omit ordering constraints for skills where only PRESENCE matters +- Mark steps as required: false only if the skill says "optionally" or "if applicable" +- 3-7 steps is ideal. Don't over-decompose +- IMPORTANT: Quote all YAML string values containing colons with double quotes + Good: description: "Use conventional commit format (type: description)" + Bad: description: Use conventional commit format (type: description) + +Skill file to analyze: + +--- +{skill_content} +--- diff --git a/skills/skill-comply/pyproject.toml b/skills/skill-comply/pyproject.toml new file mode 100644 index 00000000..323185ce --- /dev/null +++ b/skills/skill-comply/pyproject.toml @@ -0,0 +1,15 @@ +[project] +name = "skill-comply" +version = "0.1.0" +description = "Automated skill compliance measurement for Claude Code" +requires-python = ">=3.11" +dependencies = ["pyyaml>=6.0"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +pythonpath = ["."] + +[dependency-groups] +dev = [ + "pytest>=9.0.2", +] diff --git a/skills/skill-comply/scripts/__init__.py b/skills/skill-comply/scripts/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/skills/skill-comply/scripts/classifier.py b/skills/skill-comply/scripts/classifier.py new file mode 100644 index 00000000..2e4207c8 --- /dev/null +++ b/skills/skill-comply/scripts/classifier.py @@ -0,0 +1,85 @@ +"""Classify tool calls against compliance steps using 
LLM.""" + +from __future__ import annotations + +import json +import logging +import subprocess +from pathlib import Path + +logger = logging.getLogger(__name__) + +from scripts.parser import ComplianceSpec, ObservationEvent + +PROMPTS_DIR = Path(__file__).parent.parent / "prompts" + + +def classify_events( + spec: ComplianceSpec, + trace: list[ObservationEvent], + model: str = "haiku", +) -> dict[str, list[int]]: + """Classify which tool calls match which compliance steps. + + Returns {step_id: [event_indices]} via a single LLM call. + """ + if not trace: + return {} + + steps_desc = "\n".join( + f"- {step.id}: {step.detector.description}" + for step in spec.steps + ) + + tool_calls = "\n".join( + f"[{i}] {event.tool}: input={event.input[:500]} output={event.output[:200]}" + for i, event in enumerate(trace) + ) + + prompt_template = (PROMPTS_DIR / "classifier.md").read_text() + prompt = ( + prompt_template + .replace("{steps_description}", steps_desc) + .replace("{tool_calls}", tool_calls) + ) + + result = subprocess.run( + ["claude", "-p", prompt, "--model", model, "--output-format", "text"], + capture_output=True, + text=True, + timeout=60, + ) + + if result.returncode != 0: + raise RuntimeError( + f"classifier subprocess failed (rc={result.returncode}): " + f"{result.stderr[:500]}" + ) + + return _parse_classification(result.stdout) + + +def _parse_classification(text: str) -> dict[str, list[int]]: + """Parse LLM classification output into {step_id: [event_indices]}.""" + text = text.strip() + # Strip markdown fences + lines = text.splitlines() + if lines and lines[0].startswith("```"): + lines = lines[1:] + if lines and lines[-1].startswith("```"): + lines = lines[:-1] + cleaned = "\n".join(lines) + + try: + parsed = json.loads(cleaned) + if not isinstance(parsed, dict): + logger.warning("Classifier returned non-dict JSON: %s", type(parsed).__name__) + return {} + return { + k: [int(i) for i in v] + for k, v in parsed.items() + if isinstance(v, list) + } + 
except (json.JSONDecodeError, ValueError, TypeError) as e: + logger.warning("Failed to parse classification output: %s", e) + return {} diff --git a/skills/skill-comply/scripts/grader.py b/skills/skill-comply/scripts/grader.py new file mode 100644 index 00000000..b4250c6b --- /dev/null +++ b/skills/skill-comply/scripts/grader.py @@ -0,0 +1,122 @@ +"""Grade observation traces against compliance specs using LLM classification.""" + +from __future__ import annotations + +from dataclasses import dataclass + +from scripts.classifier import classify_events +from scripts.parser import ComplianceSpec, ObservationEvent, Step + + +@dataclass(frozen=True) +class StepResult: + step_id: str + detected: bool + evidence: tuple[ObservationEvent, ...] + failure_reason: str | None + + +@dataclass(frozen=True) +class ComplianceResult: + spec_id: str + steps: tuple[StepResult, ...] + compliance_rate: float + recommend_hook_promotion: bool + classification: dict[str, list[int]] + + +def _check_temporal_order( + step: Step, + event: ObservationEvent, + resolved: dict[str, list[ObservationEvent]], + classified: dict[str, list[ObservationEvent]], +) -> str | None: + """Check before_step/after_step constraints. 
Returns failure reason or None.""" + if step.detector.after_step is not None: + after_events = resolved.get(step.detector.after_step, []) + if not after_events: + return f"after_step '{step.detector.after_step}' not yet detected" + latest_after = max(e.timestamp for e in after_events) + if event.timestamp <= latest_after: + return ( + f"must occur after '{step.detector.after_step}' " + f"(last at {latest_after}), but found at {event.timestamp}" + ) + + if step.detector.before_step is not None: + # Look ahead using LLM classification results + before_events = resolved.get(step.detector.before_step) + if before_events is None: + before_events = classified.get(step.detector.before_step, []) + if before_events: + earliest_before = min(e.timestamp for e in before_events) + if event.timestamp >= earliest_before: + return ( + f"must occur before '{step.detector.before_step}' " + f"(first at {earliest_before}), but found at {event.timestamp}" + ) + + return None + + +def grade( + spec: ComplianceSpec, + trace: list[ObservationEvent], + classifier_model: str = "haiku", +) -> ComplianceResult: + """Grade a trace against a compliance spec using LLM classification.""" + sorted_trace = sorted(trace, key=lambda e: e.timestamp) + + # Step 1: LLM classifies all events in one batch call + classification = classify_events(spec, sorted_trace, model=classifier_model) + + # Convert indices to events + classified: dict[str, list[ObservationEvent]] = { + step_id: [sorted_trace[i] for i in indices if 0 <= i < len(sorted_trace)] + for step_id, indices in classification.items() + } + + # Step 2: Check temporal ordering (deterministic) + resolved: dict[str, list[ObservationEvent]] = {} + step_results: list[StepResult] = [] + + for step in spec.steps: + candidates = classified.get(step.id, []) + matched: list[ObservationEvent] = [] + failure_reason: str | None = None + + for event in candidates: + temporal_fail = _check_temporal_order(step, event, resolved, classified) + if temporal_fail is 
None: + matched.append(event) + break + else: + failure_reason = temporal_fail + + detected = len(matched) > 0 + if detected: + resolved[step.id] = matched + elif failure_reason is None: + failure_reason = f"no matching event classified for step '{step.id}'" + + step_results.append(StepResult( + step_id=step.id, + detected=detected, + evidence=tuple(matched), + failure_reason=failure_reason if not detected else None, + )) + + required_ids = {s.id for s in spec.steps if s.required} + required_steps = [s for s in step_results if s.step_id in required_ids] + detected_required = sum(1 for s in required_steps if s.detected) + total_required = len(required_steps) + + compliance_rate = detected_required / total_required if total_required > 0 else 0.0 + + return ComplianceResult( + spec_id=spec.id, + steps=tuple(step_results), + compliance_rate=compliance_rate, + recommend_hook_promotion=compliance_rate < spec.threshold_promote_to_hook, + classification=classification, + ) diff --git a/skills/skill-comply/scripts/parser.py b/skills/skill-comply/scripts/parser.py new file mode 100644 index 00000000..0b8169d4 --- /dev/null +++ b/skills/skill-comply/scripts/parser.py @@ -0,0 +1,107 @@ +"""Parse observation traces (JSONL) and compliance specs (YAML).""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from pathlib import Path + +import yaml + + +@dataclass(frozen=True) +class ObservationEvent: + timestamp: str + event: str + tool: str + session: str + input: str + output: str + + +@dataclass(frozen=True) +class Detector: + description: str + after_step: str | None = None + before_step: str | None = None + + +@dataclass(frozen=True) +class Step: + id: str + description: str + required: bool + detector: Detector + + +@dataclass(frozen=True) +class ComplianceSpec: + id: str + name: str + source_rule: str + version: str + steps: tuple[Step, ...] 
+ threshold_promote_to_hook: float + + +def parse_trace(path: Path) -> list[ObservationEvent]: + """Parse a JSONL observation trace file into sorted events.""" + if not path.is_file(): + raise FileNotFoundError(f"Trace file not found: {path}") + + text = path.read_text().strip() + if not text: + return [] + + events: list[ObservationEvent] = [] + for i, line in enumerate(text.splitlines(), 1): + try: + raw = json.loads(line) + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON at line {i}: {e}") from e + try: + events.append(ObservationEvent( + timestamp=raw["timestamp"], + event=raw["event"], + tool=raw["tool"], + session=raw["session"], + input=raw.get("input", ""), + output=raw.get("output", ""), + )) + except KeyError as e: + raise ValueError(f"Missing required field {e} at line {i}") from e + + return sorted(events, key=lambda e: e.timestamp) + + +def parse_spec(path: Path) -> ComplianceSpec: + """Parse a YAML compliance spec file.""" + if not path.is_file(): + raise FileNotFoundError(f"Spec file not found: {path}") + raw = yaml.safe_load(path.read_text()) + + steps: list[Step] = [] + for s in raw["steps"]: + d = s["detector"] + steps.append(Step( + id=s["id"], + description=s["description"], + required=s["required"], + detector=Detector( + description=d["description"], + after_step=d.get("after_step"), + before_step=d.get("before_step"), + ), + )) + + if "scoring" not in raw: + raise KeyError("Missing 'scoring' section in compliance spec") + + return ComplianceSpec( + id=raw["id"], + name=raw["name"], + source_rule=raw["source_rule"], + version=raw["version"], + steps=tuple(steps), + threshold_promote_to_hook=raw["scoring"]["threshold_promote_to_hook"], + ) diff --git a/skills/skill-comply/scripts/report.py b/skills/skill-comply/scripts/report.py new file mode 100644 index 00000000..88ff4ea7 --- /dev/null +++ b/skills/skill-comply/scripts/report.py @@ -0,0 +1,170 @@ +"""Generate Markdown compliance reports.""" + +from __future__ import 
annotations + +from datetime import datetime, timezone +from pathlib import Path + +from scripts.grader import ComplianceResult +from scripts.parser import ComplianceSpec, ObservationEvent +from scripts.scenario_generator import Scenario + + +def generate_report( + skill_path: Path, + spec: ComplianceSpec, + results: list[tuple[str, ComplianceResult, list[ObservationEvent]]], + scenarios: list[Scenario] | None = None, +) -> str: + """Generate a Markdown compliance report. + + Args: + skill_path: Path to the skill file that was tested. + spec: The compliance spec used for grading. + results: List of (scenario_level_name, ComplianceResult, observations) tuples. + scenarios: Original scenario definitions with prompts. + """ + now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + overall = _overall_compliance(results) + threshold = spec.threshold_promote_to_hook + + lines: list[str] = [] + lines.append(f"# skill-comply Report: {skill_path.name}") + lines.append(f"Generated: {now}") + lines.append("") + + # Summary + lines.append("## Summary") + lines.append("") + lines.append(f"| Metric | Value |") + lines.append(f"|--------|-------|") + lines.append(f"| Skill | `{skill_path}` |") + lines.append(f"| Spec | {spec.id} |") + lines.append(f"| Scenarios | {len(results)} |") + lines.append(f"| Overall Compliance | {overall:.0%} |") + lines.append(f"| Threshold | {threshold:.0%} |") + + promote_steps = _steps_to_promote(spec, results, threshold) + if promote_steps: + step_names = ", ".join(promote_steps) + lines.append(f"| Recommendation | **Promote {step_names} to hooks** |") + else: + lines.append(f"| Recommendation | All steps above threshold — no hook promotion needed |") + lines.append("") + + # Expected Behavioral Sequence + lines.append("## Expected Behavioral Sequence") + lines.append("") + lines.append("| # | Step | Required | Description |") + lines.append("|---|------|----------|-------------|") + for i, step in enumerate(spec.steps, 1): + req = "Yes" 
if step.required else "No" + lines.append(f"| {i} | {step.id} | {req} | {step.detector.description} |") + lines.append("") + + # Scenario Results + lines.append("## Scenario Results") + lines.append("") + lines.append("| Scenario | Compliance | Failed Steps |") + lines.append("|----------|-----------|----------------|") + for level_name, result, _obs in results: + failed = [s.step_id for s in result.steps if not s.detected + and any(sp.id == s.step_id and sp.required for sp in spec.steps)] + failed_str = ", ".join(failed) if failed else "—" + lines.append(f"| {level_name} | {result.compliance_rate:.0%} | {failed_str} |") + lines.append("") + + # Scenario Prompts + if scenarios: + lines.append("## Scenario Prompts") + lines.append("") + for s in scenarios: + lines.append(f"### {s.level_name} (Level {s.level})") + lines.append("") + for prompt_line in s.prompt.splitlines(): + lines.append(f"> {prompt_line}") + lines.append("") + + # Hook Promotion Recommendations (optional/advanced) + if promote_steps: + lines.append("## Advanced: Hook Promotion Recommendations (optional)") + lines.append("") + for step_id in promote_steps: + rate = _step_compliance_rate(step_id, results) + step = next(s for s in spec.steps if s.id == step_id) + lines.append( + f"- **{step_id}** (compliance {rate:.0%}): {step.description}" + ) + lines.append("") + + # Per-scenario details with timeline + lines.append("## Detail") + lines.append("") + for level_name, result, observations in results: + lines.append(f"### {level_name} (Compliance: {result.compliance_rate:.0%})") + lines.append("") + lines.append("| Step | Required | Detected | Reason |") + lines.append("|------|----------|----------|--------|") + for sr in result.steps: + req = "Yes" if any( + sp.id == sr.step_id and sp.required for sp in spec.steps + ) else "No" + det = "YES" if sr.detected else "NO" + reason = sr.failure_reason or "—" + lines.append(f"| {sr.step_id} | {req} | {det} | {reason} |") + lines.append("") + + # Timeline: 
show what the agent actually did + if observations: + # Build reverse index: event_index → step_id + index_to_step: dict[int, str] = {} + for step_id, indices in result.classification.items(): + for idx in indices: + index_to_step[idx] = step_id + + lines.append(f"**Tool Call Timeline ({len(observations)} calls)**") + lines.append("") + lines.append("| # | Tool | Input | Output | Classified As |") + lines.append("|---|------|-------|--------|------|") + for i, obs in enumerate(observations): + step_label = index_to_step.get(i, "—") + input_summary = obs.input[:100].replace("|", "\\|").replace("\n", " ") + output_summary = obs.output[:50].replace("|", "\\|").replace("\n", " ") + lines.append( + f"| {i} | {obs.tool} | {input_summary} | {output_summary} | {step_label} |" + ) + lines.append("") + + return "\n".join(lines) + + +def _overall_compliance(results: list[tuple[str, ComplianceResult, list[ObservationEvent]]]) -> float: + if not results: + return 0.0 + return sum(r.compliance_rate for _, r, _obs in results) / len(results) + + +def _step_compliance_rate( + step_id: str, + results: list[tuple[str, ComplianceResult, list[ObservationEvent]]], +) -> float: + detected = sum( + 1 for _, r, _obs in results + for s in r.steps if s.step_id == step_id and s.detected + ) + return detected / len(results) if results else 0.0 + + +def _steps_to_promote( + spec: ComplianceSpec, + results: list[tuple[str, ComplianceResult, list[ObservationEvent]]], + threshold: float, +) -> list[str]: + promote = [] + for step in spec.steps: + if not step.required: + continue + rate = _step_compliance_rate(step.id, results) + if rate < threshold: + promote.append(step.id) + return promote diff --git a/skills/skill-comply/scripts/run.py b/skills/skill-comply/scripts/run.py new file mode 100644 index 00000000..3e4695e5 --- /dev/null +++ b/skills/skill-comply/scripts/run.py @@ -0,0 +1,127 @@ +"""CLI entry point for skill-comply.""" + +from __future__ import annotations + +import argparse +import 
def main() -> None:
    """CLI entry point: spec generation → scenarios → execution → report.

    Exits with status 1 when the skill file does not exist. All progress
    output goes through the module logger (message-only format).
    """
    logging.basicConfig(level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser(
        description="skill-comply: Measure skill compliance rates",
    )
    parser.add_argument(
        "skill",
        type=Path,
        help="Path to skill/rule file to test",
    )
    parser.add_argument(
        "--model",
        default="sonnet",
        help="Model for scenario execution (default: sonnet)",
    )
    parser.add_argument(
        "--gen-model",
        default="haiku",
        help="Model for spec/scenario generation (default: haiku)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Generate spec and scenarios without executing",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=None,
        help="Output report path (default: results/.md)",
    )

    args = parser.parse_args()

    if not args.skill.is_file():
        logger.error("Error: Skill file not found: %s", args.skill)
        sys.exit(1)

    # Reports default to <skill-comply>/results/ next to this script.
    results_dir = Path(__file__).parent.parent / "results"
    results_dir.mkdir(exist_ok=True)

    # Step 1: Generate compliance spec
    logger.info("[1/4] Generating compliance spec from %s...", args.skill.name)
    spec = generate_spec(args.skill, model=args.gen_model)
    logger.info(" %d steps extracted", len(spec.steps))

    # Step 2: Generate scenarios
    # Re-serialize only the step fields the scenario generator needs.
    spec_yaml = yaml.dump({
        "steps": [
            {"id": s.id, "description": s.description, "required": s.required}
            for s in spec.steps
        ]
    })
    logger.info("[2/4] Generating scenarios (3 prompt strictness levels)...")
    scenarios = generate_scenarios(args.skill, spec_yaml, model=args.gen_model)
    logger.info(" %d scenarios generated", len(scenarios))

    for s in scenarios:
        logger.info(" - %s: %s", s.level_name, s.description[:60])

    if args.dry_run:
        # Dry-run stops after generation; '*' marks required steps below.
        logger.info("\n[dry-run] Spec and scenarios generated. Skipping execution.")
        logger.info("\nSpec: %s (%d steps)", spec.id, len(spec.steps))
        for step in spec.steps:
            marker = "*" if step.required else " "
            logger.info(" [%s] %s: %s", marker, step.id, step.description)
        return

    # Step 3: Execute scenarios
    logger.info("[3/4] Executing scenarios (model=%s)...", args.model)
    graded_results: list[tuple[str, Any, list[Any]]] = []

    for scenario in scenarios:
        logger.info(" Running %s...", scenario.level_name)
        run = run_scenario(scenario, model=args.model)
        result = grade(spec, list(run.observations))
        graded_results.append((scenario.level_name, result, list(run.observations)))
        logger.info(" %s: %.0f%%", scenario.level_name, result.compliance_rate * 100)

    # Step 4: Generate report
    # SKILL.md files are identified by their parent directory's name.
    skill_name = args.skill.parent.name if args.skill.stem == "SKILL" else args.skill.stem
    output_path = args.output or results_dir / f"{skill_name}.md"
    logger.info("[4/4] Generating report...")

    report = generate_report(args.skill, spec, graded_results, scenarios=scenarios)
    # --output may point anywhere; create missing parent directories.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(report)
    logger.info(" Report saved to %s", output_path)

    # Summary (guard against zero-division when nothing was executed)
    if not graded_results:
        logger.warning("No scenarios were executed.")
        return
    overall = sum(r.compliance_rate for _, r, _obs in graded_results) / len(graded_results)
    logger.info("\n%s", "=" * 50)
    logger.info("Overall Compliance: %.0f%%", overall * 100)
    if overall < spec.threshold_promote_to_hook:
        logger.info(
            "Recommendation: Some steps have low compliance. "
            "Consider promoting them to hooks. See the report for details."
        )


if __name__ == "__main__":
    main()
def run_scenario(
    scenario: Scenario,
    model: str = "sonnet",
    max_turns: int = 30,
    timeout: int = 300,
) -> ScenarioRun:
    """Execute a scenario via ``claude -p`` and extract its tool calls.

    Runs in a fresh sandbox directory under SANDBOX_BASE with a restricted
    tool allowlist.

    Raises:
        ValueError: unknown *model* or unusable scenario id.
        RuntimeError: the claude subprocess exited non-zero.
        subprocess.TimeoutExpired: the run exceeded *timeout* seconds.
    """
    if model not in ALLOWED_MODELS:
        raise ValueError(f"Unknown model: {model!r}. Allowed: {ALLOWED_MODELS}")

    sandbox_dir = _safe_sandbox_dir(scenario.id)
    _setup_sandbox(sandbox_dir, scenario)

    result = subprocess.run(
        [
            "claude", "-p", scenario.prompt,
            "--model", model,
            "--max-turns", str(max_turns),
            "--add-dir", str(sandbox_dir),
            "--allowedTools", "Read,Write,Edit,Bash,Glob,Grep",
            "--output-format", "stream-json",
            "--verbose",
        ],
        capture_output=True,
        text=True,
        timeout=timeout,
        cwd=sandbox_dir,
    )

    if result.returncode != 0:
        # Truncate stderr so a huge CLI dump doesn't flood the error message.
        raise RuntimeError(
            f"claude -p failed (rc={result.returncode}): {result.stderr[:500]}"
        )

    observations = _parse_stream_json(result.stdout)

    return ScenarioRun(
        scenario=scenario,
        observations=tuple(observations),
        sandbox_dir=sandbox_dir,
    )


def _safe_sandbox_dir(scenario_id: str) -> Path:
    """Sanitize scenario ID and ensure the path stays within the sandbox base.

    Raises ValueError for an empty/unsalvageable id or a path escape.
    """
    safe_id = re.sub(r"[^a-zA-Z0-9\-_]", "_", scenario_id)
    if not safe_id:
        # An empty id would make the sandbox path equal SANDBOX_BASE itself,
        # and _setup_sandbox would then rmtree every sandbox. Fail loudly.
        raise ValueError(f"Scenario id sanitizes to empty string: {scenario_id!r}")
    path = SANDBOX_BASE / safe_id
    # Defense in depth: raises ValueError if the resolved path escapes base.
    path.resolve().relative_to(SANDBOX_BASE.resolve())
    return path


def _setup_sandbox(sandbox_dir: Path, scenario: Scenario) -> None:
    """Create a fresh sandbox directory, init git, run scenario setup commands.

    Any previous sandbox with the same id is removed first. Setup commands
    are best-effort: failures are ignored (no check on return codes).
    """
    if sandbox_dir.exists():
        shutil.rmtree(sandbox_dir)
    sandbox_dir.mkdir(parents=True)

    subprocess.run(["git", "init"], cwd=sandbox_dir, capture_output=True)

    for cmd in scenario.setup_commands:
        # shlex.split avoids shell=True; commands run directly, not via a shell.
        parts = shlex.split(cmd)
        subprocess.run(parts, cwd=sandbox_dir, capture_output=True)


def _parse_stream_json(stdout: str) -> list[ObservationEvent]:
    """Parse claude -p stream-json output into ObservationEvents.

    Stream-json format:
    - type=assistant with content[].type=tool_use → tool call (name, input)
    - type=user with content[].type=tool_result → tool result (output)

    Tool calls that never received a result (e.g. the run was cut off) are
    still emitted, with an empty output. Unparseable lines are skipped.
    """
    events: list[ObservationEvent] = []
    pending: dict[str, dict] = {}
    event_counter = 0

    for line in stdout.strip().splitlines():
        try:
            msg = json.loads(line)
        except json.JSONDecodeError:
            continue

        msg_type = msg.get("type")

        if msg_type == "assistant":
            content = msg.get("message", {}).get("content", [])
            # Mirror the user-branch guard: content may be a bare string,
            # in which case iterating would yield characters, not blocks.
            if isinstance(content, list):
                for block in content:
                    if block.get("type") == "tool_use":
                        tool_use_id = block.get("id", "")
                        tool_input = block.get("input", {})
                        input_str = (
                            json.dumps(tool_input)[:5000]
                            if isinstance(tool_input, dict)
                            else str(tool_input)[:5000]
                        )
                        pending[tool_use_id] = {
                            "tool": block.get("name", "unknown"),
                            "input": input_str,
                            "order": event_counter,
                        }
                        event_counter += 1

        elif msg_type == "user":
            content = msg.get("message", {}).get("content", [])
            if isinstance(content, list):
                for block in content:
                    tool_use_id = block.get("tool_use_id", "")
                    if tool_use_id in pending:
                        info = pending.pop(tool_use_id)
                        output_content = block.get("content", "")
                        if isinstance(output_content, list):
                            output_str = json.dumps(output_content)[:5000]
                        else:
                            output_str = str(output_content)[:5000]

                        events.append(ObservationEvent(
                            timestamp=f"T{info['order']:04d}",
                            event="tool_complete",
                            tool=info["tool"],
                            session=msg.get("session_id", "unknown"),
                            input=info["input"],
                            output=output_str,
                        ))

    # Calls that never got a tool_result: emit with empty output so they
    # still appear in the timeline.
    for _tool_use_id, info in pending.items():
        events.append(ObservationEvent(
            timestamp=f"T{info['order']:04d}",
            event="tool_complete",
            tool=info["tool"],
            session="unknown",
            input=info["input"],
            output="",
        ))

    # Timestamps are zero-padded order indices, so lexicographic sort
    # equals call order.
    return sorted(events, key=lambda e: e.timestamp)
PROMPTS_DIR = Path(__file__).parent.parent / "prompts"


@dataclass(frozen=True)
class Scenario:
    """One pressure scenario at a given prompt-strictness level.

    Attributes:
        id: unique scenario identifier (also names the sandbox dir).
        level: numeric strictness level used for sorting.
        level_name: human-readable level label.
        description: short summary of the scenario.
        prompt: the full prompt sent to the agent under test.
        setup_commands: shell-less commands run in the sandbox beforehand.
    """
    id: str
    level: int
    level_name: str
    description: str
    prompt: str
    setup_commands: tuple[str, ...]


def generate_scenarios(
    skill_path: Path,
    spec_yaml: str,
    model: str = "haiku",
) -> list[Scenario]:
    """Generate 3 scenarios with decreasing prompt strictness.

    Calls claude -p with the scenario_generator prompt, parses YAML output.

    Raises:
        RuntimeError: the CLI failed, returned nothing, or returned YAML
            that is not a mapping with a ``scenarios`` list.
    """
    skill_content = skill_path.read_text()
    prompt_template = (PROMPTS_DIR / "scenario_generator.md").read_text()
    prompt = (
        prompt_template
        .replace("{skill_content}", skill_content)
        .replace("{spec_yaml}", spec_yaml)
    )

    result = subprocess.run(
        ["claude", "-p", prompt, "--model", model, "--output-format", "text"],
        capture_output=True,
        text=True,
        timeout=120,
    )

    if result.returncode != 0:
        raise RuntimeError(f"claude -p failed: {result.stderr}")

    if not result.stdout.strip():
        raise RuntimeError("claude -p returned empty output")

    raw_yaml = extract_yaml(result.stdout)
    parsed = yaml.safe_load(raw_yaml)

    # yaml.safe_load can return str/list/None for malformed LLM output;
    # validate the structure before indexing so the error is actionable.
    if not isinstance(parsed, dict) or not isinstance(parsed.get("scenarios"), list):
        raise RuntimeError(
            f"Scenario generator returned unexpected YAML structure: {raw_yaml[:200]}"
        )

    scenarios: list[Scenario] = []
    for s in parsed["scenarios"]:
        scenarios.append(Scenario(
            id=s["id"],
            level=s["level"],
            level_name=s["level_name"],
            description=s["description"],
            prompt=s["prompt"].strip(),
            setup_commands=tuple(s.get("setup_commands", [])),
        ))

    return sorted(scenarios, key=lambda s: s.level)
+"""Generate compliance specs from skill files using LLM.""" + +from __future__ import annotations + +import subprocess +import tempfile +from pathlib import Path + +import yaml + +from scripts.parser import ComplianceSpec, parse_spec +from scripts.utils import extract_yaml + +PROMPTS_DIR = Path(__file__).parent.parent / "prompts" + + +def generate_spec( + skill_path: Path, + model: str = "haiku", + max_retries: int = 2, +) -> ComplianceSpec: + """Generate a compliance spec from a skill/rule file. + + Calls claude -p with the spec_generator prompt, parses YAML output. + Retries on YAML parse errors with error feedback. + """ + skill_content = skill_path.read_text() + prompt_template = (PROMPTS_DIR / "spec_generator.md").read_text() + base_prompt = prompt_template.replace("{skill_content}", skill_content) + + last_error: Exception | None = None + + for attempt in range(max_retries + 1): + prompt = base_prompt + if attempt > 0 and last_error is not None: + prompt += ( + f"\n\nPREVIOUS ATTEMPT FAILED with YAML parse error:\n" + f"{last_error}\n\n" + f"Please fix the YAML. 
def extract_yaml(text: str) -> str:
    """Return the YAML payload of an LLM reply.

    A markdown code fence line (``` or ```yaml) at the start and/or end of
    the stripped text is dropped; every other line passes through unchanged.
    """
    body = text.strip().splitlines()
    start = 1 if body and body[0].startswith("```") else 0
    if len(body) > start and body[-1].startswith("```"):
        end = len(body) - 1
    else:
        end = len(body)
    return "\n".join(body[start:end])
def tdd_spec():
    """Parsed 5-step TDD compliance spec (shared fixture)."""
    return parse_spec(FIXTURES / "tdd_spec.yaml")


@pytest.fixture
def compliant_trace():
    """Trace fixture where the agent followed the expected TDD order."""
    return parse_trace(FIXTURES / "compliant_trace.jsonl")


@pytest.fixture
def noncompliant_trace():
    """Trace fixture where the implementation was written before the test."""
    return parse_trace(FIXTURES / "noncompliant_trace.jsonl")


def _mock_compliant_classification(spec, trace, model="haiku"):  # noqa: ARG001
    """Simulate LLM correctly classifying a compliant trace."""
    # step_id → list of event indices in the trace.
    return {
        "write_test": [0],
        "run_test_red": [1],
        "write_impl": [2],
        "run_test_green": [3],
        "refactor": [4],
    }


def _mock_noncompliant_classification(spec, trace, model="haiku"):
    """Simulate LLM classifying a noncompliant trace (impl before test)."""
    return {
        "write_impl": [0],  # src/fib.py written first
        "write_test": [1],  # test written second
        "run_test_green": [2],  # only a passing test run
    }


def _mock_empty_classification(spec, trace, model="haiku"):
    """Simulate LLM classification of a trace with no classifiable events."""
    return {}
class TestGradeNoncompliant:
    """Grading a trace that violated the TDD ordering (impl before test)."""

    @patch("scripts.grader.classify_events", side_effect=_mock_noncompliant_classification)
    def test_low_compliance(self, mock_cls, tdd_spec, noncompliant_trace) -> None:
        result = grade(tdd_spec, noncompliant_trace)
        assert result.compliance_rate < 1.0

    @patch("scripts.grader.classify_events", side_effect=_mock_noncompliant_classification)
    def test_write_test_fails_ordering(self, mock_cls, tdd_spec, noncompliant_trace) -> None:
        """write_test has before_step=write_impl, but test is written AFTER impl."""
        result = grade(tdd_spec, noncompliant_trace)
        write_test = next(s for s in result.steps if s.step_id == "write_test")
        assert write_test.detected is False

    @patch("scripts.grader.classify_events", side_effect=_mock_noncompliant_classification)
    def test_run_test_red_not_detected(self, mock_cls, tdd_spec, noncompliant_trace) -> None:
        # The mock classification never labels a failing test run, so
        # run_test_red cannot be detected.
        result = grade(tdd_spec, noncompliant_trace)
        run_red = next(s for s in result.steps if s.step_id == "run_test_red")
        assert run_red.detected is False

    @patch("scripts.grader.classify_events", side_effect=_mock_noncompliant_classification)
    def test_hook_promotion_recommended(self, mock_cls, tdd_spec, noncompliant_trace) -> None:
        result = grade(tdd_spec, noncompliant_trace)
        assert result.recommend_hook_promotion is True

    @patch("scripts.grader.classify_events", side_effect=_mock_noncompliant_classification)
    def test_failure_reasons_present(self, mock_cls, tdd_spec, noncompliant_trace) -> None:
        # refactor is the optional step, so it is excluded from the
        # "must carry a failure reason" expectation.
        result = grade(tdd_spec, noncompliant_trace)
        failed_steps = [s for s in result.steps if not s.detected and s.step_id != "refactor"]
        for step in failed_steps:
            assert step.failure_reason is not None


class TestGradeEdgeCases:
    """Boundary conditions: empty traces and rate computation."""

    @patch("scripts.grader.classify_events", side_effect=_mock_empty_classification)
    def test_empty_trace(self, mock_cls, tdd_spec) -> None:
        result = grade(tdd_spec, [])
        assert result.compliance_rate == 0.0
        assert result.recommend_hook_promotion is True

    @patch("scripts.grader.classify_events", side_effect=_mock_compliant_classification)
    def test_compliance_rate_is_ratio_of_required_only(self, mock_cls, tdd_spec, compliant_trace) -> None:
        # The rate should be computed over required steps only; the optional
        # refactor step must not dilute it.
        result = grade(tdd_spec, compliant_trace)
        assert result.compliance_rate == 1.0

    @patch("scripts.grader.classify_events", side_effect=_mock_compliant_classification)
    def test_spec_id_in_result(self, mock_cls, tdd_spec, compliant_trace) -> None:
        result = grade(tdd_spec, compliant_trace)
        assert result.spec_id == "tdd-workflow"
"compliant_trace.jsonl") + assert len(events) == 5 + assert all(isinstance(e, ObservationEvent) for e in events) + + def test_events_sorted_by_timestamp(self) -> None: + events = parse_trace(FIXTURES / "compliant_trace.jsonl") + timestamps = [e.timestamp for e in events] + assert timestamps == sorted(timestamps) + + def test_event_fields(self) -> None: + events = parse_trace(FIXTURES / "compliant_trace.jsonl") + first = events[0] + assert first.tool == "Write" + assert first.session == "sess-001" + assert "test_fib.py" in first.input + assert first.output == "File created" + + def test_parses_noncompliant_trace(self) -> None: + events = parse_trace(FIXTURES / "noncompliant_trace.jsonl") + assert len(events) == 3 + assert "src/fib.py" in events[0].input + + def test_empty_file_returns_empty_list(self, tmp_path: Path) -> None: + empty = tmp_path / "empty.jsonl" + empty.write_text("") + events = parse_trace(empty) + assert events == [] + + def test_nonexistent_file_raises(self) -> None: + with pytest.raises(FileNotFoundError): + parse_trace(Path("/nonexistent/trace.jsonl")) + + +class TestParseSpec: + def test_parses_tdd_spec(self) -> None: + spec = parse_spec(FIXTURES / "tdd_spec.yaml") + assert isinstance(spec, ComplianceSpec) + assert spec.id == "tdd-workflow" + assert len(spec.steps) == 5 + + def test_step_fields(self) -> None: + spec = parse_spec(FIXTURES / "tdd_spec.yaml") + first = spec.steps[0] + assert isinstance(first, Step) + assert first.id == "write_test" + assert first.required is True + assert isinstance(first.detector, Detector) + assert "test file" in first.detector.description + assert first.detector.before_step == "write_impl" + + def test_optional_detector_fields(self) -> None: + spec = parse_spec(FIXTURES / "tdd_spec.yaml") + write_test = spec.steps[0] + assert write_test.detector.after_step is None + + run_test_red = spec.steps[1] + assert run_test_red.detector.after_step == "write_test" + assert run_test_red.detector.before_step == "write_impl" 
+ + def test_scoring_threshold(self) -> None: + spec = parse_spec(FIXTURES / "tdd_spec.yaml") + assert spec.threshold_promote_to_hook == 0.6 + + def test_required_vs_optional_steps(self) -> None: + spec = parse_spec(FIXTURES / "tdd_spec.yaml") + required = [s for s in spec.steps if s.required] + optional = [s for s in spec.steps if not s.required] + assert len(required) == 4 + assert len(optional) == 1 + assert optional[0].id == "refactor"