File: backend/database.py
def init_db(db_path: str) -> None:
("vnc_desktop_name", "TEXT", "''"),
("vnc_screenshot_path", "TEXT", "''"),
("web_screenshots", "TEXT", "'[]'"),
+ ("is_camera", "INTEGER", "0"),
+ ("camera_product", "TEXT", "''"),
+ ("rtsp_port", "INTEGER", "NULL"),
+ ("rtsp_url", "TEXT", "''"),
+ ("rtsp_auth_required", "INTEGER", "NULL"),
+ ("camera_screenshot_path", "TEXT", "''"),
]
for col, col_type, default in migrations:
try:
def upsert_host(db_path: str, scan_id: int, subnet_id: int, ip: str, **kwargs) -
"asn", "isp", "org", "country", "country_code", "city",
"latitude", "longitude", "ip_type", "reverse_dns",
"vnc_open", "vnc_auth_required", "vnc_desktop_name", "vnc_screenshot_path",
- "web_screenshots"):
+ "web_screenshots",
+ "is_camera", "camera_product", "rtsp_port", "rtsp_url",
+ "rtsp_auth_required", "camera_screenshot_path"):
if key in kwargs and kwargs[key] is not None:
val = kwargs[key]
if key in ("all_ports", "security_protocols", "web_screenshots") and isinstance(val, list):
def upsert_host(db_path: str, scan_id: int, subnet_id: int, ip: str, **kwargs) -
asn, isp, org, country, country_code, city,
latitude, longitude, ip_type, reverse_dns,
vnc_open, vnc_auth_required, vnc_desktop_name, vnc_screenshot_path,
- web_screenshots)
+ web_screenshots,
+ is_camera, camera_product, rtsp_port, rtsp_url,
+ rtsp_auth_required, camera_screenshot_path)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
- ?, ?, ?, ?, ?)""",
+ ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
scan_id, subnet_id, ip,
kwargs.get("hostname", ""),
def upsert_host(db_path: str, scan_id: int, subnet_id: int, ip: str, **kwargs) -
kwargs.get("vnc_desktop_name", ""),
kwargs.get("vnc_screenshot_path", ""),
web_screenshots,
+ kwargs.get("is_camera", 0),
+ kwargs.get("camera_product", ""),
+ kwargs.get("rtsp_port"),
+ kwargs.get("rtsp_url", ""),
+ kwargs.get("rtsp_auth_required"),
+ kwargs.get("camera_screenshot_path", ""),
),
)
conn.commit()
def get_host_stats(db_path: str) -> dict:
total = conn.execute("SELECT COUNT(*) FROM hosts").fetchone()[0]
rdp_open = conn.execute("SELECT COUNT(*) FROM hosts WHERE rdp_open = 1").fetchone()[0]
vnc_open = conn.execute("SELECT COUNT(*) FROM hosts WHERE vnc_open = 1").fetchone()[0]
- # A "jackpot" is a host we actually got a visible login/desktop from —
- # it has an RDP or VNC screenshot saved.
+ # A "jackpot" is a host we actually got a visible login screen, desktop,
+ # or camera frame from — any of the three screenshot paths populated.
jackpots = conn.execute(
"SELECT COUNT(*) FROM hosts "
"WHERE COALESCE(screenshot_path, '') != '' "
- " OR COALESCE(vnc_screenshot_path, '') != ''"
+ " OR COALESCE(vnc_screenshot_path, '') != '' "
+ " OR COALESCE(camera_screenshot_path, '') != ''"
).fetchone()[0]
subnets_scanned = conn.execute("SELECT COUNT(DISTINCT subnet_id) FROM hosts").fetchone()[0]
total_scans = conn.execute("SELECT COUNT(*) FROM scans").fetchone()[0]
def list_jackpots(db_path: str) -> list[dict]:
"SELECT * FROM hosts "
"WHERE COALESCE(screenshot_path, '') != '' "
" OR COALESCE(vnc_screenshot_path, '') != '' "
+ " OR COALESCE(camera_screenshot_path, '') != '' "
"ORDER BY last_seen_at DESC"
).fetchall()
return [_parse_host_json(dict(r)) for r in rows]
File: backend/models.py
class HostResponse(BaseModel):
vnc_desktop_name: str = ""
vnc_screenshot_path: str = ""
web_screenshots: list = []
+ is_camera: int = 0
+ camera_product: str = ""
+ rtsp_port: Optional[int] = None
+ rtsp_url: str = ""
+ rtsp_auth_required: Optional[int] = None
+ camera_screenshot_path: str = ""
class HostStats(BaseModel):
File: backend/routers/scans.py
def _scan_timed_out() -> bool:
if web_screenshots:
host_info["web_screenshots"] = web_screenshots
+ # Phase 4.6: Camera / RTSP enumeration — detect, probe paths, grab frame.
+ # Runs on every discovered host so we catch cameras even on hosts that
+ # don't advertise RDP or VNC (many NVRs are RTSP-only).
+ for host_info in discovered:
+ if _scan_timed_out():
+ break
+ # RTSP-only hosts won't have been full-scanned yet — do it now so
+ # detect_camera has real port/product data to match against.
+ if (host_info.get("rtsp_open")
+ and host_info["ip"] not in full_scanned_ips):
+ try:
+ full_data = scanner.full_scan(host_info["ip"])
+ host_info.update(full_data)
+ full_scanned_ips.add(host_info["ip"])
+ except Exception as e:
+ logger.error(f"Full scan for RTSP host {host_info['ip']}: {e}")
+ all_ports = host_info.get("all_ports", [])
+ cam = scanner.detect_camera(all_ports)
+ if not cam["is_camera"]:
+ continue
+ host_info["is_camera"] = 1
+ host_info["camera_product"] = cam["camera_product"]
+ # Try RTSP ports in order, first working stream wins
+ rtsp_ports = cam["rtsp_ports"] or [554]
+ for rtsp_port in rtsp_ports:
+ try:
+ probe = scanner.probe_rtsp(host_info["ip"], port=rtsp_port)
+ except Exception as e:
+ logger.error(f"RTSP probe failed for {host_info['ip']}:{rtsp_port}: {e}")
+ continue
+ if not probe:
+ continue
+ host_info["rtsp_port"] = rtsp_port
+ host_info["rtsp_url"] = probe["rtsp_url"]
+ host_info["rtsp_auth_required"] = probe["rtsp_auth_required"]
+ # Only grab a frame if the stream is open (no auth required)
+ if probe["rtsp_auth_required"] == 0:
+ try:
+ frame = scanner.capture_rtsp_frame(
+ host_info["ip"], probe["rtsp_url"], "screenshots"
+ )
+ if frame:
+ host_info["camera_screenshot_path"] = frame
+ except Exception as e:
+ logger.error(f"RTSP frame grab failed for {host_info['ip']}: {e}")
+ break # stop at first RTSP port that gave us anything
+
# Phase 5: Host enrichment (ASN, GeoIP, reverse DNS, IP type)
for host_info in discovered:
if _scan_timed_out():
def _scan_timed_out() -> bool:
except Exception as e:
logger.error(f"Enrichment failed for {host_info['ip']}: {e}")
- # Upsert only hosts with a confirmed open RDP or VNC port. A host whose
- # discovery hit couldn't be verified (timeout, tarpit, firewall that
- # SYN-ACKs everything) is dropped rather than saved as a ghost HIT.
- interesting = [h for h in discovered if h.get("rdp_open") or h.get("vnc_open")]
+ # Upsert only hosts with a confirmed open RDP or VNC port or a detected
+ # camera. A host whose discovery hit couldn't be verified (timeout, tarpit,
+ # firewall that SYN-ACKs everything) is dropped, not saved as a ghost HIT.
+ interesting = [
+ h for h in discovered
+ if h.get("rdp_open") or h.get("vnc_open") or h.get("is_camera")
+ ]
dropped = len(discovered) - len(interesting)
if dropped:
logger.info(f"Scan {scan_id}: dropping {dropped} unverified hosts")
def _scan_timed_out() -> bool:
vnc_desktop_name=host_info.get("vnc_desktop_name", ""),
vnc_screenshot_path=host_info.get("vnc_screenshot_path", ""),
web_screenshots=host_info.get("web_screenshots", []),
+ is_camera=host_info.get("is_camera", 0),
+ camera_product=host_info.get("camera_product", ""),
+ rtsp_port=host_info.get("rtsp_port"),
+ rtsp_url=host_info.get("rtsp_url", ""),
+ rtsp_auth_required=host_info.get("rtsp_auth_required"),
+ camera_screenshot_path=host_info.get("camera_screenshot_path", ""),
)
# Announce new RDP/VNC hosts via Bluesky