"""Agentisch brein — LLM met tools, of regel-fallback zonder API.""" import json import os from dataclasses import dataclass from datetime import datetime from typing import List, Optional from agent.observer import ( ObservationReport, load_yaml, probe_http, probe_proxmox, probe_tcp, ) from agent.state import recent_incidents @dataclass class AgentDecision: alert: bool severity: str title: str body: str fingerprint: str actions: List[str] TOOLS_SCHEMA = [ { "type": "function", "function": { "name": "probe_tcp", "description": "Test of een TCP-poort open is op een host", "parameters": { "type": "object", "properties": { "host": {"type": "string"}, "port": {"type": "integer"}, }, "required": ["host", "port"], }, }, }, { "type": "function", "function": { "name": "probe_http", "description": "HTTP GET check op een URL", "parameters": { "type": "object", "properties": { "url": {"type": "string"}, "insecure_tls": {"type": "boolean"}, }, "required": ["url"], }, }, }, { "type": "function", "function": { "name": "probe_proxmox", "description": "Check Proxmox web UI bereikbaarheid", "parameters": { "type": "object", "properties": {"host": {"type": "string"}}, "required": ["host"], }, }, }, ] def _run_tool(name: str, args: dict) -> str: if name == "probe_tcp": ok, detail = probe_tcp(args["host"], int(args["port"])) return json.dumps({"ok": ok, "detail": detail}) if name == "probe_http": ok, detail = probe_http(args["url"], insecure=bool(args.get("insecure_tls"))) return json.dumps({"ok": ok, "detail": detail}) if name == "probe_proxmox": ok, detail = probe_proxmox(args["host"]) return json.dumps({"ok": ok, "detail": detail}) return json.dumps({"error": "unknown tool"}) def _rule_based_decide(report: ObservationReport, policies: dict) -> AgentDecision: rules = policies.get("rules") or {} failed = [f for f in report.findings if not f.ok] if not failed: return AgentDecision( alert=False, severity="info", title="Alles OK", body=f"{len(report.findings)} checks geslaagd.", fingerprint="all_ok", actions=[], ) worst = "low" lines = [] for f in failed: lines.append(f"• {f.kind}/{f.name}: {f.detail}") if f.kind == "proxmox" and not f.ok: worst = rules.get("proxmox_unreachable", "critical") elif f.kind == "nas" and not f.ok: worst = max_sev(worst, rules.get("nas_unreachable", "critical")) elif f.kind == "service" and not f.ok: worst = max_sev(worst, rules.get("any_service_down", "high")) elif f.kind == "docker" and not f.ok: worst = max_sev(worst, "high") fp = "fail:" + "|".join(sorted(f"{f.kind}:{f.name}" for f in failed))[:200] return AgentDecision( alert=True, severity=worst, title=f"{len(failed)} probleem(en) gedetecteerd", body="\n".join(lines), fingerprint=fp, actions=["Herhaal check over 5 min", "Controleer host handmatig"], ) def max_sev(a: str, b: str) -> str: order = ["info", "low", "medium", "high", "critical"] return a if order.index(a) >= order.index(b) else b def _in_quiet_hours(policies: dict) -> bool: qh = policies.get("quiet_hours") or {} if not qh: return False try: from zoneinfo import ZoneInfo tz = ZoneInfo(qh.get("timezone", "UTC")) except Exception: tz = None now = datetime.now(tz) if tz else datetime.now() start = qh.get("start", "23:00") end = qh.get("end", "07:00") h, m = map(int, now.strftime("%H %M").split()) cur = h * 60 + m sh, sm = map(int, start.split(":")) eh, em = map(int, end.split(":")) s, e = sh * 60 + sm, eh * 60 + em if s <= e: return s <= cur < e return cur >= s or cur < e return False def decide(report: ObservationReport) -> AgentDecision: policies = load_yaml("policies.yaml") history = recent_incidents(8) api_key = os.environ.get("OPENAI_API_KEY", "").strip() model = os.environ.get("AGENT_MODEL", "gpt-4o-mini") if not api_key: return _rule_based_decide(report, policies) try: from openai import OpenAI client = OpenAI(api_key=api_key) system = """Je bent de autonome security agent voor het EL-KADI homelab thuis. Je krijgt ruwe observaties (eigen probes, geen Wazuh/Uptime Kuma). Beslis of de gebruiker een Telegram-melding moet krijgen. Wees conservatief: alleen alert bij echt problemen of verdachte wijzigingen. Antwoord uiteindelijk ALTIJD met JSON: {"alert":bool,"severity":"info|low|medium|high|critical","title":"...","body":"...","fingerprint":"korte_sleutel","actions":["..."]} Je mag tools aanroepen om te verifiëren voordat je alert=true zet.""" user_msg = json.dumps( { "observations": report.to_dict(), "recent_incidents": history, "policies": { "quiet_hours": policies.get("quiet_hours"), "dedupe_minutes": policies.get("dedupe_minutes"), }, }, indent=2, default=str, ) messages = [ {"role": "system", "content": system}, {"role": "user", "content": user_msg}, ] for _ in range(5): resp = client.chat.completions.create( model=model, messages=messages, tools=TOOLS_SCHEMA, tool_choice="auto", ) msg = resp.choices[0].message if msg.tool_calls: messages.append(msg) for tc in msg.tool_calls: result = _run_tool(tc.function.name, json.loads(tc.function.arguments)) messages.append( { "role": "tool", "tool_call_id": tc.id, "content": result, } ) continue text = (msg.content or "").strip() if "```" in text: text = text.split("```")[1].replace("json", "").strip() data = json.loads(text) return AgentDecision( alert=bool(data.get("alert")), severity=str(data.get("severity", "medium")), title=str(data.get("title", "Security")), body=str(data.get("body", "")), fingerprint=str(data.get("fingerprint", "llm")), actions=list(data.get("actions") or []), ) except Exception as e: dec = _rule_based_decide(report, policies) dec.body += f"\n\n(LLM fallback: {e})" return dec return _rule_based_decide(report, policies) def should_notify(decision: AgentDecision, policies: dict) -> bool: if not decision.alert: return False allowed = policies.get("severity_telegram") or ["critical", "high"] if decision.severity not in allowed: return False if _in_quiet_hours(policies): return decision.severity == (policies.get("quiet_hours") or {}).get( "allow_severity", "critical" ) return True