security: harden advisory intake and dependency coverage

This commit is contained in:
ECC Test
2026-06-09 20:46:14 -04:00
parent 8ee5946712
commit 3c5bcc2b66
5 changed files with 263 additions and 49 deletions

View File

@@ -23,6 +23,9 @@ import subprocess
import sys
import re
import shutil
import ipaddress
import socket
import urllib.parse
import urllib.request
from pathlib import Path
from datetime import datetime, timedelta, timezone
@@ -181,6 +184,55 @@ def _validate_instinct_id(instinct_id: str) -> bool:
return bool(re.match(r"^[A-Za-z0-9][A-Za-z0-9._-]*$", instinct_id))
def _validate_import_url(source: str) -> str:
"""Validate remote instinct imports before opening a network connection."""
parsed = urllib.parse.urlparse(source)
if parsed.scheme != "https":
raise ValueError("remote instinct imports require https URLs")
if not parsed.hostname:
raise ValueError("remote import URL is missing a hostname")
try:
addr_infos = socket.getaddrinfo(parsed.hostname, parsed.port or 443, type=socket.SOCK_STREAM)
except socket.gaierror as exc:
raise ValueError(f"remote import host could not be resolved: {parsed.hostname}") from exc
for family, _, _, _, sockaddr in addr_infos:
host = sockaddr[0]
try:
ip = ipaddress.ip_address(host)
except ValueError:
continue
if (
ip.is_private
or ip.is_loopback
or ip.is_link_local
or ip.is_multicast
or ip.is_reserved
or ip.is_unspecified
):
raise ValueError(f"remote import host resolves to a non-public address: {host}")
return urllib.parse.urlunparse(parsed)
def _fetch_import_url(source: str, *, max_bytes: int = 2 * 1024 * 1024) -> str:
"""Fetch a validated remote instinct file with bounded size and timeout."""
url = _validate_import_url(source)
req = urllib.request.Request(url, headers={"User-Agent": "ECC-instinct-import/2"})
with urllib.request.urlopen(req, timeout=15) as response:
content_type = response.headers.get("Content-Type", "")
if content_type and not any(
allowed in content_type.lower()
for allowed in ("text/", "markdown", "yaml", "json", "octet-stream")
):
raise ValueError(f"unsupported remote content type: {content_type}")
data = response.read(max_bytes + 1)
if len(data) > max_bytes:
raise ValueError(f"remote import exceeds {max_bytes} bytes")
return data.decode("utf-8")
def _yaml_quote(value: str) -> str:
"""Quote a string for safe YAML frontmatter serialization.
@@ -794,8 +846,7 @@ def cmd_import(args) -> int:
if source.startswith('http://') or source.startswith('https://'):
print(f"Fetching from URL: {source}")
try:
with urllib.request.urlopen(source) as response:
content = response.read().decode('utf-8')
content = _fetch_import_url(source)
except Exception as e:
print(f"Error fetching URL: {e}", file=sys.stderr)
return 1