""" Netwerkscanner v3 — robuust op Windows. Fase 1: ICMP host-discovery in kleine batches. Fase 2: TCP poortscan op actieve hosts. """ import socket import struct import sys import time import ipaddress import subprocess from concurrent.futures import ThreadPoolExecutor, as_completed NETWORK = "192.168.1.0/24" TIMEOUT = 0.5 MAX_THREADS = 25 # lager voor Windows-stabiliteit TOP_PORTS = [ 21, 22, 23, 25, 53, 80, 81, 88, 110, 111, 135, 139, 143, 161, 389, 443, 445, 465, 500, 514, 515, 548, 554, 587, 631, 636, 873, 993, 995, 1433, 1521, 1723, 1880, 1883, 2049, 2082, 2083, 2222, 3128, 3306, 3389, 5000, 5432, 5900, 6379, 7001, 8000, 8008, 8080, 8081, 8123, 8443, 8888, 9000, 9090, 9200, 9443, 10000, 27017, ] PROBES = { 22: b'', 25: b'', 80: b'GET / HTTP/1.0\r\nHost: scan\r\n\r\n', 443: b'GET / HTTP/1.0\r\nHost: scan\r\n\r\n', 3306: b'', 5432: b'', 6379: b'PING\r\n', 8080: b'GET / HTTP/1.0\r\nHost: scan\r\n\r\n', 8123: b'GET /api/ HTTP/1.0\r\nHost: scan\r\n\r\n', } def ping_host(ip): """ICMP ping, retourneert (ip, True/False).""" try: cmd = ["ping", "-n", "1", "-w", "1500", ip] r = subprocess.run(cmd, capture_output=True, timeout=3) return (ip, r.returncode == 0) except Exception: return (ip, False) def tcp_scan(ip, ports): """Scan TCP-poorten op één host. Retourneert dict met resultaten.""" open_ports = [] banners = {} for port in ports: try: s = socket.create_connection((ip, port), timeout=TIMEOUT) # Banner grab probe = PROBES.get(port) if probe: try: s.sendall(probe) s.settimeout(0.5) b = s.recv(512) text = b.decode("latin-1", errors="replace").split("\r\n")[0].strip() banners[port] = text[:100] except Exception: pass s.close() open_ports.append(port) except Exception: pass return {"ip": ip, "open_ports": open_ports, "banners": banners} def guess_os(ports, banners): p = set(ports) if 8123 in p: b = banners.get(8123, "") return "Home Assistant" if "Home Assistant" in b else "Home Assistant/Linux" if 445 in p and 3389 in p: return "Windows (SMB+RDP)" if 445 in p and 135 in p: return "Windows (SMB+RPC)" if 3389 in p: return "Windows (RDP)" if 22 in p and 80 in p and 443 in p: return "Linux (SSH+Web)" if 22 in p and 5432 in p: return "Linux (PostgreSQL)" if 22 in p and 3306 in p: return "Linux (MySQL)" if 22 in p: return "Linux (SSH)" if 53 in p and 80 in p: return "Router/Gateway (DNS+Web)" if 53 in p: return "Router/Gateway (DNS)" if 1883 in p: return "MQTT Broker (IoT)" if 80 in p or 443 in p: return "Webserver" return "Onbekend" def resolve_hostname(ip): try: return socket.gethostbyaddr(ip)[0] except Exception: return ip def get_mac(ip): try: r = subprocess.run(["arp", "-a", ip], capture_output=True, text=True, timeout=2) for line in r.stdout.splitlines(): if ip in line: for part in line.split(): if "-" in part and len(part) == 17: return part.replace("-", ":").upper() except Exception: pass return "" # ── main ───────────────────────────────────────────────────────────────────── def main(): net = ipaddress.ip_network(NETWORK, strict=False) all_hosts = [str(h) for h in net.hosts()] total = len(all_hosts) print(f"Netwerk scan: {NETWORK}") print(f"Hosts: {total} | Poorten: {len(TOP_PORTS)} | Threads: {MAX_THREADS}") print("=" * 70) t0 = time.time() # ── Fase 1: Host discovery via ICMP ping ────────────────────────────── print("\n[Fase 1] ICMP host discovery...") alive = [] with ThreadPoolExecutor(max_workers=MAX_THREADS) as ex: futs = [ex.submit(ping_host, ip) for ip in all_hosts] for i, fut in enumerate(as_completed(futs), 1): ip, ok = fut.result() if ok: alive.append(ip) if i % 20 == 0 or i == total: print(f"\r {i}/{total} getest, {len(alive)} actief", end="", flush=True) print(f"\n Klaar: {len(alive)} actieve host(s) in {time.time()-t0:.0f}s") if not alive: print("\nGeen actieve hosts gevonden.") return # ── Fase 2: TCP-poortscan op actieve hosts ──────────────────────────── print(f"\n[Fase 2] TCP poortscan op {len(alive)} host(s)...") results = [] with ThreadPoolExecutor(max_workers=min(MAX_THREADS, len(alive))) as ex: futs = {ex.submit(tcp_scan, ip, TOP_PORTS): ip for ip in alive} for i, fut in enumerate(as_completed(futs), 1): ip = futs[fut] try: r = fut.result() r["hostname"] = resolve_hostname(ip) r["mac"] = get_mac(ip) r["os_guess"] = guess_os(r["open_ports"], r["banners"]) results.append(r) except Exception as e: print(f"\n Fout bij {ip}: {e}") if i % 5 == 0 or i == len(alive): print(f"\r {i}/{len(alive)} hosts gescand", end="", flush=True) elapsed = time.time() - t0 print(f"\n Klaar in {elapsed:.0f}s\n") # ── Output ──────────────────────────────────────────────────────────── results.sort(key=lambda r: ipaddress.IPv4Address(r["ip"])) for h in results: ip = h["ip"] ports = h["open_ports"] banners = h["banners"] pnames = {} for p in ports: try: pnames[p] = socket.getservbyport(p, "tcp") except Exception: pnames[p] = "?" print("-" * 70) print(f" IP : {ip}") if h["hostname"] != ip: print(f" Hostname : {h['hostname']}") if h["mac"]: print(f" MAC : {h['mac']}") print(f" OS (guess) : {h['os_guess']}") print(f" Open poorten ({len(ports)}):") if ports: for p in ports: svc = pnames.get(p, "?") b = banners.get(p, "") line = f" {p:>5}/tcp {svc:<14}" if b: line += f" [{b}]" print(line) else: print(" (alle geteste poorten zijn gesloten/gefilterd)") print() # ── Samenvatting ────────────────────────────────────────────────────── print("=" * 70) print(f"{'IP':<16} {'MAC':<18} {'Hostname':<30} {'OS':<26} {'Poorten':>7}") print("-" * 100) for h in results: print(f"{h['ip']:<16} {h['mac']:<18} {h['hostname'][:29]:<30} {h['os_guess'][:25]:<26} {len(h['open_ports']):>7}") print() if __name__ == "__main__": main()