Files
homelab-configs/scanner.py
T

209 lines
7.3 KiB
Python
Raw Normal View History

"""
Netwerkscanner v3 — robuust op Windows.
Fase 1: ICMP host-discovery in kleine batches.
Fase 2: TCP poortscan op actieve hosts.
"""
import socket
import struct
import sys
import time
import ipaddress
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
# Network to scan, in CIDR notation.
NETWORK = "192.168.1.0/24"

# Seconds to wait for each TCP connection attempt.
TIMEOUT = 0.5

# Upper bound on worker threads; kept low for stability on Windows.
MAX_THREADS = 25

# TCP ports probed on every host that answered the ping sweep.
TOP_PORTS = [
    # well-known services
    21, 22, 23, 25, 53, 80, 81, 88, 110, 111,
    135, 139, 143, 161, 389, 443, 445, 465, 500, 514,
    515, 548, 554, 587, 631, 636, 873, 993, 995,
    # registered ports: databases, remote access, IoT
    1433, 1521, 1723, 1880, 1883, 2049, 2082, 2083, 2222, 3128,
    3306, 3389, 5000, 5432, 5900, 6379, 7001,
    # alternative web / admin ports
    8000, 8008, 8080, 8081, 8123, 8443, 8888, 9000, 9090, 9200,
    9443, 10000, 27017,
]

# Per-port payload used for banner grabbing.  An empty payload means nothing
# is sent to that port.
PROBES = {
    22: b"",
    25: b"",
    80: b"GET / HTTP/1.0\r\nHost: scan\r\n\r\n",
    443: b"GET / HTTP/1.0\r\nHost: scan\r\n\r\n",
    3306: b"",
    5432: b"",
    6379: b"PING\r\n",
    8080: b"GET / HTTP/1.0\r\nHost: scan\r\n\r\n",
    8123: b"GET /api/ HTTP/1.0\r\nHost: scan\r\n\r\n",
}
def ping_host(ip):
    """Send a single ICMP echo request to *ip* and report whether it replied.

    Args:
        ip: Target address (string, or anything whose ``str()`` is an address).

    Returns:
        ``(ip, alive)``.  ``alive`` is False when the host does not answer,
        the ping times out, the ``ping`` executable cannot be run, or the
        target is empty / looks like a command-line option.
    """
    target = str(ip)
    # Never hand ping an empty argument or something it would parse as a flag.
    if not target or target.startswith("-"):
        return (ip, False)

    on_windows = sys.platform.startswith("win")
    if on_windows:
        # -n count, -w reply timeout in milliseconds.
        cmd = ["ping", "-n", "1", "-w", "1500", target]
    elif sys.platform == "darwin":
        # macOS: -W is the reply wait time in milliseconds.
        cmd = ["ping", "-c", "1", "-W", "1500", target]
    else:
        # Linux (iputils): -W is the reply wait time in seconds.
        cmd = ["ping", "-c", "1", "-W", "2", target]

    try:
        # The subprocess timeout is a backstop in case ping ignores its own.
        r = subprocess.run(cmd, capture_output=True, timeout=3)
    except (OSError, subprocess.SubprocessError, ValueError):
        return (ip, False)

    if r.returncode != 0:
        return (ip, False)
    if on_windows:
        # Windows ping exits with 0 even when a router answers "Destination
        # host unreachable"; only a genuine echo reply carries "TTL=".
        # NOTE: IPv6 replies on Windows do not print a TTL field.
        return (ip, b"TTL=" in r.stdout.upper())
    return (ip, True)
def tcp_scan(ip, ports):
    """Try a TCP connect to each port on one host and grab banners.

    Args:
        ip: Host address to scan.
        ports: Iterable of TCP port numbers to try, in order.

    Returns:
        ``{"ip": ip, "open_ports": [...], "banners": {port: text}}``.
        ``open_ports`` lists every port that accepted a connection.
        ``banners`` holds the first line (at most 100 characters) of whatever
        the service sent back, for ports that have an entry in ``PROBES`` and
        returned a non-empty line.
    """
    open_ports = []
    banners = {}
    for port in ports:
        try:
            with socket.create_connection((ip, port), timeout=TIMEOUT) as conn:
                open_ports.append(port)
                probe = PROBES.get(port)
                # ``None`` means "no banner grab"; an empty probe means
                # "send nothing, just read what the server volunteers".
                if probe is None:
                    continue
                try:
                    if probe:
                        conn.sendall(probe)
                    conn.settimeout(0.5)
                    raw = conn.recv(512)
                except OSError:
                    # Port is open but the service stayed silent or reset.
                    continue
                first_line = raw.decode("latin-1").split("\r\n")[0].strip()
                # Binary greetings would otherwise put control characters
                # into the report.
                first_line = "".join(
                    ch if ch.isprintable() else "." for ch in first_line
                )
                if first_line:
                    banners[port] = first_line[:100]
        except (OSError, OverflowError):
            # Closed, filtered, unreachable, or not a valid port number.
            pass
    return {"ip": ip, "open_ports": open_ports, "banners": banners}
