Files
mo 02b1d155d4 Add home-security-agent with PostgreSQL persistence for dashboard.
The autonomous agent writes all observations to agent.* tables consumed by Homelab Command on port 8765.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 21:57:16 +02:00

244 lines
7.7 KiB
Python

"""Agentisch brein — LLM met tools, of regel-fallback zonder API."""
import json
import os
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
from agent.observer import (
ObservationReport,
load_yaml,
probe_http,
probe_proxmox,
probe_tcp,
)
from agent.state import recent_incidents
@dataclass
class AgentDecision:
    """Verdict produced by one evaluation of an observation report.

    Instances are created both by the rule-based path and by the LLM path,
    and are mutable (the fallback path appends a note to ``body``).
    """

    # True when the agent considers the situation worth alerting on.
    alert: bool
    # Severity label; the code in this module uses
    # "info", "low", "medium", "high" and "critical".
    severity: str
    # Short headline of the decision.
    title: str
    # Longer, possibly multi-line, explanation.
    body: str
    # Short key identifying this kind of decision
    # (presumably used for de-duplication by the caller -- confirm).
    fingerprint: str
    # Suggested follow-up actions, as free-form text.
    actions: List[str]
# Tool definitions handed to the chat-completions call as its ``tools``
# argument. Each entry names one probe that ``_run_tool`` can dispatch to.
# The list is generated from a compact (name, description, properties,
# required) table so that every entry shares the same envelope.
TOOLS_SCHEMA = [
    {
        "type": "function",
        "function": {
            "name": tool_name,
            "description": summary,
            "parameters": {
                "type": "object",
                "properties": props,
                "required": mandatory,
            },
        },
    }
    for tool_name, summary, props, mandatory in (
        (
            "probe_tcp",
            "Test of een TCP-poort open is op een host",
            {"host": {"type": "string"}, "port": {"type": "integer"}},
            ["host", "port"],
        ),
        (
            "probe_http",
            "HTTP GET check op een URL",
            {"url": {"type": "string"}, "insecure_tls": {"type": "boolean"}},
            ["url"],
        ),
        (
            "probe_proxmox",
            "Check Proxmox web UI bereikbaarheid",
            {"host": {"type": "string"}},
            ["host"],
        ),
    )
]
def _run_tool(name: str, args: dict) -> str:
if name == "probe_tcp":
ok, detail = probe_tcp(args["host"], int(args["port"]))
return json.dumps({"ok": ok, "detail": detail})
if name == "probe_http":
ok, detail = probe_http(args["url"], insecure=bool(args.get("insecure_tls")))
return json.dumps({"ok": ok, "detail": detail})
if name == "probe_proxmox":
ok, detail = probe_proxmox(args["host"])
return json.dumps({"ok": ok, "detail": detail})
return json.dumps({"error": "unknown tool"})
def _rule_based_decide(report: ObservationReport, policies: dict) -> AgentDecision:
    """Derive a decision from the report using fixed rules (no LLM involved).

    Args:
        report: Observation report; only ``report.findings`` is read, and of
            each finding the attributes ``ok``, ``kind``, ``name``, ``detail``.
        policies: Policy mapping; the optional ``rules`` sub-mapping may
            override the severity for ``proxmox_unreachable``,
            ``nas_unreachable`` and ``any_service_down``.

    Returns:
        A non-alert "all OK" decision when every finding is ok; otherwise an
        alert whose severity is the highest one contributed by any failed
        finding (``"low"`` when no failed finding has a known kind).
    """
    rules = policies.get("rules") or {}
    failed = [f for f in report.findings if not f.ok]
    if not failed:
        return AgentDecision(
            alert=False,
            severity="info",
            title="Alles OK",
            body=f"{len(report.findings)} checks geslaagd.",
            fingerprint="all_ok",
            actions=[],
        )

    # Severity contributed by each finding kind. Kinds that are not listed
    # here do not raise the severity above the "low" baseline.
    severity_by_kind = {
        "proxmox": rules.get("proxmox_unreachable", "critical"),
        "nas": rules.get("nas_unreachable", "critical"),
        "service": rules.get("any_service_down", "high"),
        "docker": "high",
    }

    worst = "low"
    lines = []
    for f in failed:
        lines.append(f"{f.kind}/{f.name}: {f.detail}")
        if f.kind in severity_by_kind:
            # Always combine through max_sev: a plain assignment (as was done
            # for proxmox) could lower a severity recorded for an earlier
            # finding, making the result depend on the order of findings.
            worst = max_sev(worst, severity_by_kind[f.kind])

    # Only the joined kind:name list is capped at 200 characters; the
    # "fail:" prefix is added on top of that.
    joined = "|".join(sorted(f"{f.kind}:{f.name}" for f in failed))
    fp = "fail:" + joined[:200]
    return AgentDecision(
        alert=True,
        severity=worst,
        title=f"{len(failed)} probleem(en) gedetecteerd",
        body="\n".join(lines),
        fingerprint=fp,
        actions=["Herhaal check over 5 min", "Controleer host handmatig"],
    )
