"""Directe observatie — geen Wazuh/Uptime Kuma/n8n.""" import socket import subprocess from dataclasses import dataclass, field from datetime import datetime from typing import Any import httpx import yaml CONFIG_DIR = __import__("pathlib").Path(__file__).resolve().parent.parent / "config" @dataclass class Finding: kind: str name: str ok: bool detail: str meta: dict = field(default_factory=dict) @dataclass class ObservationReport: timestamp: str findings: list[Finding] def to_dict(self) -> dict: return { "timestamp": self.timestamp, "findings": [ {"kind": f.kind, "name": f.name, "ok": f.ok, "detail": f.detail, "meta": f.meta} for f in self.findings ], "summary": { "total": len(self.findings), "failed": sum(1 for f in self.findings if not f.ok), }, } def load_yaml(name: str) -> dict: with open(CONFIG_DIR / name, encoding="utf-8") as f: return yaml.safe_load(f) or {} def probe_tcp(host: str, port: int, timeout: float = 4) -> tuple[bool, str]: try: with socket.create_connection((host, port), timeout=timeout): return True, "open" except Exception as e: return False, str(e) def probe_http(url: str, insecure: bool = False, timeout: float = 8) -> tuple[bool, str]: try: r = httpx.get(url, timeout=timeout, verify=not insecure, follow_redirects=True) if r.status_code < 500: return True, f"HTTP {r.status_code}" return False, f"HTTP {r.status_code}" except Exception as e: return False, str(e) def probe_proxmox(host: str, port: int = 8006) -> tuple[bool, str]: ok, detail = probe_tcp(host, port) if not ok: return False, detail try: r = httpx.get(f"https://{host}:{port}/", timeout=6, verify=False) return r.status_code in (200, 301, 302, 401, 403), f"HTTPS {r.status_code}" except Exception as e: return False, str(e) def docker_container_states() -> list[Finding]: """Leest lokale Docker socket (NAS).""" findings = [] try: out = subprocess.run( ["docker", "ps", "-a", "--format", "{{.Names}}|{{.Status}}"], capture_output=True, text=True, timeout=30, ) if out.returncode != 0: return [Finding("docker", "docker", False, out.stderr.strip() or "docker ps failed")] for line in out.stdout.strip().splitlines(): if not line or "|" not in line: continue name, status = line.split("|", 1) up = status.lower().startswith("up") findings.append( Finding( "docker", name, up, status, {"exited": "exited" in status.lower()}, ) ) except FileNotFoundError: findings.append(Finding("docker", "docker", False, "docker CLI niet beschikbaar")) return findings def lan_ping_watch(subnet: str, sample_hosts: list[str] | None = None) -> list[Finding]: """Lightweight: ping gateway + optioneel lijst — geen mass scan standaard.""" findings = [] gateway = ".".join(subnet.split(".")[:3]) + ".1" ok, detail = _ping_once(gateway) findings.append(Finding("lan", "gateway", ok, detail, {"ip": gateway})) return findings def _ping_once(ip: str) -> tuple[bool, str]: try: out = subprocess.run( ["ping", "-c", "1", "-W", "2", ip], capture_output=True, text=True, timeout=5, ) return out.returncode == 0, "reachable" if out.returncode == 0 else "no reply" except Exception as e: return False, str(e) def run_observation() -> ObservationReport: targets = load_yaml("targets.yaml") findings: list[Finding] = [] nas = targets.get("nas") or {} host = nas.get("host", "192.168.1.211") for chk in nas.get("checks") or []: name = chk.get("name", "check") if chk.get("type") == "tcp": ok, detail = probe_tcp(host, int(chk["port"])) elif chk.get("type") == "http": ok, detail = probe_http(chk["url"]) else: ok, detail = False, "unknown check type" findings.append(Finding("nas", name, ok, detail)) for px in targets.get("proxmox_hosts") or []: ok, detail = probe_proxmox(px["host"], int(px.get("port", 8006))) findings.append(Finding("proxmox", px.get("name", px["host"]), ok, detail)) for svc in targets.get("services") or []: ok, detail = probe_http(svc["url"], insecure=bool(svc.get("insecure_tls"))) findings.append(Finding("service", svc.get("name", svc["url"]), ok, detail)) findings.extend(docker_container_states()) lan = targets.get("lan_watch") or {} if lan.get("enabled"): findings.extend(lan_ping_watch(lan.get("subnet", "192.168.1.0/24"))) return ObservationReport(timestamp=datetime.utcnow().isoformat() + "Z", findings=findings)