Initial commit: HA Voice Control MCP server
This commit is contained in:
@@ -0,0 +1,10 @@
|
||||
from mcp.server import Server
|
||||
s = Server("test")
|
||||
# Check for tool registration methods
|
||||
methods = [m for m in dir(s) if 'tool' in m.lower()]
|
||||
print("Tool-related methods:", methods)
|
||||
# Check type
|
||||
print("Type:", type(s))
|
||||
# Check available decorators/functions
|
||||
all_attrs = [m for m in dir(s) if not m.startswith('_')]
|
||||
print("Public attrs:", all_attrs[:30])
|
||||
@@ -0,0 +1,22 @@
|
||||
from mcp.server import Server
|
||||
from mcp.types import Tool
|
||||
import inspect
|
||||
|
||||
s = Server("test")
|
||||
|
||||
# Check if list_tools is a decorator or a method to override
|
||||
print("list_tools signature:", inspect.signature(s.list_tools))
|
||||
print("call_tool signature:", inspect.signature(s.call_tool))
|
||||
|
||||
# Check if there's a decorator pattern
|
||||
import mcp.server
|
||||
print("\nmcp.server module contents:")
|
||||
for name in dir(mcp.server):
|
||||
if not name.startswith('_'):
|
||||
print(f" {name}")
|
||||
|
||||
# Check mcp.server.lowlevel
|
||||
import mcp.server.lowlevel.server as ll
|
||||
for name in dir(ll):
|
||||
if 'tool' in name.lower():
|
||||
print(f" lowlevel: {name}")
|
||||
@@ -0,0 +1,13 @@
|
||||
from mcp.server import FastMCP
|
||||
import inspect
|
||||
|
||||
# Check FastMCP API
|
||||
print("FastMCP methods with 'tool':")
|
||||
for name in dir(FastMCP):
|
||||
if 'tool' in name.lower():
|
||||
print(f" {name}")
|
||||
|
||||
# Check if it has a tool decorator
|
||||
if hasattr(FastMCP, 'tool'):
|
||||
print("\nFastMCP.tool is a decorator!")
|
||||
print(inspect.signature(FastMCP.tool))
|
||||
@@ -0,0 +1,105 @@
|
||||
"""Database verkenning: PostgreSQL + Neo4j"""
|
||||
import os, sys
|
||||
os.environ["PG_HOST"] = "192.168.1.211"
|
||||
os.environ["PG_PORT"] = "5433"
|
||||
os.environ["PG_USER"] = "mo"
|
||||
os.environ["PG_PASSWORD"] = "WaQTUw2t"
|
||||
os.environ["PG_DATABASE"] = "homelab"
|
||||
os.environ["NEO4J_URI"] = "neo4j://192.168.1.211:49153"
|
||||
os.environ["NEO4J_USER"] = "neo4j"
|
||||
os.environ["NEO4J_PASSWORD"] = "WaQTUw2t"
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
print("=" * 70)
|
||||
print("POSTGRESQL VERKENNING")
|
||||
print("=" * 70)
|
||||
|
||||
try:
|
||||
from src.pg_client import query
|
||||
|
||||
# Alle schemas
|
||||
print("\n--- Schemas ---")
|
||||
schemas = query("SELECT schema_name FROM information_schema.schemata ORDER BY schema_name")
|
||||
for s in schemas:
|
||||
print(f" {s['schema_name']}")
|
||||
|
||||
# Alle tabellen
|
||||
print("\n--- Tabellen ---")
|
||||
tables = query("""
|
||||
SELECT table_schema, table_name
|
||||
FROM information_schema.tables
|
||||
WHERE table_type = 'BASE TABLE'
|
||||
ORDER BY table_schema, table_name
|
||||
""")
|
||||
for t in tables:
|
||||
print(f" {t['table_schema']}.{t['table_name']}")
|
||||
|
||||
# Per tabel details + sample data
|
||||
for t in tables:
|
||||
schema = t['table_schema']
|
||||
name = t['table_name']
|
||||
full = f"{schema}.{name}"
|
||||
|
||||
cols = query("""
|
||||
SELECT column_name, data_type
|
||||
FROM information_schema.columns
|
||||
WHERE table_schema = %s AND table_name = %s
|
||||
ORDER BY ordinal_position
|
||||
""", (schema, name))
|
||||
|
||||
try:
|
||||
count = query(f"SELECT count(*) as cnt FROM {full}", fetch="one")
|
||||
cnt = count['cnt']
|
||||
except Exception as e:
|
||||
cnt = f"ERR: {e}"
|
||||
|
||||
print(f"\n [{cnt} rows] {full}")
|
||||
col_names = [c['column_name'] for c in cols]
|
||||
print(f" Kolommen: {', '.join(col_names)}")
|
||||
|
||||
# Sample data (max 5 rijen)
|
||||
if isinstance(cnt, int) and cnt > 0:
|
||||
try:
|
||||
rows = query(f"SELECT * FROM {full} LIMIT 5")
|
||||
for i, row in enumerate(rows):
|
||||
print(f" Row {i+1}: {dict(row)}")
|
||||
except Exception as e:
|
||||
print(f" Sample fout: {e}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"PostgreSQL FOUT: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
print("\n" + "=" * 70)
|
||||
print("NEO4J VERKENNING")
|
||||
print("=" * 70)
|
||||
|
||||
try:
|
||||
from src.neo4j_client import get_driver, get_all_devices, get_scan_history, get_network_summary, close
|
||||
|
||||
devices = get_all_devices()
|
||||
print(f"\nDevices: {len(devices)}")
|
||||
for d in devices:
|
||||
ports = [p.get('port') for p in (d.get('ports') or []) if p and p.get('port')]
|
||||
print(f" {d['ip']:<16} {d.get('hostname','')[:30]:<30} {d.get('os_guess','')[:25]:<25} ports={ports}")
|
||||
|
||||
print(f"\n--- Scan History ---")
|
||||
scans = get_scan_history()
|
||||
for s in scans:
|
||||
print(f" {s}")
|
||||
|
||||
print(f"\n--- Summary ---")
|
||||
summary = get_network_summary()
|
||||
print(f" {summary}")
|
||||
|
||||
close()
|
||||
print("\nNeo4j: OK")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Neo4j FOUT: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
print("\nDONE.")
|
||||
@@ -0,0 +1,94 @@
|
||||
"""Gerichte database check: dashboard data + Neo4j"""
|
||||
import os, sys, json
|
||||
os.environ["PG_HOST"] = "192.168.1.211"
|
||||
os.environ["PG_PORT"] = "5433"
|
||||
os.environ["PG_USER"] = "mo"
|
||||
os.environ["PG_PASSWORD"] = "WaQTUw2t"
|
||||
os.environ["PG_DATABASE"] = "homelab"
|
||||
os.environ["NEO4J_URI"] = "neo4j://192.168.1.211:49153"
|
||||
os.environ["NEO4J_USER"] = "neo4j"
|
||||
os.environ["NEO4J_PASSWORD"] = "WaQTUw2t"
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
# Fix encoding
|
||||
sys.stdout.reconfigure(encoding='utf-8')
|
||||
|
||||
print("=" * 70)
|
||||
print("DASHBOARD DATA (PostgreSQL)")
|
||||
print("=" * 70)
|
||||
|
||||
from src.pg_client import query
|
||||
|
||||
# Favorites
|
||||
print("\n--- FAVORITES (20 rows) ---")
|
||||
rows = query("SELECT id, title, url, icon, category, sort_order, is_pinned FROM dashboard.favorites ORDER BY is_pinned DESC, sort_order")
|
||||
for r in rows:
|
||||
pinned = "📌" if r['is_pinned'] else " "
|
||||
print(f" {pinned} [{r['id']}] {r['title']:<30} | {r['url']:<45} | {r['category']}")
|
||||
|
||||
# Settings
|
||||
print("\n--- SETTINGS (5 rows) ---")
|
||||
rows = query("SELECT * FROM dashboard.settings ORDER BY key")
|
||||
for r in rows:
|
||||
print(f" {r['key']:<25} = {r['value']}")
|
||||
|
||||
# Passwords
|
||||
print("\n--- PASSWORDS ---")
|
||||
rows = query("SELECT count(*) as c FROM dashboard.passwords", fetch="one")
|
||||
print(f" Count: {rows['c']}")
|
||||
|
||||
# Calendar
|
||||
print("\n--- CALENDAR ---")
|
||||
rows = query("SELECT count(*) as c FROM dashboard.calendar_events", fetch="one")
|
||||
print(f" Count: {rows['c']}")
|
||||
|
||||
# Files
|
||||
print("\n--- FILE INDEX ---")
|
||||
rows = query("SELECT count(*) as c FROM dashboard.file_index", fetch="one")
|
||||
print(f" Count: {rows['c']}")
|
||||
|
||||
# Photos
|
||||
print("\n--- PHOTOS ---")
|
||||
rows = query("SELECT count(*) as c FROM dashboard.photos", fetch="one")
|
||||
print(f" Count: {rows['c']}")
|
||||
|
||||
# Widgets
|
||||
print("\n--- WIDGETS ---")
|
||||
rows = query("SELECT count(*) as c FROM dashboard.dashboard_widgets", fetch="one")
|
||||
print(f" Count: {rows['c']}")
|
||||
|
||||
# ── NEO4J ──────────────────────────────────────────────────────
|
||||
print("\n" + "=" * 70)
|
||||
print("NEO4J NETWERK DATA")
|
||||
print("=" * 70)
|
||||
|
||||
from src.neo4j_client import get_driver, get_all_devices, get_scan_history, get_network_summary, close
|
||||
|
||||
try:
|
||||
devices = get_all_devices()
|
||||
print(f"\nDevices: {len(devices)}")
|
||||
for d in devices:
|
||||
ports = sorted([p.get('port') for p in (d.get('ports') or []) if p and p.get('port')])
|
||||
print(f" {d['ip']:<16} {d.get('hostname','')[:35]:<35} {d.get('os_guess','')[:28]:<28} ports={ports}")
|
||||
|
||||
print("\n--- Scan History ---")
|
||||
for s in get_scan_history():
|
||||
print(f" {s['id']} @ {s.get('timestamp','?')} — {s.get('hosts_active','?')} hosts")
|
||||
|
||||
print("\n--- Summary ---")
|
||||
s = get_network_summary()
|
||||
print(f" Total devices: {s.get('total_devices','?')}")
|
||||
print(f" Unique ports: {s.get('total_ports','?')}")
|
||||
os_types = s.get('os_types', [])
|
||||
from collections import Counter
|
||||
for os_t, cnt in Counter(os_types).most_common():
|
||||
print(f" {os_t}: {cnt}")
|
||||
|
||||
close()
|
||||
except Exception as e:
|
||||
print(f"Neo4j ERROR: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
print("\nDONE.")
|
||||
@@ -0,0 +1,29 @@
|
||||
"""Verwijder duplicate favorites (ID 11-20)."""
|
||||
import os, sys
|
||||
os.environ["PG_HOST"] = "192.168.1.211"
|
||||
os.environ["PG_PORT"] = "5433"
|
||||
os.environ["PG_USER"] = "mo"
|
||||
os.environ["PG_PASSWORD"] = "WaQTUw2t"
|
||||
os.environ["PG_DATABASE"] = "homelab"
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
from src.pg_client import query, execute
|
||||
|
||||
print("Voor opschonen:")
|
||||
rows = query("SELECT id, title FROM dashboard.favorites ORDER BY id")
|
||||
for r in rows:
|
||||
print(f" [{r['id']}] {r['title']}")
|
||||
|
||||
# Verwijder IDs 11-20
|
||||
deleted = execute("DELETE FROM dashboard.favorites WHERE id >= 11 AND id <= 20")
|
||||
print(f"\n{deleted} rows verwijderd.")
|
||||
|
||||
# Reset sequence
|
||||
execute("SELECT setval('dashboard.favorites_id_seq', (SELECT max(id) FROM dashboard.favorites))")
|
||||
|
||||
print("\nNa opschonen:")
|
||||
rows = query("SELECT id, title FROM dashboard.favorites ORDER BY id")
|
||||
for r in rows:
|
||||
print(f" [{r['id']}] {r['title']}")
|
||||
|
||||
print("\nDone.")
|
||||
@@ -0,0 +1,58 @@
|
||||
"""Quick smoke test voor imports."""
|
||||
import os, sys
|
||||
os.environ["PG_HOST"] = "192.168.1.211"
|
||||
os.environ["PG_PORT"] = "5433"
|
||||
os.environ["PG_USER"] = "mo"
|
||||
os.environ["PG_PASSWORD"] = "WaQTUw2t"
|
||||
os.environ["PG_DATABASE"] = "homelab"
|
||||
os.environ["NEO4J_URI"] = "neo4j://192.168.1.211:49153"
|
||||
os.environ["NEO4J_USER"] = "neo4j"
|
||||
os.environ["NEO4J_PASSWORD"] = "WaQTUw2t"
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
print("Testing imports...")
|
||||
try:
|
||||
from src.web_server import app
|
||||
print(" web_server: OK")
|
||||
except Exception as e:
|
||||
print(f" web_server: FAIL - {e}")
|
||||
|
||||
try:
|
||||
from src.dashboard_api import router
|
||||
print(" dashboard_api: OK")
|
||||
except Exception as e:
|
||||
print(f" dashboard_api: FAIL - {e}")
|
||||
|
||||
try:
|
||||
from src.mcp_server import server
|
||||
print(" mcp_server: OK")
|
||||
except Exception as e:
|
||||
print(f" mcp_server: FAIL - {e}")
|
||||
|
||||
try:
|
||||
from src.ha_client import HAClient
|
||||
print(" ha_client: OK")
|
||||
except Exception as e:
|
||||
print(f" ha_client: FAIL - {e}")
|
||||
|
||||
try:
|
||||
from src.whisper_client import transcribe
|
||||
print(" whisper_client: OK")
|
||||
except Exception as e:
|
||||
print(f" whisper_client: FAIL - {e}")
|
||||
|
||||
try:
|
||||
from src.pg_client import query
|
||||
print(" pg_client: OK")
|
||||
except Exception as e:
|
||||
print(f" pg_client: FAIL - {e}")
|
||||
|
||||
try:
|
||||
from src.neo4j_client import get_all_devices, close
|
||||
devices = get_all_devices()
|
||||
print(f" neo4j_client: OK ({len(devices)} devices)")
|
||||
close()
|
||||
except Exception as e:
|
||||
print(f" neo4j_client: FAIL - {e}")
|
||||
|
||||
print("\nAll imports verified.")
|
||||
@@ -0,0 +1,25 @@
|
||||
import urllib.request, json
|
||||
def get(path):
|
||||
r = urllib.request.urlopen(f'http://127.0.0.1:8765{path}', timeout=5)
|
||||
return json.loads(r.read())
|
||||
|
||||
print('=== Health ===')
|
||||
print(get('/api/health'))
|
||||
|
||||
print('\n=== Favorites (raw) ===')
|
||||
r = get('/api/dashboard/favorites')
|
||||
print(f'Type: {type(r).__name__}, len: {len(r) if isinstance(r, list) else "N/A"}')
|
||||
if isinstance(r, list) and r:
|
||||
print(f'First item type: {type(r[0]).__name__}')
|
||||
print(f'First item: {r[0]}')
|
||||
elif isinstance(r, dict):
|
||||
print(f'Keys: {list(r.keys())}')
|
||||
if 'detail' in r: print(f'Error: {r}')
|
||||
|
||||
print('\n=== Overview ===')
|
||||
r = get('/api/dashboard/overview')
|
||||
print(json.dumps({k:v for k,v in r.items() if not isinstance(v, list)}, indent=2))
|
||||
|
||||
print('\n=== Dashboard HTML ===')
|
||||
r2 = urllib.request.urlopen('http://127.0.0.1:8765/dashboard', timeout=5)
|
||||
print(f'Status: {r2.status}, {len(r2.read())} bytes')
|
||||
@@ -0,0 +1,32 @@
|
||||
"""Quick test: pg_client + dashboard API."""
|
||||
import os, sys
|
||||
os.environ["NEO4J_PASSWORD"] = "WaQTUw2t"
|
||||
os.environ["PG_PASSWORD"] = "WaQTUw2t"
|
||||
|
||||
# Test PostgreSQL connectie
|
||||
from src.pg_client import query
|
||||
print("PostgreSQL test:")
|
||||
try:
|
||||
r = query("SELECT count(*) as cnt FROM dashboard.favorites", fetch="one")
|
||||
print(f" Favorites: {r['cnt']}")
|
||||
r2 = query("SELECT count(*) as cnt FROM dashboard.calendar_events", fetch="one")
|
||||
print(f" Events: {r2['cnt']}")
|
||||
r3 = query("SELECT count(*) as cnt FROM dashboard.passwords", fetch="one")
|
||||
print(f" Passwords: {r3['cnt']}")
|
||||
print(" [OK] PostgreSQL werkt!")
|
||||
except Exception as e:
|
||||
print(f" [FOUT] {e}")
|
||||
|
||||
# Test Neo4j connectie
|
||||
print("\nNeo4j test:")
|
||||
try:
|
||||
from src.neo4j_client import get_driver, get_all_devices, close
|
||||
devices = get_all_devices()
|
||||
print(f" Devices: {len(devices)}")
|
||||
print(f" Eerste: {devices[0]['ip']} - {devices[0]['hostname']}")
|
||||
print(" [OK] Neo4j werkt!")
|
||||
close()
|
||||
except Exception as e:
|
||||
print(f" [FOUT] {e}")
|
||||
|
||||
print("\nAlle connecties werken. Server kan starten.")
|
||||
@@ -0,0 +1,53 @@
|
||||
"""Test alle dashboard API endpoints."""
|
||||
import urllib.request, json
|
||||
|
||||
BASE = "http://127.0.0.1:8765"
|
||||
|
||||
def get(path):
|
||||
try:
|
||||
r = urllib.request.urlopen(f"{BASE}{path}", timeout=10)
|
||||
return json.loads(r.read())
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
print("=== Health ===")
|
||||
print(get("/api/health"))
|
||||
|
||||
print("\n=== Dashboard Overview ===")
|
||||
o = get("/api/dashboard/overview")
|
||||
print(f"Favorites: {o.get('favorites_count')}, Passwords: {o.get('password_count')}, Network: {o.get('network_devices')}")
|
||||
|
||||
print("\n=== Dashboard Config ===")
|
||||
c = get("/api/dashboard/config")
|
||||
print(f"Settings: {list(c.get('settings',{}).keys())}")
|
||||
print(f"Connections: {list(c.get('connections',{}).keys())}")
|
||||
print(f"Counts: {c.get('counts')}")
|
||||
print(f"Neo4j available: {c.get('connections',{}).get('neo4j',{}).get('available')}")
|
||||
|
||||
print("\n=== Dashboard Network ===")
|
||||
n = get("/api/dashboard/network")
|
||||
if "error" in n:
|
||||
print(f"ERROR: {n['error']}")
|
||||
else:
|
||||
print(f"Devices: {n['summary']['total_devices']}")
|
||||
print(f"Ports: {n['summary']['total_ports']}")
|
||||
print(f"OS categories: {list(n['summary']['os_categories'].keys())[:5]}")
|
||||
|
||||
print("\n=== Dashboard Systems ===")
|
||||
s = get("/api/dashboard/systems")
|
||||
if isinstance(s, list):
|
||||
print(f"Systems: {len(s)}")
|
||||
if s:
|
||||
print(f"First: {s[0]['ip']} - {s[0]['hostname']}")
|
||||
else:
|
||||
print(f"Error: {s}")
|
||||
|
||||
print("\n=== System Detail (192.168.1.211) ===")
|
||||
d = get("/api/dashboard/systems/192.168.1.211")
|
||||
print(f"IP: {d.get('ip')}, OS: {d.get('os_guess')}, Ports: {len(d.get('ports',[]))}")
|
||||
|
||||
print("\n=== Favorites ===")
|
||||
f = get("/api/dashboard/favorites")
|
||||
print(f"{len(f)} favorieten")
|
||||
|
||||
print("\n[OK] Alle endpoints getest!")
|
||||
@@ -0,0 +1,32 @@
|
||||
import urllib.request, json
|
||||
def get(path):
|
||||
try:
|
||||
r = urllib.request.urlopen(f'http://127.0.0.1:8765{path}', timeout=5)
|
||||
return json.loads(r.read())
|
||||
except Exception as e:
|
||||
return {'error': str(e)}
|
||||
|
||||
print('=== Health ===')
|
||||
print(get('/api/health'))
|
||||
|
||||
print('\n=== Favorites ===')
|
||||
r = get('/api/dashboard/favorites')
|
||||
print(f'{len(r)} favorieten (eerste: {r[0]["title"] if r else "geen"})')
|
||||
|
||||
print('\n=== Overview ===')
|
||||
r = get('/api/dashboard/overview')
|
||||
print(f'Stats: {r["favorites_count"]} favs, {r["password_count"]} pws, {r["photo_count"]} photos')
|
||||
|
||||
print('\n=== Calendar ===')
|
||||
r = get('/api/dashboard/calendar?days=7')
|
||||
print(f'{len(r)} events komende 7 dagen')
|
||||
|
||||
print('\n=== Dashboard HTML ===')
|
||||
r2 = urllib.request.urlopen('http://127.0.0.1:8765/dashboard', timeout=5)
|
||||
print(f'Dashboard pagina: {r2.status} ({len(r2.read())} bytes)')
|
||||
|
||||
print('\n=== Homepage ===')
|
||||
r3 = urllib.request.urlopen('http://127.0.0.1:8765/', timeout=5)
|
||||
print(f'Voice control pagina: {r3.status} ({len(r3.read())} bytes)')
|
||||
|
||||
print('\n[OK] Alle endpoints werken!')
|
||||
@@ -0,0 +1,17 @@
|
||||
"""Test MCP server import with FastMCP."""
|
||||
import os, sys
|
||||
os.environ["PG_HOST"] = "192.168.1.211"
|
||||
os.environ["PG_PORT"] = "5433"
|
||||
os.environ["PG_USER"] = "mo"
|
||||
os.environ["PG_PASSWORD"] = "WaQTUw2t"
|
||||
os.environ["PG_DATABASE"] = "homelab"
|
||||
os.environ["NEO4J_URI"] = "neo4j://192.168.1.211:49153"
|
||||
os.environ["NEO4J_USER"] = "neo4j"
|
||||
os.environ["NEO4J_PASSWORD"] = "WaQTUw2t"
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
from src.mcp_server import server
|
||||
print(f"MCP Server OK: {server.name}")
|
||||
print(f"Tools: {len(server.list_tools())}")
|
||||
for tool in server.list_tools():
|
||||
print(f" - {tool.name}: {tool.description[:60] if tool.description else 'no desc'}...")
|
||||
@@ -0,0 +1,284 @@
|
||||
# Browser & Vaultwarden Wachtwoord Import Script
|
||||
# ==============================================
|
||||
# Dit script importeert wachtwoorden in de PostgreSQL dashboard database.
|
||||
#
|
||||
# GEBRUIK:
|
||||
# 1. Vanuit browser (Chrome/Edge/Firefox): python import_passwords.py --browser
|
||||
# 2. Vanuit Vaultwarden/Bitwarden CSV: python import_passwords.py --csv export.csv
|
||||
# 3. Vanuit JSON bestand: python import_passwords.py --json passwords.json
|
||||
#
|
||||
# JSON formaat:
|
||||
# [{"title": "Google", "url": "https://google.com", "username": "user", "password": "secret"}]
|
||||
#
|
||||
# CSV formaat (Bitwarden/Vaultwarden):
|
||||
# name,url,username,password,notes
|
||||
|
||||
import os, sys, csv, json, argparse
|
||||
|
||||
# DB config
|
||||
os.environ["PG_HOST"] = "192.168.1.211"
|
||||
os.environ["PG_PORT"] = "5433"
|
||||
os.environ["PG_USER"] = "mo"
|
||||
os.environ["PG_PASSWORD"] = "WaQTUw2t"
|
||||
os.environ["PG_DATABASE"] = "homelab"
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
from src.dashboard_api import _encrypt
|
||||
from src.pg_client import execute, query
|
||||
|
||||
|
||||
def extract_chrome_passwords():
|
||||
"""Extracteer wachtwoorden uit Chrome/Edge/Brave (Windows)."""
|
||||
import sqlite3, base64
|
||||
|
||||
# Zoek alle Chromium-based browsers
|
||||
localappdata = os.environ.get("LOCALAPPDATA", "")
|
||||
browsers = {
|
||||
"Chrome": os.path.join(localappdata, "Google", "Chrome", "User Data"),
|
||||
"Edge": os.path.join(localappdata, "Microsoft", "Edge", "User Data"),
|
||||
"Brave": os.path.join(localappdata, "BraveSoftware", "Brave-Browser", "User Data"),
|
||||
"Opera": os.path.join(os.environ.get("APPDATA", ""), "Opera Software", "Opera Stable"),
|
||||
}
|
||||
|
||||
all_passwords = []
|
||||
|
||||
for name, user_data in browsers.items():
|
||||
if not os.path.exists(user_data):
|
||||
continue
|
||||
|
||||
# Check Default profile + numbered profiles
|
||||
for profile in ["Default"] + [f"Profile {i}" for i in range(1, 10)]:
|
||||
login_db = os.path.join(user_data, profile, "Login Data")
|
||||
local_state = os.path.join(user_data, "Local State")
|
||||
|
||||
if not os.path.exists(login_db) or not os.path.exists(local_state):
|
||||
continue
|
||||
|
||||
try:
|
||||
# Get encrypted key from Local State
|
||||
with open(local_state, "r", encoding="utf-8") as f:
|
||||
state = json.load(f)
|
||||
encrypted_key = base64.b64decode(
|
||||
state.get("os_crypt", {}).get("encrypted_key", "")
|
||||
)
|
||||
# Remove "DPAPI" prefix (5 bytes)
|
||||
encrypted_key = encrypted_key[5:]
|
||||
|
||||
# Decrypt key with DPAPI
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
import ctypes
|
||||
from ctypes import wintypes
|
||||
|
||||
class DATA_BLOB(ctypes.Structure):
|
||||
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_char))]
|
||||
|
||||
crypt32 = ctypes.windll.crypt32
|
||||
kernel32 = ctypes.windll.kernel32
|
||||
|
||||
blob_in = DATA_BLOB(len(encrypted_key), ctypes.create_string_buffer(encrypted_key, len(encrypted_key)))
|
||||
blob_out = DATA_BLOB()
|
||||
|
||||
if crypt32.CryptUnprotectData(
|
||||
ctypes.byref(blob_in), None, None, None, None, 0, ctypes.byref(blob_out)
|
||||
):
|
||||
aes_key = ctypes.string_at(blob_out.pbData, blob_out.cbData)
|
||||
kernel32.LocalFree(blob_out.pbData)
|
||||
else:
|
||||
print(f" Kon DPAPI key niet decrypten voor {name}/{profile}")
|
||||
continue
|
||||
|
||||
# Read passwords
|
||||
conn = sqlite3.connect(login_db)
|
||||
cursor = conn.execute(
|
||||
"SELECT origin_url, username_value, password_value FROM logins"
|
||||
)
|
||||
|
||||
aesgcm = AESGCM(aes_key)
|
||||
count = 0
|
||||
for url, username, enc_pwd in cursor:
|
||||
if not enc_pwd:
|
||||
continue
|
||||
try:
|
||||
# Format: "v10" (3 bytes prefix) + 12 bytes nonce + ciphertext + 16 bytes tag
|
||||
nonce = enc_pwd[3:15]
|
||||
ciphertext = enc_pwd[15:-16]
|
||||
tag = enc_pwd[-16:]
|
||||
password = aesgcm.decrypt(nonce, ciphertext + tag, None).decode("utf-8")
|
||||
|
||||
title = url.split("://")[-1].split("/")[0] if url else "Onbekend"
|
||||
all_passwords.append({
|
||||
"title": title,
|
||||
"url": url,
|
||||
"username": username,
|
||||
"password": password,
|
||||
})
|
||||
count += 1
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
conn.close()
|
||||
print(f" {name}/{profile}: {count} wachtwoorden gevonden")
|
||||
|
||||
except Exception as e:
|
||||
print(f" Fout bij {name}/{profile}: {e}")
|
||||
|
||||
return all_passwords
|
||||
|
||||
|
||||
def extract_firefox_passwords():
|
||||
"""Extracteer wachtwoorden uit Firefox. Vereist dat Firefox draait of de master password bekend is."""
|
||||
import sqlite3
|
||||
|
||||
appdata = os.environ.get("APPDATA", "")
|
||||
profiles_dir = os.path.join(appdata, "Mozilla", "Firefox", "Profiles")
|
||||
|
||||
if not os.path.exists(profiles_dir):
|
||||
return []
|
||||
|
||||
# Firefox gebruikt logins.json en key4.db
|
||||
# De encryptie is complexer (NSS/PKCS11). We kunnen wel de logins.json lezen
|
||||
# en de gebruiker vragen Firefox te openen om te decrypteren.
|
||||
|
||||
all_passwords = []
|
||||
for profile in os.listdir(profiles_dir):
|
||||
logins_path = os.path.join(profiles_dir, profile, "logins.json")
|
||||
if not os.path.exists(logins_path):
|
||||
continue
|
||||
|
||||
try:
|
||||
with open(logins_path, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
|
||||
count = 0
|
||||
for entry in data.get("logins", []):
|
||||
all_passwords.append({
|
||||
"title": entry.get("hostname", "Onbekend"),
|
||||
"url": entry.get("hostname", ""),
|
||||
"username": entry.get("encryptedUsername", "(encrypted)"),
|
||||
"password": entry.get("encryptedPassword", "(encrypted)"),
|
||||
})
|
||||
count += 1
|
||||
print(f" Firefox/{profile}: {count} logins gevonden (versleuteld)")
|
||||
except Exception as e:
|
||||
print(f" Fout bij Firefox/{profile}: {e}")
|
||||
|
||||
return all_passwords
|
||||
|
||||
|
||||
def import_to_db(passwords: list[dict], dry_run: bool = False):
|
||||
"""Importeer wachtwoorden in de PostgreSQL database."""
|
||||
if not passwords:
|
||||
print("\nGeen wachtwoorden gevonden om te importeren.")
|
||||
return
|
||||
|
||||
print(f"\nImporteren van {len(passwords)} wachtwoorden...")
|
||||
|
||||
existing = query("SELECT url, username FROM passwords")
|
||||
existing_pairs = {(r["url"], r["username"]) for r in existing}
|
||||
|
||||
imported = 0
|
||||
skipped = 0
|
||||
|
||||
for pw in passwords:
|
||||
if (pw.get("url"), pw.get("username")) in existing_pairs:
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
if dry_run:
|
||||
print(f" [DRY RUN] {pw['title']}: {pw['username']} @ {pw['url']}")
|
||||
imported += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
encrypted = _encrypt(pw["password"])
|
||||
execute(
|
||||
"""INSERT INTO passwords (title, url, username, password_encrypted, notes, category)
|
||||
VALUES (%s, %s, %s, %s, %s, %s)""",
|
||||
(pw["title"], pw.get("url", ""), pw.get("username", ""),
|
||||
encrypted, pw.get("notes", ""), pw.get("category", "Algemeen"))
|
||||
)
|
||||
imported += 1
|
||||
except Exception as e:
|
||||
print(f" Fout bij {pw['title']}: {e}")
|
||||
|
||||
print(f"\nResultaat: {imported} geimporteerd, {skipped} overgeslagen (bestond al)")
|
||||
|
||||
# Toon totaal
|
||||
total = query("SELECT count(*) as c FROM passwords", fetch="one")
|
||||
print(f"Totaal in database: {total['c']} wachtwoorden")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Importeer wachtwoorden in PostgreSQL")
|
||||
parser.add_argument("--browser", action="store_true", help="Extract uit lokale browsers")
|
||||
parser.add_argument("--csv", type=str, help="Importeer uit Bitwarden/Vaultwarden CSV export")
|
||||
parser.add_argument("--json", type=str, help="Importeer uit JSON bestand")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Toon wat geimporteerd zou worden")
|
||||
args = parser.parse_args()
|
||||
|
||||
passwords = []
|
||||
|
||||
if args.browser:
|
||||
print("=== Extractie uit browsers ===\n")
|
||||
passwords.extend(extract_chrome_passwords())
|
||||
passwords.extend(extract_firefox_passwords())
|
||||
|
||||
if not passwords:
|
||||
print("\nGeen browser wachtwoorden gevonden!")
|
||||
print("Je kunt je wachtwoorden exporteren uit Vaultwarden/Bitwarden:")
|
||||
print(" 1. Open http://192.168.1.6:8000")
|
||||
print(" 2. Ga naar Tools -> Export Vault")
|
||||
print(" 3. Kies .csv of .json formaat")
|
||||
print(f" 4. python {sys.argv[0]} --csv export.csv")
|
||||
return
|
||||
|
||||
elif args.csv:
|
||||
print(f"=== Import uit CSV: {args.csv} ===\n")
|
||||
with open(args.csv, "r", encoding="utf-8-sig") as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
passwords.append({
|
||||
"title": row.get("name", row.get("title", "Onbekend")),
|
||||
"url": row.get("login_uri", row.get("url", "")),
|
||||
"username": row.get("login_username", row.get("username", "")),
|
||||
"password": row.get("login_password", row.get("password", "")),
|
||||
"notes": row.get("notes", ""),
|
||||
})
|
||||
print(f" {len(passwords)} wachtwoorden uit CSV gelezen")
|
||||
|
||||
elif args.json:
|
||||
print(f"=== Import uit JSON: {args.json} ===\n")
|
||||
with open(args.json, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
|
||||
if isinstance(data, dict) and "items" in data:
|
||||
# Bitwarden/Vaultwarden JSON export formaat
|
||||
for item in data.get("items", []):
|
||||
if item.get("type") == 1: # Login type
|
||||
login = item.get("login", {})
|
||||
passwords.append({
|
||||
"title": item.get("name", "Onbekend"),
|
||||
"url": login.get("uris", [{}])[0].get("uri", "") if login.get("uris") else "",
|
||||
"username": login.get("username", ""),
|
||||
"password": login.get("password", ""),
|
||||
"notes": item.get("notes", ""),
|
||||
})
|
||||
else:
|
||||
passwords = data
|
||||
|
||||
print(f" {len(passwords)} wachtwoorden uit JSON gelezen")
|
||||
|
||||
else:
|
||||
parser.print_help()
|
||||
print("\nVoorbeelden:")
|
||||
print(" python import_passwords.py --browser (Chrome/Edge/Firefox)")
|
||||
print(" python import_passwords.py --csv bitwarden.csv (Vaultwarden export)")
|
||||
print(" python import_passwords.py --json vault.json (JSON export)")
|
||||
print(" python import_passwords.py --browser --dry-run (test-modus)")
|
||||
return
|
||||
|
||||
import_to_db(passwords, dry_run=args.dry_run)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,82 @@
|
||||
"""
|
||||
Importeer scanresultaten naar Neo4j.
|
||||
|
||||
Gebruik:
|
||||
python import_to_neo4j.py
|
||||
python import_to_neo4j.py --password jouw_wachtwoord
|
||||
"""
|
||||
|
||||
import sys
|
||||
import logging
|
||||
import os
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def main():
|
||||
# Password via CLI argument of env var
|
||||
password = os.getenv("NEO4J_PASSWORD", "")
|
||||
if "--password" in sys.argv:
|
||||
idx = sys.argv.index("--password")
|
||||
if idx + 1 < len(sys.argv):
|
||||
password = sys.argv[idx + 1]
|
||||
if not password:
|
||||
print("Geef Neo4j wachtwoord mee: NEO4J_PASSWORD=... python import_to_neo4j.py")
|
||||
print("Of: python import_to_neo4j.py --password jouw_wachtwoord")
|
||||
sys.exit(1)
|
||||
|
||||
os.environ["NEO4J_PASSWORD"] = password
|
||||
|
||||
from src.neo4j_client import init_schema, import_scan, get_all_devices, get_network_summary, close
|
||||
from src.scan_data import SCAN_RESULTS
|
||||
|
||||
print("=" * 60)
|
||||
print("Neo4j Thuisnetwerk Import")
|
||||
print("=" * 60)
|
||||
|
||||
# Init
|
||||
try:
|
||||
init_schema()
|
||||
except Exception as e:
|
||||
logger.error("Kan geen verbinding maken met Neo4j: %s", e)
|
||||
logger.error("Check of Neo4j draait op neo4j://192.168.1.211:49153")
|
||||
sys.exit(1)
|
||||
|
||||
# Import
|
||||
print(f"\nImporteren van {len(SCAN_RESULTS)} devices...")
|
||||
try:
|
||||
summary = import_scan(SCAN_RESULTS)
|
||||
print(f"\n{summary}")
|
||||
except Exception as e:
|
||||
logger.error("Import fout: %s", e)
|
||||
close()
|
||||
sys.exit(1)
|
||||
|
||||
# Toon resultaten
|
||||
print("\n" + "-" * 60)
|
||||
network_summary = get_network_summary()
|
||||
print(f"Database: {network_summary['total_devices']} devices, "
|
||||
f"{network_summary['total_ports']} unieke poorten")
|
||||
|
||||
devices = get_all_devices()
|
||||
print(f"\n{'IP':<16} {'Naam':<32} {'OS':<28} {'Poorten'}")
|
||||
print("-" * 90)
|
||||
for d in devices:
|
||||
ports = d.get("ports") or []
|
||||
port_str = ", ".join(str(p["port"]) for p in ports if p and p.get("port"))
|
||||
print(f"{d['ip']:<16} {d['hostname'][:31]:<32} {d['os_guess'][:27]:<28} {port_str}")
|
||||
|
||||
# Categorieen
|
||||
print(f"\n--- Categorieen ---")
|
||||
from collections import Counter
|
||||
cats = Counter(d["os_guess"] for d in devices)
|
||||
for cat, count in cats.most_common():
|
||||
print(f" {cat:<40} {count}x")
|
||||
|
||||
close()
|
||||
print("\nKlaar! Je netwerk staat nu in Neo4j.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,208 @@
|
||||
"""
|
||||
Netwerkscanner v3 — robuust op Windows.
|
||||
Fase 1: ICMP host-discovery in kleine batches.
|
||||
Fase 2: TCP poortscan op actieve hosts.
|
||||
"""
|
||||
|
||||
import socket
|
||||
import struct
|
||||
import sys
|
||||
import time
|
||||
import ipaddress
|
||||
import subprocess
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
NETWORK = "192.168.1.0/24"
|
||||
TIMEOUT = 0.5
|
||||
MAX_THREADS = 25 # lager voor Windows-stabiliteit
|
||||
|
||||
TOP_PORTS = [
|
||||
21, 22, 23, 25, 53, 80, 81, 88, 110, 111, 135, 139, 143, 161, 389,
|
||||
443, 445, 465, 500, 514, 515, 548, 554, 587, 631, 636, 873, 993, 995,
|
||||
1433, 1521, 1723, 1880, 1883, 2049, 2082, 2083, 2222, 3128, 3306, 3389,
|
||||
5000, 5432, 5900, 6379, 7001, 8000, 8008, 8080, 8081, 8123, 8443, 8888,
|
||||
9000, 9090, 9200, 9443, 10000, 27017,
|
||||
]
|
||||
|
||||
PROBES = {
|
||||
22: b'', 25: b'', 80: b'GET / HTTP/1.0\r\nHost: scan\r\n\r\n',
|
||||
443: b'GET / HTTP/1.0\r\nHost: scan\r\n\r\n',
|
||||
3306: b'', 5432: b'', 6379: b'PING\r\n',
|
||||
8080: b'GET / HTTP/1.0\r\nHost: scan\r\n\r\n',
|
||||
8123: b'GET /api/ HTTP/1.0\r\nHost: scan\r\n\r\n',
|
||||
}
|
||||
|
||||
|
||||
def ping_host(ip):
|
||||
"""ICMP ping, retourneert (ip, True/False)."""
|
||||
try:
|
||||
cmd = ["ping", "-n", "1", "-w", "1500", ip]
|
||||
r = subprocess.run(cmd, capture_output=True, timeout=3)
|
||||
return (ip, r.returncode == 0)
|
||||
except Exception:
|
||||
return (ip, False)
|
||||
|
||||
|
||||
def tcp_scan(ip, ports):
|
||||
"""Scan TCP-poorten op één host. Retourneert dict met resultaten."""
|
||||
open_ports = []
|
||||
banners = {}
|
||||
for port in ports:
|
||||
try:
|
||||
s = socket.create_connection((ip, port), timeout=TIMEOUT)
|
||||
# Banner grab
|
||||
probe = PROBES.get(port)
|
||||
if probe:
|
||||
try:
|
||||
s.sendall(probe)
|
||||
s.settimeout(0.5)
|
||||
b = s.recv(512)
|
||||
text = b.decode("latin-1", errors="replace").split("\r\n")[0].strip()
|
||||
banners[port] = text[:100]
|
||||
except Exception:
|
||||
pass
|
||||
s.close()
|
||||
open_ports.append(port)
|
||||
except Exception:
|
||||
pass
|
||||
return {"ip": ip, "open_ports": open_ports, "banners": banners}
|
||||
|
||||
|
||||
def guess_os(ports, banners):
|
||||
p = set(ports)
|
||||
if 8123 in p:
|
||||
b = banners.get(8123, "")
|
||||
return "Home Assistant" if "Home Assistant" in b else "Home Assistant/Linux"
|
||||
if 445 in p and 3389 in p: return "Windows (SMB+RDP)"
|
||||
if 445 in p and 135 in p: return "Windows (SMB+RPC)"
|
||||
if 3389 in p: return "Windows (RDP)"
|
||||
if 22 in p and 80 in p and 443 in p: return "Linux (SSH+Web)"
|
||||
if 22 in p and 5432 in p: return "Linux (PostgreSQL)"
|
||||
if 22 in p and 3306 in p: return "Linux (MySQL)"
|
||||
if 22 in p: return "Linux (SSH)"
|
||||
if 53 in p and 80 in p: return "Router/Gateway (DNS+Web)"
|
||||
if 53 in p: return "Router/Gateway (DNS)"
|
||||
if 1883 in p: return "MQTT Broker (IoT)"
|
||||
if 80 in p or 443 in p: return "Webserver"
|
||||
return "Onbekend"
|
||||
|
||||
|
||||
def resolve_hostname(ip):
|
||||
try:
|
||||
return socket.gethostbyaddr(ip)[0]
|
||||
except Exception:
|
||||
return ip
|
||||
|
||||
|
||||
def get_mac(ip):
|
||||
try:
|
||||
r = subprocess.run(["arp", "-a", ip], capture_output=True, text=True, timeout=2)
|
||||
for line in r.stdout.splitlines():
|
||||
if ip in line:
|
||||
for part in line.split():
|
||||
if "-" in part and len(part) == 17:
|
||||
return part.replace("-", ":").upper()
|
||||
except Exception:
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
# ── main ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def main():
|
||||
net = ipaddress.ip_network(NETWORK, strict=False)
|
||||
all_hosts = [str(h) for h in net.hosts()]
|
||||
total = len(all_hosts)
|
||||
|
||||
print(f"Netwerk scan: {NETWORK}")
|
||||
print(f"Hosts: {total} | Poorten: {len(TOP_PORTS)} | Threads: {MAX_THREADS}")
|
||||
print("=" * 70)
|
||||
|
||||
t0 = time.time()
|
||||
|
||||
# ── Fase 1: Host discovery via ICMP ping ──────────────────────────────
|
||||
print("\n[Fase 1] ICMP host discovery...")
|
||||
alive = []
|
||||
with ThreadPoolExecutor(max_workers=MAX_THREADS) as ex:
|
||||
futs = [ex.submit(ping_host, ip) for ip in all_hosts]
|
||||
for i, fut in enumerate(as_completed(futs), 1):
|
||||
ip, ok = fut.result()
|
||||
if ok:
|
||||
alive.append(ip)
|
||||
if i % 20 == 0 or i == total:
|
||||
print(f"\r {i}/{total} getest, {len(alive)} actief", end="", flush=True)
|
||||
|
||||
print(f"\n Klaar: {len(alive)} actieve host(s) in {time.time()-t0:.0f}s")
|
||||
|
||||
if not alive:
|
||||
print("\nGeen actieve hosts gevonden.")
|
||||
return
|
||||
|
||||
# ── Fase 2: TCP-poortscan op actieve hosts ────────────────────────────
|
||||
print(f"\n[Fase 2] TCP poortscan op {len(alive)} host(s)...")
|
||||
results = []
|
||||
with ThreadPoolExecutor(max_workers=min(MAX_THREADS, len(alive))) as ex:
|
||||
futs = {ex.submit(tcp_scan, ip, TOP_PORTS): ip for ip in alive}
|
||||
for i, fut in enumerate(as_completed(futs), 1):
|
||||
ip = futs[fut]
|
||||
try:
|
||||
r = fut.result()
|
||||
r["hostname"] = resolve_hostname(ip)
|
||||
r["mac"] = get_mac(ip)
|
||||
r["os_guess"] = guess_os(r["open_ports"], r["banners"])
|
||||
results.append(r)
|
||||
except Exception as e:
|
||||
print(f"\n Fout bij {ip}: {e}")
|
||||
if i % 5 == 0 or i == len(alive):
|
||||
print(f"\r {i}/{len(alive)} hosts gescand", end="", flush=True)
|
||||
|
||||
elapsed = time.time() - t0
|
||||
print(f"\n Klaar in {elapsed:.0f}s\n")
|
||||
|
||||
# ── Output ────────────────────────────────────────────────────────────
|
||||
results.sort(key=lambda r: ipaddress.IPv4Address(r["ip"]))
|
||||
|
||||
for h in results:
|
||||
ip = h["ip"]
|
||||
ports = h["open_ports"]
|
||||
banners = h["banners"]
|
||||
|
||||
pnames = {}
|
||||
for p in ports:
|
||||
try:
|
||||
pnames[p] = socket.getservbyport(p, "tcp")
|
||||
except Exception:
|
||||
pnames[p] = "?"
|
||||
|
||||
print("-" * 70)
|
||||
print(f" IP : {ip}")
|
||||
if h["hostname"] != ip:
|
||||
print(f" Hostname : {h['hostname']}")
|
||||
if h["mac"]:
|
||||
print(f" MAC : {h['mac']}")
|
||||
print(f" OS (guess) : {h['os_guess']}")
|
||||
print(f" Open poorten ({len(ports)}):")
|
||||
if ports:
|
||||
for p in ports:
|
||||
svc = pnames.get(p, "?")
|
||||
b = banners.get(p, "")
|
||||
line = f" {p:>5}/tcp {svc:<14}"
|
||||
if b:
|
||||
line += f" [{b}]"
|
||||
print(line)
|
||||
else:
|
||||
print(" (alle geteste poorten zijn gesloten/gefilterd)")
|
||||
print()
|
||||
|
||||
# ── Samenvatting ──────────────────────────────────────────────────────
|
||||
print("=" * 70)
|
||||
print(f"{'IP':<16} {'MAC':<18} {'Hostname':<30} {'OS':<26} {'Poorten':>7}")
|
||||
print("-" * 100)
|
||||
for h in results:
|
||||
print(f"{h['ip']:<16} {h['mac']:<18} {h['hostname'][:29]:<30} {h['os_guess'][:25]:<26} {len(h['open_ports']):>7}")
|
||||
print()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user