File: backend/camera.py
def _do_capture(
with tempfile.TemporaryDirectory() as tmpdir:
try:
logger.debug("capture_image: starting capture into tmpdir=%s", tmpdir)
- # Kill gvfs camera daemons proactively before the first capture
- # attempt. On cameras that use PTP/MTP as their only USB mode
- # (e.g. Nikon D780), gvfs can (re)claim the PTP session in the
- # window between is_camera_connected() returning True and the first
- # capture call.
- _kill_gvfs_monitor()
if bulb:
_bulb_capture(tmpdir, bulb_seconds)
def _do_capture(
raise RuntimeError(f"Capture failed: {exc.stderr}") from exc
def _enable_liveview(settle_seconds: float = 2.0) -> None:
    """Enable Live View (viewfinder=1) and wait for the mirror to flip.

    Cameras like the Nikon D780 reject PTP InitiateCapture with "Access
    Denied" unless the mirror is up / Live View is active.  The mirror flip
    takes about 1-2 seconds, so viewfinder=1 is issued in its own gphoto2
    invocation and we wait before the caller proceeds to the actual capture
    command.

    Args:
        settle_seconds: How long to sleep after viewfinder=1 has been set
            successfully.  Defaults to 2 seconds; a value <= 0 skips the
            wait entirely.
    """
    # gvfs is killed before touching the camera so it cannot be holding the
    # PTP session when viewfinder=1 is sent.
    _kill_gvfs_monitor()
    try:
        _run(["--set-config", "viewfinder=1"], check=True)
    except subprocess.CalledProcessError:
        # Best effort: any failure of this command is treated as "this camera
        # has no viewfinder config", and there is no mirror flip to wait for.
        logger.debug("_enable_liveview: viewfinder config not supported, skipping")
        return
    # Give the camera time to raise the mirror and fully enter Live View.
    if settle_seconds > 0:
        time.sleep(settle_seconds)
+
+
def _normal_capture(tmpdir: str) -> None:
"""Standard capture-and-download with PTP retry logic."""
+ _enable_liveview()
for attempt in range(_PTP_MAX_ATTEMPTS):
- # Enable Live View (viewfinder=1) before capture. Cameras like the
- # Nikon D780 reject PTP InitiateCapture with "Access Denied" unless the
- # mirror is up / Live View is active. Also set capturetarget in the
- # same invocation so it persists.
+ # Kill gvfs proactively on every attempt – it can reclaim the USB
+ # interface between retries.
+ _kill_gvfs_monitor()
result = _run(
[
- "--set-config", "viewfinder=1",
"--set-config", "capturetarget=0",
"--capture-image-and-download",
"--filename",
def _normal_capture(tmpdir: str) -> None:
attempt + 1,
_PTP_MAX_ATTEMPTS,
)
- _kill_gvfs_monitor()
continue
raise RuntimeError(f"Capture failed: {stderr_stripped}")
if stderr_stripped and "ERROR: Could not capture" in stderr_stripped:
def _bulb_capture(tmpdir: str, bulb_seconds: Optional[int] = None) -> None:
# Enable Live View first – cameras like the Nikon D780 reject PTP capture
# commands unless the mirror is up / Live View is active.
+ _enable_liveview()
+
# Try Nikon-style epress2 first, then generic bulb.
# Set capturetarget in the same call that opens the shutter.
try:
_run(
[
- "--set-config", "viewfinder=1",
"--set-config", "capturetarget=0",
"--set-config", "epress2=on",
],
def _bulb_capture(tmpdir: str, bulb_seconds: Optional[int] = None) -> None:
logger.debug("_bulb_capture: epress2 not supported, trying bulb=1")
_run(
[
- "--set-config", "viewfinder=1",
"--set-config", "capturetarget=0",
"--set-config", "bulb=1",
],
File: backend/stacking.py
- mean : arithmetic mean of all frames (good for light pollution reduction)
- median : per-pixel median (best noise rejection)
- sum : simple accumulation (highlights faint stars)
+
+Memory-efficient: mean and sum use running accumulators (O(1) frame memory).
+Median processes images in horizontal strips to limit peak memory usage.
"""
import logging
StackMode = Literal["mean", "median", "sum"]
+# Number of pixel rows to process at a time for median stacking.
+# Smaller = less memory, larger = fewer I/O passes.
+_MEDIAN_STRIP_HEIGHT = 512
+
def stack_images(
image_paths: list[Path],
def stack_images(
logger.info("Stacking %d images with mode=%r", len(image_paths), mode)
- frames: list[np.ndarray] = []
- reference_size: tuple[int, int] | None = None
+ if mode in ("mean", "sum"):
+ return _stack_accumulate(image_paths, mode, np)
+ else:
+ return _stack_median(image_paths, np)
+
+
def _open_and_prepare(
    path: Path, reference_size: tuple[int, int] | None
) -> Image.Image:
    """Open an image, convert it to RGB, and resize it if needed.

    Args:
        path: Image file to load.
        reference_size: Expected ``(width, height)`` of the result.  When it
            is ``None`` no size check is performed; otherwise an image of a
            different size is resized to it (with a warning) rather than
            rejected.

    Returns:
        The RGB image, already detached from the file on disk.
    """
    # convert() returns a new image, so the file-backed source can be closed
    # as soon as the conversion is done instead of waiting for the garbage
    # collector to release the file handle.
    with Image.open(path) as source:
        img = source.convert("RGB")
    if reference_size is not None and img.size != reference_size:
        logger.warning(
            "Image %s has size %s; expected %s – resizing",
            path.name,
            img.size,
            reference_size,
        )
        img = img.resize(reference_size, Image.LANCZOS)
    return img
+
+
def _stack_accumulate(image_paths, mode, np):
    """Mean/sum stacking using a running accumulator (O(1) frame memory).

    Only the float32 accumulator and the frame currently being added are
    held in memory, regardless of how many frames are stacked.

    Args:
        image_paths: Paths of the frames to stack.  The first frame defines
            the reference size; later frames of a different size are resized
            to it by ``_open_and_prepare``.
        mode: ``"mean"`` averages the frames.  Any other value is handled as
            ``"sum"``: the frames are added and the result is normalised so
            the brightest value maps to 255.
        np: The numpy module, passed in by the caller.

    Returns:
        The stacked frame as an 8-bit RGB image.

    Raises:
        ValueError: If ``image_paths`` is empty.
    """
    if not image_paths:
        # Without this guard the code below would fail with an opaque
        # TypeError/AttributeError on the still-unset accumulator.
        raise ValueError("Cannot stack an empty list of images")

    accumulator = None
    reference_size = None
    for path in image_paths:
        img = _open_and_prepare(path, reference_size)
        if reference_size is None:
            reference_size = img.size
        frame = np.array(img, dtype=np.float32)
        if accumulator is None:
            # np.array() made a fresh array, so it can serve as the
            # accumulator directly and be updated in place afterwards.
            accumulator = frame
        else:
            accumulator += frame

    if mode == "mean":
        result = accumulator / len(image_paths)
    else:  # sum
        # Normalise so the brightest value maps to 255; an all-black stack
        # is left untouched to avoid dividing by zero.
        max_val = accumulator.max()
        if max_val > 0:
            result = accumulator / max_val * 255.0
        else:
            result = accumulator

    result = np.clip(result, 0, 255).astype(np.uint8)
    return Image.fromarray(result, mode="RGB")
+
+
def _stack_median(image_paths, np):
    """Median stacking using horizontal strips to limit peak memory.

    Instead of loading all N images into one (N, H, W, 3) array, the frames
    are processed _MEDIAN_STRIP_HEIGHT rows at a time.  The per-strip scratch
    array takes N * strip_height * W * 3 * 4 bytes; on top of that, one fully
    decoded frame and the uint8 output image are held in memory.  The price
    is that every frame is re-opened once per strip.

    Args:
        image_paths: Paths of the frames to stack.  The first frame defines
            the reference size; frames of a different size are resized to it
            by ``_open_and_prepare``.
        np: The numpy module, passed in by the caller.

    Returns:
        The per-pixel median of all frames as an 8-bit RGB image.

    Raises:
        ValueError: If ``image_paths`` is empty.
    """
    if not image_paths:
        raise ValueError("Cannot stack an empty list of images")

    # Only the dimensions are needed here.  Image.open() is lazy, so reading
    # .size does not require decoding or converting the pixel data, and the
    # context manager releases the file handle right away.
    with Image.open(image_paths[0]) as first:
        reference_size = first.size
    width, height = reference_size

    # Write each finished strip straight into the output array instead of
    # collecting strips in a list and concatenating them at the end.
    result = np.empty((height, width, 3), dtype=np.uint8)

    for y_start in range(0, height, _MEDIAN_STRIP_HEIGHT):
        y_end = min(y_start + _MEDIAN_STRIP_HEIGHT, height)

        # Collect this strip from every image.
        strips = np.empty(
            (len(image_paths), y_end - y_start, width, 3), dtype=np.float32
        )
        for i, path in enumerate(image_paths):
            img = _open_and_prepare(path, reference_size)
            # Crop to the strip: (left, upper, right, lower)
            strip_img = img.crop((0, y_start, width, y_end))
            strips[i] = np.array(strip_img, dtype=np.float32)

        # `strips` is scratch space that is discarded after this call, so let
        # numpy partition it in place rather than working on a copy.
        median_strip = np.median(strips, axis=0, overwrite_input=True)
        result[y_start:y_end] = np.clip(median_strip, 0, 255).astype(np.uint8)

    return Image.fromarray(result, mode="RGB")
File: backend/tests/test_backend.py
def mock_run(args, check=True, cwd=None):
assert result.exists(), "capture_image must return a valid file path on retry"
assert capture_call_count == 2, "Expected exactly two capture attempts"
- # 1 proactive kill before first attempt + 1 kill between the failed and
- # successful attempt = 2 total.
- assert mock_kill.call_count == 2, (
- "gvfs must be killed once proactively before the first attempt and "
- "once between the failed and successful attempt (2 total); "
- f"got {mock_kill.call_count}"
+ # 1 kill in _enable_liveview + 2 kills in the loop (one per attempt) = 3.
+ assert mock_kill.call_count == 3, (
+ "gvfs must be killed once in _enable_liveview and once per capture "
+ f"attempt (1 + 2 = 3 total); got {mock_kill.call_count}"
)
def test_capture_image_ptp_access_denied_all_retries_kill_gvfs(self, tmp_path):
"""Each PTP access denied attempt must kill gvfs before the next retry.
When all _PTP_MAX_ATTEMPTS fail, gvfs must have been killed
- _PTP_MAX_ATTEMPTS times: once proactively before the first attempt and
- once between each pair of subsequent attempts
- (_PTP_MAX_ATTEMPTS - 1 retry gaps).
+ 1 (from _enable_liveview) + _PTP_MAX_ATTEMPTS (one per loop iteration)
+ times total.
"""
import camera as cam
def mock_run(args, check=True, cwd=None):
with pytest.raises(RuntimeError, match="PTP Access Denied"):
cam.capture_image(tmp_path)
- # 1 proactive + (_PTP_MAX_ATTEMPTS - 1) retry gaps = _PTP_MAX_ATTEMPTS total.
- assert mock_kill.call_count == cam._PTP_MAX_ATTEMPTS, (
- f"Expected gvfs to be killed {cam._PTP_MAX_ATTEMPTS} time(s) "
- f"(1 proactive + {cam._PTP_MAX_ATTEMPTS - 1} retry gap(s)); "
+ # 1 from _enable_liveview + _PTP_MAX_ATTEMPTS from the loop.
+ expected = 1 + cam._PTP_MAX_ATTEMPTS
+ assert mock_kill.call_count == expected, (
+ f"Expected gvfs to be killed {expected} time(s) "
+ f"(1 liveview + {cam._PTP_MAX_ATTEMPTS} loop); "
f"got {mock_kill.call_count}"
)
def mock_run(args, check=True, cwd=None):
assert result.exists(), "capture_image must return a valid file path on retry"
assert capture_call_count == 2, "Expected exactly two capture attempts"
- # 1 proactive kill before first attempt + 1 kill between the failed and
- # successful attempt = 2 total.
- assert mock_kill.call_count == 2, (
- "gvfs must be killed once proactively before the first attempt and "
- "once between the failed and successful attempt (2 total); "
- f"got {mock_kill.call_count}"
+ # 1 kill in _enable_liveview + 2 kills in the loop (one per attempt) = 3.
+ assert mock_kill.call_count == 3, (
+ "gvfs must be killed once in _enable_liveview and once per capture "
+ f"attempt (1 + 2 = 3 total); got {mock_kill.call_count}"
)
def test_capture_image_ptp_session_already_opened_all_retries_kill_gvfs(self, tmp_path):
"""Each PTP Session Already Opened attempt must kill gvfs before the next retry.
When all _PTP_MAX_ATTEMPTS fail with this error, gvfs must have been
- killed _PTP_MAX_ATTEMPTS times: once proactively before the first
- attempt and once per retry gap (_PTP_MAX_ATTEMPTS - 1).
+ killed 1 (from _enable_liveview) + _PTP_MAX_ATTEMPTS (one per loop
+ iteration) times total.
"""
import camera as cam
def mock_run(args, check=True, cwd=None):
with pytest.raises(RuntimeError, match="PTP Session Already Opened"):
cam.capture_image(tmp_path)
- # 1 proactive + (_PTP_MAX_ATTEMPTS - 1) retry gaps = _PTP_MAX_ATTEMPTS total.
- assert mock_kill.call_count == cam._PTP_MAX_ATTEMPTS, (
- f"Expected gvfs to be killed {cam._PTP_MAX_ATTEMPTS} time(s) "
- f"(1 proactive + {cam._PTP_MAX_ATTEMPTS - 1} retry gap(s)); "
+ # 1 from _enable_liveview + _PTP_MAX_ATTEMPTS from the loop.
+ expected = 1 + cam._PTP_MAX_ATTEMPTS
+ assert mock_kill.call_count == expected, (
+ f"Expected gvfs to be killed {expected} time(s) "
+ f"(1 liveview + {cam._PTP_MAX_ATTEMPTS} loop); "
f"got {mock_kill.call_count}"
)