nerdymark's Software Engineering & Cybersecurity Blog

Welcome to my digital homestead — a curated collection of projects, writeups, and experiments in Python, cybersecurity, and creative coding.

Here you'll find CTF writeups covering Azure OAuth privilege escalation, AWS S3 multi-service exploitation, Kubernetes SSRF attack chains, Terraform state poisoning, Go malware reverse engineering, and supply-chain compromises on GitHub Actions runners. You'll also find side projects like the Pokémon Sleep Roster Analyzer, an RDP/VNC network scanner, a LinkedIn feed analyzer powered by Gemini, and word-puzzle solvers for Wordle and Hardle. Plus notes on building this Flask site, migrating to AWS, Bluesky cross-posting, and running a personal AI robot out of my garage.

I'm Mark LaCore — Software Engineer by day, Raspberry Pi tinkerer by night. 25 years of turning caffeine into code, 20+ years of playing guitar, and a growing collection of CTF trophies. Explore the posts below, browse the CTF writeups, or drop me a line.

Merge pull request #6 from nerdymark/feature/ip-camera-rtsp-enumeration Detect,...

File: backend/database.py
 def init_db(db_path: str) -> None:
         ("vnc_desktop_name", "TEXT", "''"),
         ("vnc_screenshot_path", "TEXT", "''"),
         ("web_screenshots", "TEXT", "'[]'"),
+        ("is_camera", "INTEGER", "0"),
+        ("camera_product", "TEXT", "''"),
+        ("rtsp_port", "INTEGER", "NULL"),
+        ("rtsp_url", "TEXT", "''"),
+        ("rtsp_auth_required", "INTEGER", "NULL"),
+        ("camera_screenshot_path", "TEXT", "''"),
     ]
     for col, col_type, default in migrations:
         try:
 def upsert_host(db_path: str, scan_id: int, subnet_id: int, ip: str, **kwargs) -
                         "asn", "isp", "org", "country", "country_code", "city",
                         "latitude", "longitude", "ip_type", "reverse_dns",
                         "vnc_open", "vnc_auth_required", "vnc_desktop_name", "vnc_screenshot_path",
-                        "web_screenshots"):
+                        "web_screenshots",
+                        "is_camera", "camera_product", "rtsp_port", "rtsp_url",
+                        "rtsp_auth_required", "camera_screenshot_path"):
                 if key in kwargs and kwargs[key] is not None:
                     val = kwargs[key]
                     if key in ("all_ports", "security_protocols", "web_screenshots") and isinstance(val, list):
 def upsert_host(db_path: str, scan_id: int, subnet_id: int, ip: str, **kwargs) -
                     asn, isp, org, country, country_code, city,
                     latitude, longitude, ip_type, reverse_dns,
                     vnc_open, vnc_auth_required, vnc_desktop_name, vnc_screenshot_path,
-                    web_screenshots)
+                    web_screenshots,
+                    is_camera, camera_product, rtsp_port, rtsp_url,
+                    rtsp_auth_required, camera_screenshot_path)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?, ?,
                            ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
