mirror of
https://github.com/affaan-m/everything-claude-code.git
synced 2026-06-11 02:33:10 +08:00
Add opt-in AURA trust-check adapter (integrations/aura)
This commit is contained in:
200
integrations/aura/adapter.py
Normal file
200
integrations/aura/adapter.py
Normal file
@@ -0,0 +1,200 @@
|
||||
"""
|
||||
AURA trust-check adapter — a zero-dependency, read-only reputation lookup.
|
||||
|
||||
Drop this module into any agent/host project to gate a sensitive action
|
||||
(settlement, delegation, tool execution) behind a backward-looking trust
|
||||
verdict for the *counterparty* agent. It does NOT sign, hold keys, move
|
||||
funds, or touch your wallet. It makes one HTTP GET and returns a verdict.
|
||||
|
||||
Design boundary (intentional):
|
||||
- read-only: the only network call is GET /check?did=...
|
||||
- no auth: /check is a public endpoint; no API key, no secret
|
||||
- no coupling: pure stdlib (urllib). No third-party imports, no SDK.
|
||||
- fail-closed: on network failure the verdict is `unknown`, and the
|
||||
default gate (before_settle) rejects `unknown` — so an
|
||||
unreachable AURA never silently waves a counterparty
|
||||
through. Flip `fail_open=True` to invert that.
|
||||
|
||||
Public API:
|
||||
aura_verdict(did) -> AuraVerdict (never raises on network)
|
||||
before_settle(did, allow=...) -> AuraVerdict (raises AuraUntrusted)
|
||||
require_trust = before_settle (alias)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
__all__ = [
|
||||
"aura_verdict",
|
||||
"before_settle",
|
||||
"require_trust",
|
||||
"AuraVerdict",
|
||||
"AuraUntrusted",
|
||||
"DEFAULT_BASE_URL",
|
||||
"DEFAULT_ALLOW",
|
||||
]
|
||||
|
||||
DEFAULT_BASE_URL = "https://agent.auraopenprotocol.org"
|
||||
DEFAULT_TIMEOUT = 8 # seconds
|
||||
|
||||
# Verdicts safe to proceed with by default. Rejects `high_risk` (poor track
|
||||
# record) and `unknown` (no verifiable history / endpoint unreachable).
|
||||
DEFAULT_ALLOW = ("trusted", "caution", "new")
|
||||
|
||||
# All verdict classes the /check endpoint can return.
|
||||
VERDICTS = ("trusted", "caution", "high_risk", "new", "unknown")
|
||||
|
||||
|
||||
class AuraUntrusted(Exception):
|
||||
"""Raised by before_settle() when a counterparty fails the trust gate."""
|
||||
|
||||
def __init__(self, verdict: "AuraVerdict") -> None:
|
||||
self.verdict = verdict
|
||||
super().__init__(
|
||||
f"trust gate rejected {verdict.did}: {verdict.verdict} — {verdict.reason}"
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class AuraVerdict:
|
||||
"""
|
||||
Result of a zero-auth trust check on a counterparty DID.
|
||||
|
||||
Fields:
|
||||
did the DID that was checked
|
||||
verdict one of trusted | caution | high_risk | new | unknown
|
||||
reason human-readable explanation
|
||||
score composite 0..1, or None when there is no history
|
||||
has_history True once the agent has on-chain interactions
|
||||
dimensions per-dimension breakdown (which axis is weak), or None
|
||||
raw the untouched JSON body, for callers that want more
|
||||
"""
|
||||
|
||||
did: str
|
||||
verdict: str
|
||||
reason: str = ""
|
||||
score: Optional[float] = None
|
||||
has_history: bool = False
|
||||
dimensions: Optional[dict[str, float]] = None
|
||||
raw: dict[str, Any] = field(default_factory=dict, repr=False)
|
||||
|
||||
@property
|
||||
def ok(self) -> bool:
|
||||
"""True for verdicts safe to proceed with (trusted / caution)."""
|
||||
return self.verdict in ("trusted", "caution")
|
||||
|
||||
def as_dict(self) -> dict[str, Any]:
|
||||
"""The minimal {verdict, reason, score} contract, plus did/ok."""
|
||||
return {
|
||||
"did": self.did,
|
||||
"verdict": self.verdict,
|
||||
"reason": self.reason,
|
||||
"score": self.score,
|
||||
"ok": self.ok,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_payload(cls, did: str, body: dict[str, Any]) -> "AuraVerdict":
|
||||
verdict = str(body.get("verdict", "unknown"))
|
||||
if verdict not in VERDICTS:
|
||||
verdict = "unknown"
|
||||
return cls(
|
||||
did=body.get("did", did),
|
||||
verdict=verdict,
|
||||
reason=str(body.get("reason", "")),
|
||||
score=body.get("score"),
|
||||
has_history=bool(body.get("has_history", False)),
|
||||
dimensions=body.get("dimensions"),
|
||||
raw=body,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def unreachable(cls, did: str, reason: str) -> "AuraVerdict":
|
||||
"""A synthetic `unknown` verdict for network/parse failures."""
|
||||
return cls(did=did, verdict="unknown", reason=reason)
|
||||
|
||||
|
||||
# Indirection point so tests can inject canned responses without a network.
|
||||
# Signature: (url: str, timeout: float) -> dict (raises on transport error)
|
||||
def _http_get_json(url: str, timeout: float) -> dict[str, Any]:
|
||||
req = urllib.request.Request(url, headers={"User-Agent": "aura-adapter/1.0"})
|
||||
with urllib.request.urlopen(req, timeout=timeout) as resp: # noqa: S310 (https only)
|
||||
return json.loads(resp.read().decode("utf-8"))
|
||||
|
||||
|
||||
def aura_verdict(
|
||||
did: str,
|
||||
*,
|
||||
base_url: str = DEFAULT_BASE_URL,
|
||||
timeout: float = DEFAULT_TIMEOUT,
|
||||
_fetch: Callable[[str, float], dict[str, Any]] = _http_get_json,
|
||||
) -> AuraVerdict:
|
||||
"""
|
||||
Look up the trust verdict for a counterparty DID. Never raises on a
|
||||
network/parse failure — returns an `unknown` verdict instead, leaving the
|
||||
proceed/abort decision to the caller's policy (see before_settle).
|
||||
|
||||
v = aura_verdict("did:aura:z6Mk...")
|
||||
print(v.verdict, v.reason, v.score)
|
||||
|
||||
`_fetch` is an injection seam for tests; production callers ignore it.
|
||||
"""
|
||||
if not did or not str(did).startswith("did:"):
|
||||
raise ValueError(f"invalid DID: {did!r} (must start with 'did:')")
|
||||
|
||||
url = f"{base_url.rstrip('/')}/check?" + urllib.parse.urlencode({"did": did})
|
||||
try:
|
||||
body = _fetch(url, timeout)
|
||||
except (urllib.error.URLError, TimeoutError, OSError) as e:
|
||||
return AuraVerdict.unreachable(did, f"AURA unreachable: {e}")
|
||||
except (json.JSONDecodeError, ValueError) as e:
|
||||
return AuraVerdict.unreachable(did, f"AURA returned non-JSON: {e}")
|
||||
|
||||
if not isinstance(body, dict):
|
||||
return AuraVerdict.unreachable(did, "AURA returned an unexpected shape")
|
||||
return AuraVerdict.from_payload(did, body)
|
||||
|
||||
|
||||
def before_settle(
|
||||
did: str,
|
||||
*,
|
||||
allow: tuple[str, ...] = DEFAULT_ALLOW,
|
||||
fail_open: bool = False,
|
||||
base_url: str = DEFAULT_BASE_URL,
|
||||
timeout: float = DEFAULT_TIMEOUT,
|
||||
_fetch: Callable[[str, float], dict[str, Any]] = _http_get_json,
|
||||
) -> AuraVerdict:
|
||||
"""
|
||||
Gate a sensitive action behind a trust check. Returns the verdict on pass,
|
||||
raises AuraUntrusted on fail.
|
||||
|
||||
try:
|
||||
before_settle(counterparty_did) # rejects high_risk + unknown
|
||||
settle_payment(counterparty_did, amount)
|
||||
except AuraUntrusted as e:
|
||||
abort(str(e))
|
||||
|
||||
Tighten to reject brand-new agents too:
|
||||
before_settle(did, allow=("trusted", "caution"))
|
||||
|
||||
fail_open=True makes an *unreachable* AURA pass through (the verdict is
|
||||
`unknown` but treated as allowed). Off by default — absence of evidence is
|
||||
not evidence of trust.
|
||||
"""
|
||||
v = aura_verdict(did, base_url=base_url, timeout=timeout, _fetch=_fetch)
|
||||
|
||||
if v.verdict in allow:
|
||||
return v
|
||||
if fail_open and v.verdict == "unknown" and not v.has_history:
|
||||
return v
|
||||
raise AuraUntrusted(v)
|
||||
|
||||
|
||||
# Alias — same gate, name that reads better at non-payment call sites.
|
||||
require_trust = before_settle
|
||||
Reference in New Issue
Block a user