Files
homelab-configs/apps/home-security-agent/agent/main.py
T

92 lines
2.7 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""EL-KADI Home Security Agent — autonome loop."""
import os
import sys
import time
from pathlib import Path
# Package root on path
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
from dotenv import load_dotenv
load_dotenv(ROOT / ".env")
from agent.brain import decide, should_notify
from agent.observer import load_yaml, run_observation
from agent.pg_store import (
persist_incident as pg_record_incident,
persist_observation,
was_notified_recently_pg,
)
from agent.state import log_run, record_incident, was_notified_recently
from agent.telegram_notify import format_alert, send_message
def _record_incident(
run_id,
fingerprint: str,
severity: str,
title: str,
body: str,
notified: bool,
) -> None:
record_incident(fingerprint, severity, title, body, notified)
pg_record_incident(run_id, fingerprint, severity, title, body, notified)
def run_once() -> int:
policies = load_yaml("policies.yaml")
report = run_observation()
decision = decide(report)
log_run(decision.title, report.to_dict())
run_id = persist_observation(report, decision)
print(f"[{report.timestamp}] {decision.severity}: {decision.title} (alert={decision.alert})")
if run_id:
print(f" → PostgreSQL run #{run_id}")
if not should_notify(decision, policies):
if decision.alert:
_record_incident(run_id, decision.fingerprint, decision.severity, decision.title, decision.body, False)
return 0
dedupe = int(policies.get("dedupe_minutes", 30))
if was_notified_recently(decision.fingerprint, dedupe) or was_notified_recently_pg(
decision.fingerprint, dedupe
):
print(f" dedupe skip ({decision.fingerprint})")
return 0
text = format_alert(decision.severity, decision.title, decision.body, decision.actions)
if send_message(text):
_record_incident(run_id, decision.fingerprint, decision.severity, decision.title, decision.body, True)
print(" → Telegram verzonden")
return 0
print(" → Telegram mislukt (check TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID)")
_record_incident(run_id, decision.fingerprint, decision.severity, decision.title, decision.body, False)
return 1
def main():
if len(sys.argv) > 1 and sys.argv[1] == "once":
sys.exit(run_once())
policies = load_yaml("policies.yaml")
interval = int(policies.get("interval_seconds", 300))
print(f"EL-KADI Security Agent — loop elke {interval}s (Ctrl+C stop)")
while True:
try:
run_once()
except KeyboardInterrupt:
raise
except Exception as e:
print(f"run error: {e}")
time.sleep(interval)
if __name__ == "__main__":
main()