-                           ?, ?, ?, ?, ?)""",
+                           ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                 (
                     scan_id, subnet_id, ip,
                     kwargs.get("hostname", ""),
 def upsert_host(db_path: str, scan_id: int, subnet_id: int, ip: str, **kwargs) -
                     kwargs.get("vnc_desktop_name", ""),
                     kwargs.get("vnc_screenshot_path", ""),
                     web_screenshots,
+                    kwargs.get("is_camera", 0),
+                    kwargs.get("camera_product", ""),
+                    kwargs.get("rtsp_port"),
+                    kwargs.get("rtsp_url", ""),
+                    kwargs.get("rtsp_auth_required"),
+                    kwargs.get("camera_screenshot_path", ""),
                 ),
             )
             conn.commit()
 def get_host_stats(db_path: str) -> dict:
         total = conn.execute("SELECT COUNT(*) FROM hosts").fetchone()[0]
         rdp_open = conn.execute("SELECT COUNT(*) FROM hosts WHERE rdp_open = 1").fetchone()[0]
         vnc_open = conn.execute("SELECT COUNT(*) FROM hosts WHERE vnc_open = 1").fetchone()[0]
-        # A "jackpot" is a host we actually got a visible login/desktop from —
-        # it has an RDP or VNC screenshot saved.
+        # A "jackpot" is a host we actually got a visible login screen, desktop,
+        # or camera frame from — any of the three screenshot paths populated.
         jackpots = conn.execute(
             "SELECT COUNT(*) FROM hosts "
             "WHERE COALESCE(screenshot_path, '') != '' "
-            "   OR COALESCE(vnc_screenshot_path, '') != ''"
+            "   OR COALESCE(vnc_screenshot_path, '') != '' "
+            "   OR COALESCE(camera_screenshot_path, '') != ''"
         ).fetchone()[0]
         subnets_scanned = conn.execute("SELECT COUNT(DISTINCT subnet_id) FROM hosts").fetchone()[0]
         total_scans = conn.execute("SELECT COUNT(*) FROM scans").fetchone()[0]
 def list_jackpots(db_path: str) -> list[dict]:
             "SELECT * FROM hosts "
             "WHERE COALESCE(screenshot_path, '') != '' "
             "   OR COALESCE(vnc_screenshot_path, '') != '' "
+            "   OR COALESCE(camera_screenshot_path, '') != '' "
             "ORDER BY last_seen_at DESC"
         ).fetchall()
         return [_parse_host_json(dict(r)) for r in rows]
File: backend/models.py
 class HostResponse(BaseModel):
     vnc_desktop_name: str = ""
     vnc_screenshot_path: str = ""
     web_screenshots: list = []
+    is_camera: int = 0
+    camera_product: str = ""
+    rtsp_port: Optional[int] = None
+    rtsp_url: str = ""
+    rtsp_auth_required: Optional[int] = None
+    camera_screenshot_path: str = ""
 class HostStats(BaseModel):
File: backend/routers/scans.py
 def _scan_timed_out() -> bool:
                 if web_screenshots:
                     host_info["web_screenshots"] = web_screenshots
+        # Phase 4.6: Camera / RTSP enumeration — detect, probe paths, grab frame.
+        # Runs on every discovered host so we catch cameras even on hosts that
+        # don't advertise RDP or VNC (many NVRs are RTSP-only).
+        for host_info in discovered:
+            if _scan_timed_out():
+                break
+            # RTSP-only hosts won't have been full-scanned yet — do it now so
+            # detect_camera has real port/product data to match against.
+            if (host_info.get("rtsp_open")
+                    and host_info["ip"] not in full_scanned_ips):
+                try:
+                    full_data = scanner.full_scan(host_info["ip"])
+                    host_info.update(full_data)
+                    full_scanned_ips.add(host_info["ip"])
+                except Exception as e:
+                    logger.error(f"Full scan for RTSP host {host_info['ip']}: {e}")
+            all_ports = host_info.get("all_ports", [])
+            cam = scanner.detect_camera(all_ports)
+            if not cam["is_camera"]:
+                continue
+            host_info["is_camera"] = 1
+            host_info["camera_product"] = cam["camera_product"]
+            # Try RTSP ports in order, first working stream wins
+            rtsp_ports = cam["rtsp_ports"] or [554]
+            for rtsp_port in rtsp_ports:
+                try:
+                    probe = scanner.probe_rtsp(host_info["ip"], port=rtsp_port)
+                except Exception as e:
+                    logger.error(f"RTSP probe failed for {host_info['ip']}:{rtsp_port}: {e}")
+                    continue
+                if not probe:
+                    continue
+                host_info["rtsp_port"] = rtsp_port
+                host_info["rtsp_url"] = probe["rtsp_url"]
+                host_info["rtsp_auth_required"] = probe["rtsp_auth_required"]
+                # Only grab a frame if the stream is open (no auth required)
+                if probe["rtsp_auth_required"] == 0:
+                    try:
+                        frame = scanner.capture_rtsp_frame(
+                            host_info["ip"], probe["rtsp_url"], "screenshots"
+                        )
+                        if frame:
+                            host_info["camera_screenshot_path"] = frame
+                    except Exception as e:
+                        logger.error(f"RTSP frame grab failed for {host_info['ip']}: {e}")
+                break  # stop at first RTSP port that gave us anything
+
         # Phase 5: Host enrichment (ASN, GeoIP, reverse DNS, IP type)
         for host_info in discovered:
             if _scan_timed_out():
 def _scan_timed_out() -> bool:
             except Exception as e:
                 logger.error(f"Enrichment failed for {host_info['ip']}: {e}")
-        # Upsert only hosts with a confirmed open RDP or VNC port. A host whose
-        # discovery hit couldn't be verified (timeout, tarpit, firewall that
-        # SYN-ACKs everything) is dropped rather than saved as a ghost HIT.
-        interesting = [h for h in discovered if h.get("rdp_open") or h.get("vnc_open")]
+        # Upsert only hosts with a confirmed open RDP, VNC, or camera. A host
+        # whose discovery hit couldn't be verified (timeout, tarpit, firewall
+        # that SYN-ACKs everything) is dropped rather than saved as a ghost HIT.
+        interesting = [
+            h for h in discovered
+            if h.get("rdp_open") or h.get("vnc_open") or h.get("is_camera")
+        ]
         dropped = len(discovered) - len(interesting)
         if dropped:
             logger.info(f"Scan {scan_id}: dropping {dropped} unverified hosts")
 def _scan_timed_out() -> bool:
                 vnc_desktop_name=host_info.get("vnc_desktop_name", ""),
                 vnc_screenshot_path=host_info.get("vnc_screenshot_path", ""),
                 web_screenshots=host_info.get("web_screenshots", []),
+                is_camera=host_info.get("is_camera", 0),
+                camera_product=host_info.get("camera_product", ""),
+                rtsp_port=host_info.get("rtsp_port"),
+                rtsp_url=host_info.get("rtsp_url", ""),
+                rtsp_auth_required=host_info.get("rtsp_auth_required"),
+                camera_screenshot_path=host_info.get("camera_screenshot_path", ""),
             )
             # Announce new RDP/VNC hosts via Bluesky
Read more...