def max_sev(a: str, b: str) -> str:
    """Return the more severe of two severity labels.

    Labels are ranked ``info < low < medium < high < critical``. Comparison
    ignores case and surrounding whitespace, and the result is the canonical
    lower-case label. On a tie the first argument wins.

    Args:
        a: First severity label.
        b: Second severity label.

    Returns:
        The higher-ranked label, in canonical lower-case form.

    Raises:
        ValueError: If either label is not one of the five known severities.
    """
    order = ("info", "low", "medium", "high", "critical")
    rank = {label: position for position, label in enumerate(order)}

    def _canonical(label) -> str:
        # Labels may come from hand-edited configuration, so tolerate
        # "High" / " critical " and fail with a readable message otherwise.
        key = str(label).strip().lower()
        if key not in rank:
            raise ValueError(
                f"unknown severity {label!r}; expected one of {', '.join(order)}"
            )
        return key

    first, second = _canonical(a), _canonical(b)
    return first if rank[first] >= rank[second] else second
def _in_quiet_hours(policies: dict) -> bool:
qh = policies.get("quiet_hours") or {}
if not qh:
return False
try:
from zoneinfo import ZoneInfo
tz = ZoneInfo(qh.get("timezone", "UTC"))
except Exception:
tz = None
now = datetime.now(tz) if tz else datetime.now()
start = qh.get("start", "23:00")
end = qh.get("end", "07:00")
h, m = map(int, now.strftime("%H %M").split())
cur = h * 60 + m
sh, sm = map(int, start.split(":"))
eh, em = map(int, end.split(":"))
s, e = sh * 60 + sm, eh * 60 + em
if s <= e:
return s <= cur < e
return cur >= s or cur < e
return False
# System prompt for the LLM path (Dutch; sent to the model verbatim).
_SYSTEM_PROMPT = """Je bent de autonome security agent voor het EL-KADI homelab thuis.
Je krijgt ruwe observaties (eigen probes, geen Wazuh/Uptime Kuma).
Beslis of de gebruiker een Telegram-melding moet krijgen.
Wees conservatief: alleen alert bij echt problemen of verdachte wijzigingen.
Antwoord uiteindelijk ALTIJD met JSON:
{"alert":bool,"severity":"info|low|medium|high|critical","title":"...","body":"...","fingerprint":"korte_sleutel","actions":["..."]}
Je mag tools aanroepen om te verifiëren voordat je alert=true zet."""


def _extract_json_text(reply: str) -> str:
    """Return the JSON text of a model reply, unwrapping a Markdown fence."""
    text = reply.strip()
    if "```" not in text:
        return text
    fenced = text.split("```")[1].lstrip()
    # Drop only a leading "json" language tag. Removing every occurrence of
    # "json" would corrupt payloads that mention the word in their values.
    if fenced[:4].lower() == "json":
        fenced = fenced[4:]
    return fenced.strip()


def _decision_from_reply(data) -> AgentDecision:
    """Build an AgentDecision from the decoded JSON object of the model."""
    if not isinstance(data, dict):
        raise ValueError("LLM reply is not a JSON object")

    raw_alert = data.get("alert")
    if isinstance(raw_alert, str):
        # bool("false") would be True; interpret textual booleans explicitly.
        alert = raw_alert.strip().lower() in ("true", "1", "yes")
    else:
        alert = bool(raw_alert)

    # Severity is later matched against lower-case labels, so normalise it
    # and fall back to the default for anything unrecognised.
    severity = str(data.get("severity", "medium")).strip().lower()
    if severity not in ("info", "low", "medium", "high", "critical"):
        severity = "medium"

    raw_actions = data.get("actions") or []
    if isinstance(raw_actions, str):
        # list("abc") would split the text into single characters.
        raw_actions = [raw_actions]

    return AgentDecision(
        alert=alert,
        severity=severity,
        title=str(data.get("title", "Security")),
        body=str(data.get("body", "")),
        fingerprint=str(data.get("fingerprint", "llm")),
        actions=[str(item) for item in raw_actions],
    )