def guess_os(ports, banners):
    """Return a coarse device/OS label derived from the set of open ports.

    Args:
        ports: Iterable of open TCP port numbers.
        banners: Mapping of port number to banner text; only the entry for
            port 8123 is consulted.

    Returns:
        The label of the first matching rule, or ``"Onbekend"`` when no rule
        matches.
    """
    seen = set(ports)

    # Port 8123 takes precedence over every other rule.
    if 8123 in seen:
        ha_banner = banners.get(8123, "")
        if "Home Assistant" in ha_banner:
            return "Home Assistant"
        return "Home Assistant/Linux"

    # Ordered rules: the first entry whose ports are all open wins.
    rules = (
        ({445, 3389}, "Windows (SMB+RDP)"),
        ({445, 135}, "Windows (SMB+RPC)"),
        ({3389}, "Windows (RDP)"),
        ({22, 80, 443}, "Linux (SSH+Web)"),
        ({22, 5432}, "Linux (PostgreSQL)"),
        ({22, 3306}, "Linux (MySQL)"),
        ({22}, "Linux (SSH)"),
        ({53, 80}, "Router/Gateway (DNS+Web)"),
        ({53}, "Router/Gateway (DNS)"),
        ({1883}, "MQTT Broker (IoT)"),
    )
    for required, label in rules:
        if required <= seen:
            return label

    # Either web port on its own is enough for the generic web label.
    if seen & {80, 443}:
        return "Webserver"
    return "Onbekend"
def resolve_hostname(ip):
    """Reverse-resolve *ip* to a host name.

    Returns the primary name reported by ``socket.gethostbyaddr``; if the
    lookup fails for any reason, *ip* is returned unchanged.
    """
    try:
        primary_name, _aliases, _addresses = socket.gethostbyaddr(ip)
    except Exception:
        # Best effort: fall back to the input when the lookup fails.
        return ip
    return primary_name
