Add home-security-agent with PostgreSQL persistence for dashboard.

The autonomous agent writes all observations to agent.* tables consumed by Homelab Command on port 8765.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
mo
2026-05-17 21:57:16 +02:00
parent 43c4ed7a6d
commit 02b1d155d4
17 changed files with 1024 additions and 0 deletions
+159
View File
@@ -0,0 +1,159 @@
"""Directe observatie — geen Wazuh/Uptime Kuma/n8n."""
import socket
import subprocess
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import httpx
import yaml
CONFIG_DIR = __import__("pathlib").Path(__file__).resolve().parent.parent / "config"
@dataclass
class Finding:
kind: str
name: str
ok: bool
detail: str
meta: dict = field(default_factory=dict)
@dataclass
class ObservationReport:
timestamp: str
findings: list[Finding]
def to_dict(self) -> dict:
return {
"timestamp": self.timestamp,
"findings": [
{"kind": f.kind, "name": f.name, "ok": f.ok, "detail": f.detail, "meta": f.meta}
for f in self.findings
],
"summary": {
"total": len(self.findings),
"failed": sum(1 for f in self.findings if not f.ok),
},
}
def load_yaml(name: str) -> dict:
with open(CONFIG_DIR / name, encoding="utf-8") as f:
return yaml.safe_load(f) or {}
def probe_tcp(host: str, port: int, timeout: float = 4) -> tuple[bool, str]:
try:
with socket.create_connection((host, port), timeout=timeout):
return True, "open"
except Exception as e:
return False, str(e)
def probe_http(url: str, insecure: bool = False, timeout: float = 8) -> tuple[bool, str]:
try:
r = httpx.get(url, timeout=timeout, verify=not insecure, follow_redirects=True)
if r.status_code < 500:
return True, f"HTTP {r.status_code}"
return False, f"HTTP {r.status_code}"
except Exception as e:
return False, str(e)
def probe_proxmox(host: str, port: int = 8006) -> tuple[bool, str]:
ok, detail = probe_tcp(host, port)
if not ok:
return False, detail
try:
r = httpx.get(f"https://{host}:{port}/", timeout=6, verify=False)
return r.status_code in (200, 301, 302, 401, 403), f"HTTPS {r.status_code}"
except Exception as e:
return False, str(e)
def docker_container_states() -> list[Finding]:
"""Leest lokale Docker socket (NAS)."""
findings = []
try:
out = subprocess.run(
["docker", "ps", "-a", "--format", "{{.Names}}|{{.Status}}"],
capture_output=True,
text=True,
timeout=30,
)
if out.returncode != 0:
return [Finding("docker", "docker", False, out.stderr.strip() or "docker ps failed")]
for line in out.stdout.strip().splitlines():
if not line or "|" not in line:
continue
name, status = line.split("|", 1)
up = status.lower().startswith("up")
findings.append(
Finding(
"docker",
name,
up,
status,
{"exited": "exited" in status.lower()},
)
)
except FileNotFoundError:
findings.append(Finding("docker", "docker", False, "docker CLI niet beschikbaar"))
return findings
def lan_ping_watch(subnet: str, sample_hosts: list[str] | None = None) -> list[Finding]:
"""Lightweight: ping gateway + optioneel lijst — geen mass scan standaard."""
findings = []
gateway = ".".join(subnet.split(".")[:3]) + ".1"
ok, detail = _ping_once(gateway)
findings.append(Finding("lan", "gateway", ok, detail, {"ip": gateway}))
return findings
def _ping_once(ip: str) -> tuple[bool, str]:
try:
out = subprocess.run(
["ping", "-c", "1", "-W", "2", ip],
capture_output=True,
text=True,
timeout=5,
)
return out.returncode == 0, "reachable" if out.returncode == 0 else "no reply"
except Exception as e:
return False, str(e)
def run_observation() -> ObservationReport:
targets = load_yaml("targets.yaml")
findings: list[Finding] = []
nas = targets.get("nas") or {}
host = nas.get("host", "192.168.1.211")
for chk in nas.get("checks") or []:
name = chk.get("name", "check")
if chk.get("type") == "tcp":
ok, detail = probe_tcp(host, int(chk["port"]))
elif chk.get("type") == "http":
ok, detail = probe_http(chk["url"])
else:
ok, detail = False, "unknown check type"
findings.append(Finding("nas", name, ok, detail))
for px in targets.get("proxmox_hosts") or []:
ok, detail = probe_proxmox(px["host"], int(px.get("port", 8006)))
findings.append(Finding("proxmox", px.get("name", px["host"]), ok, detail))
for svc in targets.get("services") or []:
ok, detail = probe_http(svc["url"], insecure=bool(svc.get("insecure_tls")))
findings.append(Finding("service", svc.get("name", svc["url"]), ok, detail))
findings.extend(docker_container_states())
lan = targets.get("lan_watch") or {}
if lan.get("enabled"):
findings.extend(lan_ping_watch(lan.get("subnet", "192.168.1.0/24")))
return ObservationReport(timestamp=datetime.utcnow().isoformat() + "Z", findings=findings)