def decide(report: ObservationReport) -> AgentDecision:
    """Decide whether the observation report warrants an alert.

    Without ``OPENAI_API_KEY`` in the environment the rule-based decision is
    returned directly. Otherwise the model named by ``AGENT_MODEL`` (default
    ``gpt-4o-mini``) is consulted for at most five rounds, during which it
    may call the probe tools, and its final JSON answer becomes the decision.

    Args:
        report: Observation report to evaluate.

    Returns:
        The LLM decision; or the rule-based decision when no API key is set,
        when the model does not produce a final answer within five rounds, or
        when anything on the LLM path fails (in that case the error text is
        appended to the decision body).
    """
    policies = load_yaml("policies.yaml")
    api_key = os.environ.get("OPENAI_API_KEY", "").strip()
    if not api_key:
        return _rule_based_decide(report, policies)
    model = os.environ.get("AGENT_MODEL", "gpt-4o-mini")

    # Deliberately broad boundary: whatever goes wrong while talking to the
    # model, the agent must still produce a rule-based decision.
    try:
        from openai import OpenAI

        client = OpenAI(api_key=api_key)
        # Fetched inside the guarded region (and only on the LLM path) so a
        # failing incident lookup degrades to the rule-based decision.
        history = recent_incidents(8)
        user_msg = json.dumps(
            {
                "observations": report.to_dict(),
                "recent_incidents": history,
                "policies": {
                    "quiet_hours": policies.get("quiet_hours"),
                    "dedupe_minutes": policies.get("dedupe_minutes"),
                },
            },
            indent=2,
            default=str,
        )
        messages = [
            {"role": "system", "content": _SYSTEM_PROMPT},
            {"role": "user", "content": user_msg},
        ]
        for _ in range(5):
            resp = client.chat.completions.create(
                model=model,
                messages=messages,
                tools=TOOLS_SCHEMA,
                tool_choice="auto",
            )
            msg = resp.choices[0].message
            if msg.tool_calls:
                messages.append(msg)
                for tc in msg.tool_calls:
                    try:
                        tool_args = json.loads(tc.function.arguments)
                    except (TypeError, ValueError) as exc:
                        # Report undecodable arguments back to the model
                        # instead of abandoning the whole round.
                        result = json.dumps(
                            {"error": f"invalid JSON arguments: {exc}"}
                        )
                    else:
                        result = _run_tool(tc.function.name, tool_args)
                    messages.append(
                        {
                            "role": "tool",
                            "tool_call_id": tc.id,
                            "content": result,
                        }
                    )
                continue
            data = json.loads(_extract_json_text(msg.content or ""))
            return _decision_from_reply(data)
    except Exception as e:
        dec = _rule_based_decide(report, policies)
        dec.body += f"\n\n(LLM fallback: {e})"
        return dec
    # The model kept requesting tools for all five rounds.
    return _rule_based_decide(report, policies)
def should_notify(decision: AgentDecision, policies: dict) -> bool:
    """Tell whether a decision should be pushed out as a notification.

    Args:
        decision: The decision to evaluate; ``alert`` and ``severity`` are
            read.
        policies: Policy mapping. ``severity_telegram`` lists the severities
            that may be sent (default ``["critical", "high"]``);
            ``quiet_hours.allow_severity`` (default ``"critical"``) is the
            minimum severity that still gets through during quiet hours.

    Returns:
        ``True`` only for alerts whose severity is allowed and, during quiet
        hours, at least as severe as ``allow_severity``.
    """
    if not decision.alert:
        return False
    allowed = policies.get("severity_telegram") or ["critical", "high"]
    if decision.severity not in allowed:
        return False
    if not _in_quiet_hours(policies):
        return True

    threshold = (policies.get("quiet_hours") or {}).get("allow_severity", "critical")
    order = ("info", "low", "medium", "high", "critical")
    if decision.severity in order and threshold in order:
        # Treat allow_severity as a minimum: with an equality test, setting
        # it to "high" would silence "critical" alerts during quiet hours.
        return order.index(decision.severity) >= order.index(threshold)
    # Unknown label on either side: fall back to an exact match.
    return decision.severity == threshold