def get_mac(ip):
    """Look up the MAC address for *ip* in the system ARP table.

    Runs ``arp -a <ip>`` and scans its output for a line that mentions
    exactly this address.

    Args:
        ip: IPv4 address as a string.

    Returns:
        The MAC as upper-case, colon-separated, zero-padded hex
        (``"AA:BB:CC:DD:EE:FF"``), or ``""`` when *ip* is empty, ``arp``
        cannot be run, or no matching entry is found.
    """
    # An empty address would match every line of the ARP table.
    if not ip:
        return ""
    try:
        r = subprocess.run(
            ["arp", "-a", ip],
            capture_output=True,
            text=True,
            # Console output may not be valid in the preferred encoding;
            # do not lose the whole result to a decode error.
            errors="replace",
            timeout=2,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        return ""

    for line in r.stdout.splitlines():
        tokens = line.split()
        # Compare whole tokens so 192.168.1.1 does not match 192.168.1.10;
        # some arp implementations wrap the address in parentheses.
        if not any(tok.strip("()") == ip for tok in tokens):
            continue
        for tok in tokens:
            mac = _normalize_mac(tok)
            if mac:
                return mac
    return ""


def _normalize_mac(token):
    """Return *token* as ``AA:BB:CC:DD:EE:FF`` if it is a MAC address, else ``""``."""
    hex_digits = "0123456789abcdefABCDEF"
    # Accept both dash-separated and colon-separated notation.
    groups = token.replace("-", ":").split(":")
    if len(groups) != 6:
        return ""
    for group in groups:
        if not 1 <= len(group) <= 2 or any(ch not in hex_digits for ch in group):
            return ""
    # Pad single-digit groups so the result always has two digits per octet.
    return ":".join(group.zfill(2) for group in groups).upper()
# ── main ─────────────────────────────────────────────────────────────────────
def main():
    """Scan ``NETWORK`` in two phases and print a report.

    Phase 1 pings every host address of the network; phase 2 port-scans the
    hosts that answered and looks up their host name, MAC address and an OS
    guess.  The result is printed as one detail section per host followed by
    a summary table.  Returns ``None``; returns early when no host answered.
    """
    net = ipaddress.ip_network(NETWORK, strict=False)
    all_hosts = [str(h) for h in net.hosts()]
    total = len(all_hosts)
    print(f"Netwerk scan: {NETWORK}")
    print(f"Hosts: {total} | Poorten: {len(TOP_PORTS)} | Threads: {MAX_THREADS}")
    print("=" * 70)
    t0 = time.time()

    # -- Phase 1: host discovery via ICMP ping --
    print("\n[Fase 1] ICMP host discovery...")
    alive = _discover_hosts(all_hosts)
    print(f"\n Klaar: {len(alive)} actieve host(s) in {time.time()-t0:.0f}s")
    if not alive:
        print("\nGeen actieve hosts gevonden.")
        return

    # -- Phase 2: TCP port scan on the hosts that answered --
    print(f"\n[Fase 2] TCP poortscan op {len(alive)} host(s)...")
    results = _scan_hosts(alive)
    elapsed = time.time() - t0
    print(f"\n Klaar in {elapsed:.0f}s\n")

    # -- Output --
    # ip_address() handles both IPv4 and IPv6, so changing NETWORK to an
    # IPv6 prefix does not break the sort.
    results.sort(key=lambda r: ipaddress.ip_address(r["ip"]))
    for h in results:
        _print_host_report(h)
    _print_summary(results)


def _discover_hosts(hosts):
    """Ping all *hosts* concurrently, print progress, return those that replied."""
    total = len(hosts)
    alive = []
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as ex:
        futs = [ex.submit(ping_host, ip) for ip in hosts]
        for done, fut in enumerate(as_completed(futs), 1):
            ip, ok = fut.result()
            if ok:
                alive.append(ip)
            if done % 20 == 0 or done == total:
                print(f"\r {done}/{total} getest, {len(alive)} actief", end="", flush=True)
    return alive


def _scan_one_host(ip):
    """Worker: port-scan *ip* and add hostname, MAC and OS guess to the result."""
    r = tcp_scan(ip, TOP_PORTS)
    r["hostname"] = resolve_hostname(ip)
    r["mac"] = get_mac(ip)
    r["os_guess"] = guess_os(r["open_ports"], r["banners"])
    return r


def _scan_hosts(alive):
    """Scan every host in the non-empty list *alive* concurrently; return the result dicts."""
    results = []
    with ThreadPoolExecutor(max_workers=min(MAX_THREADS, len(alive))) as ex:
        # The DNS and ARP lookups run inside the workers so a slow lookup
        # for one host does not stall collecting the others.
        futs = {ex.submit(_scan_one_host, ip): ip for ip in alive}
        for done, fut in enumerate(as_completed(futs), 1):
            ip = futs[fut]
            try:
                results.append(fut.result())
            except Exception as e:
                # Boundary handler: one failing host must not abort the scan.
                print(f"\n Fout bij {ip}: {e}")
            if done % 5 == 0 or done == len(alive):
                print(f"\r {done}/{len(alive)} hosts gescand", end="", flush=True)
    return results


def _print_host_report(h):
    """Print the detail section (address, names, open ports, banners) for one host."""
    ip = h["ip"]
    ports = h["open_ports"]
    banners = h["banners"]
    print("-" * 70)
    print(f" IP : {ip}")
    # resolve_hostname() returns the address itself when no name was found.
    if h["hostname"] != ip:
        print(f" Hostname : {h['hostname']}")
    if h["mac"]:
        print(f" MAC : {h['mac']}")
    print(f" OS (guess) : {h['os_guess']}")
    print(f" Open poorten ({len(ports)}):")
    if ports:
        for p in ports:
            try:
                svc = socket.getservbyport(p, "tcp")
            except (OSError, OverflowError):
                # No entry in the local services database.
                svc = "?"
            b = banners.get(p, "")
            line = f" {p:>5}/tcp {svc:<14}"
            if b:
                line += f" [{b}]"
            print(line)
    else:
        print(" (alle geteste poorten zijn gesloten/gefilterd)")
    print()


def _print_summary(results):
    """Print the one-line-per-host summary table."""
    print("=" * 70)
    print(f"{'IP':<16} {'MAC':<18} {'Hostname':<30} {'OS':<26} {'Poorten':>7}")
    print("-" * 100)
    for h in results:
        print(f"{h['ip']:<16} {h['mac']:<18} {h['hostname'][:29]:<30} {h['os_guess'][:25]:<26} {len(h['open_ports']):>7}")
    print()
# Run the scan only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()