Skip to content

Analytics & Scene Detection

Hardware-accelerated functions for analyzing video content and detecting scenes.

Video Scene Detection

Detects scene changes in a video using GPU-accelerated I/O.

This implementation replicates the exact semantics of PySceneDetect's ContentDetector (v0.6.7) to produce identical scene cuts, but leverages VPF (GPUVideoStreamer) for drastically faster frame extraction and resizing. It calculates the mean absolute difference between adjacent frames in HSV space.

Parameters:

Name Type Description Default
video_path Path | str

Path to the input video file.

required
threshold float

The threshold for the frame score to trigger a scene cut. Higher values require more visual change to trigger a cut.

27.0

Returns:

Type Description
List[Tuple[_SecondsTime, _SecondsTime]]

A list of tuples, where each tuple represents a scene containing the start time and end time as _SecondsTime objects.

Source code in src/shorts_maker/utils/scenes.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def detect_video_scenes_gpu(
    video_path: Path | str, threshold: float = 27.0
) -> List[Tuple[_SecondsTime, _SecondsTime]]:
    """Detect scene changes in a video using GPU-accelerated I/O.

    Replicates the semantics of PySceneDetect's ContentDetector (v0.6.7) —
    including its FlashFilter merge behavior and minimum scene length — while
    using VPF (GPUVideoStreamer) for much faster frame extraction and
    resizing. The per-frame score is the mean absolute pixel difference
    between adjacent frames, averaged over the H, S and V channels.

    Args:
        video_path: Path to the input video file.
        threshold: Frame score required to trigger a scene cut. Higher values
            require more visual change to trigger a cut.

    Returns:
        A list of ``(start, end)`` tuples, one per detected scene, with times
        expressed as `_SecondsTime` values. The final scene extends to the end
        of the video. Returns an empty list when the video reports no frames,
        a non-positive framerate, or when no cuts are detected.
    """
    # Step 1: probe the container for geometry, framerate and frame count.
    dmx = nvc.PyFFmpegDemuxer(str(video_path))
    w0 = dmx.Width()
    h0 = dmx.Height()
    fps = dmx.Framerate()
    frame_count = dmx.Numframes()
    del dmx

    # Step 2: downscale so the analyzed width is ~256 px
    # (SceneManager.DEFAULT_MIN_WIDTH = 256); videos already narrower than
    # that are analyzed at native resolution.
    # NOTE(review): PySceneDetect uses an *integer* downscale factor
    # (w0 // 256), which yields effective widths >= 256 (e.g. 274 for 1920),
    # whereas this float division always yields exactly 256 — scores may
    # therefore differ slightly from ContentDetector; confirm parity.
    TARGET_MIN_WIDTH = 256
    if w0 < TARGET_MIN_WIDTH:
        downscale = 1.0
    else:
        downscale = w0 / float(TARGET_MIN_WIDTH)

    w_eff = int(w0 / downscale)
    h_eff = int(h0 / downscale)
    w_eff = max(1, w_eff)
    h_eff = max(1, h_eff)

    # NOTE(review): Numframes() can report 0 for some containers; we bail out
    # here rather than stream — confirm that is acceptable for all inputs.
    if frame_count == 0 or fps <= 0.0:
        return []

    # Step 3: FlashFilter (MERGE mode), logic identical to
    # scenedetect.scene_detector.FlashFilter — suppresses rapid consecutive
    # cuts (e.g. strobe flashes) closer together than `length` frames.
    class _FlashFilterMerge:
        def __init__(self, length: int):
            # Minimum number of frames between emitted cuts.
            self._filter_length = int(length)
            # Frame index of the most recent above-threshold frame.
            self._last_above: Optional[int] = None
            # Becomes True once the first valid (spaced) cut has been emitted.
            self._merge_enabled: bool = False
            # True while a run of too-close cuts is being merged.
            self._merge_triggered: bool = False
            # First frame of the run currently being merged.
            self._merge_start: Optional[int] = None

        @property
        def max_behind(self) -> int:
            return self._filter_length  # pragma: no cover

        def filter(self, frame_num: int, above_threshold: bool) -> List[int]:
            # With a zero/negative filter length, pass cuts straight through.
            if not (self._filter_length > 0):  # pragma: no cover
                return [frame_num] if above_threshold else []  # pragma: no cover
            if self._last_above is None:
                self._last_above = frame_num
            # MERGE path
            return self._filter_merge(frame_num, above_threshold)

        def _filter_merge(self, frame_num: int, above_threshold: bool) -> List[int]:
            assert self._last_above is not None
            # Has enough time passed since the previous above-threshold frame?
            min_length_met = (frame_num - self._last_above) >= self._filter_length
            if above_threshold:
                self._last_above = frame_num
            if self._merge_triggered:
                # A merge run is in progress: emit a single cut only once the
                # run has ended and spanned at least `filter_length` frames.
                assert self._merge_start is not None  # pragma: no cover
                num_merged_frames = self._last_above - self._merge_start  # pragma: no cover
                if min_length_met and (not above_threshold) and (num_merged_frames >= self._filter_length):  # pragma: no cover
                    self._merge_triggered = False  # pragma: no cover
                    return [self._last_above]  # pragma: no cover
                return []  # pragma: no cover
            if not above_threshold:
                return []
            if min_length_met:
                # Sufficiently spaced cut: emit it immediately.
                self._merge_enabled = True
                return [frame_num]
            if self._merge_enabled:  # pragma: no cover
                # Cut too close to the previous one: start merging the run.
                self._merge_triggered = True  # pragma: no cover
                self._merge_start = frame_num  # pragma: no cover
            return []  # pragma: no cover

    # ContentDetector default: min scene length is 1.5 s, but never < 15 frames.
    min_scene_len = int(fps * 1.5)
    if min_scene_len < 15:
        min_scene_len = 15
    flash_filter = _FlashFilterMerge(length=min_scene_len)

    # Step 4: stream frames in batches, compute the HSV frame score exactly
    # like ContentDetector, and collect cut frame indices.
    batch_size = 16
    total_batches = (frame_count + batch_size - 1) // batch_size
    pbar = tqdm(total=total_batches, desc="Detect scenes", unit="batch")

    # HSV planes of the previously processed frame (None before first frame).
    last_hsv: Optional[Tuple[np.ndarray, np.ndarray, np.ndarray]] = None
    cut_indices: List[int] = []

    with GPUVideoStreamer(
        video_path,
        target_width=w_eff,
        target_height=h_eff,
        pix_fmt=nvc.PixelFormat.BGR,
    ) as streamer:
        for frames_bgr, batch_indices in streamer.stream_batches(batch_size=batch_size):
            frames_cpu = frames_bgr.cpu().numpy()

            # Process each frame sequentially to exactly match CPU semantics
            for j, bgr in enumerate(frames_cpu):
                frame_num = batch_indices[j]
                bgr = np.ascontiguousarray(bgr)

                # OpenCV HSV conversion (exact semantics/hue range)
                hsv = cv2.cvtColor(bgr, cv2.COLOR_BGR2HSV)
                hue, sat, val = cv2.split(hsv)

                if last_hsv is None:
                    # First frame has no predecessor: record it and feed a
                    # below-threshold event to keep the filter state aligned.
                    last_hsv = (hue, sat, val)
                    above = False
                    flash_filter.filter(frame_num, above_threshold=above)
                    continue

                hue_prev, sat_prev, val_prev = last_hsv
                # Mean pixel distance per channel (match _mean_pixel_distance)
                # cast to int32 to avoid uint8 underflow
                dh = np.abs(hue.astype(np.int32) - hue_prev.astype(np.int32)).sum() / float(hue.size)
                ds = np.abs(sat.astype(np.int32) - sat_prev.astype(np.int32)).sum() / float(sat.size)
                dv = np.abs(val.astype(np.int32) - val_prev.astype(np.int32)).sum() / float(val.size)
                frame_score = (dh + ds + dv) / 3.0

                last_hsv = (hue, sat, val)

                # Compare against threshold exactly like ContentDetector
                above = frame_score >= threshold
                emitted = flash_filter.filter(frame_num=frame_num, above_threshold=above)
                if emitted:
                    cut_indices.extend(emitted)

            pbar.update(1)
            del frames_bgr, frames_cpu

    pbar.close()

    if not cut_indices:
        return []

    # Step 5: convert cut frame indices into (start, end) scene spans.
    cut_indices = sorted(set(cut_indices))
    scenes: List[Tuple[_SecondsTime, _SecondsTime]] = []
    last_cut = 0
    for cut in cut_indices:
        start_time = last_cut / fps
        end_time = cut / fps
        scenes.append((_SecondsTime(start_time), _SecondsTime(end_time)))
        last_cut = cut
    # Last scene from last cut to end_pos (= frame_count, exclusive)
    scenes.append((_SecondsTime(last_cut / fps), _SecondsTime(frame_count / fps)))

    return scenes

Action Profiling (Audio)

Computes an audio-based "action score" on the GPU using memory-efficient batching.

This function analyzes the audio track to identify high-energy moments (action). It calculates a combined score based on Root Mean Square (RMS) energy for volume/loudness and Spectral Flux for sudden changes in frequencies (roughness/impacts). Operations are vectorized and executed on the GPU via PyTorch to handle long videos efficiently.

Parameters:

Name Type Description Default
video_path Path

Path to the input video or audio file.

required
frame_length int

The size of the STFT window and RMS frame (in samples). Higher values give better frequency resolution but worse time resolution.

2048
hop_length int

The number of samples between successive frames. Determines the temporal resolution of the output score.

512

Returns:

Type Description
Tuple[ndarray, ndarray]

A tuple containing:
  - times (np.ndarray): Array of timestamps (in seconds) corresponding to each audio frame.
  - score (np.ndarray): Array of normalized, smoothed action scores combining RMS and Spectral Flux.

Returns empty arrays if audio loading fails.

Notes
  • Audio is processed in 2-minute chunks to maintain a low RAM/VRAM footprint.
  • The final score is a weighted combination: 0.6 * RMS + 0.4 * Spectral Flux.
  • Includes GPU-accelerated 1D convolution for smoothing the final score array.
Source code in src/shorts_maker/analysis/audio.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
@torch.no_grad()  # type: ignore
def compute_audio_action_profile(
    video_path: Path,
    frame_length: int = 2048,
    hop_length: int = 512,
) -> Tuple[np.ndarray, np.ndarray]:
    """Computes an audio-based "action score" on the GPU using memory-efficient batching.

    This function analyzes the audio track to identify high-energy moments (action).
    It calculates a combined score based on Root Mean Square (RMS) energy for
    volume/loudness and Spectral Flux for sudden changes in frequencies
    (roughness/impacts). Operations are vectorized and executed on the GPU via
    PyTorch to handle long videos efficiently.

    Args:
        video_path: Path to the input video or audio file.
        frame_length: The size of the STFT window and RMS frame (in samples).
            Higher values give better frequency resolution but worse time resolution.
        hop_length: The number of samples between successive frames.
            Determines the temporal resolution of the output score.

    Returns:
        A tuple containing:
            - times (np.ndarray): Array of timestamps (in seconds) corresponding to each audio frame.
            - score (np.ndarray): Array of normalized, smoothed action scores combining RMS and Spectral Flux.
              Returns empty arrays if audio loading fails.

    Notes:
        - Audio is processed in 2-minute chunks to maintain a low RAM/VRAM footprint.
        - The final score is a weighted combination: 0.6 * RMS + 0.4 * Spectral Flux.
        - Includes GPU-accelerated 1D convolution for smoothing the final score array.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Function-scope imports: only needed for the ffmpeg+wave fallback path.
    import tempfile
    import subprocess
    import os
    import wave

    use_wave_fallback = False
    temp_dir_obj = None
    wf = None

    # Probe the audio natively first; fall back to extracting a mono 44.1 kHz
    # WAV with ffmpeg when torchaudio cannot open the container.
    try:
        info = torchaudio.info(str(video_path))
        sample_rate = info.sample_rate
        total_samples = info.num_frames
    except Exception as e:
        logger.warning(f"Native torchaudio failed for {video_path.name}: {e}. Trying ffmpeg+wave fallback...")
        temp_dir_obj = tempfile.TemporaryDirectory()
        temp_audio_path = os.path.join(temp_dir_obj.name, "extracted.wav")
        cmd = [
            "ffmpeg",
            "-y",
            "-i", str(video_path),
            "-vn",
            "-acodec", "pcm_s16le",
            "-ar", "44100",
            "-ac", "1",
            temp_audio_path
        ]
        try:
            subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
            wf = wave.open(temp_audio_path, 'rb')
            sample_rate = wf.getframerate()
            total_samples = wf.getnframes()
            use_wave_fallback = True
        except Exception as fallback_e:
            logger.error(f"Failed to load audio from {video_path} even with fallback: {fallback_e}")
            if wf:
                wf.close()  # pragma: no cover
            if temp_dir_obj:
                temp_dir_obj.cleanup()
            return np.array([]), np.array([])

    rms_values = []
    flux_values = []

    # FIX: the STFT path previously hardcoded 2048 (window size, n_fft,
    # padding, and the flux carry vector) while the RMS path used
    # `frame_length`, contradicting the documented contract that
    # `frame_length` sizes both analyses. Use `frame_length` consistently.
    # Default behavior (frame_length=2048) is unchanged.
    window = torch.hann_window(frame_length).to(device)
    # Magnitude column of the last STFT frame of the previous chunk, used to
    # compute flux seamlessly across chunk boundaries (zeros before chunk 0).
    last_mag_col = torch.zeros(frame_length // 2 + 1, device=device)

    # Process in chunks of 2 minutes to save RAM
    # Make sure chunk_frames is a multiple of hop_length
    chunk_frames = (sample_rate * 120 // hop_length) * hop_length
    # Overlap allows STFT and RMS to be seamless across boundaries
    # NOTE(review): the overlap region is re-analyzed on every chunk, so a few
    # duplicate frames are appended at each chunk boundary (~1 RMS frame and
    # ~frame_length/hop_length STFT frames). Over very long inputs this slowly
    # drifts `times` relative to real time — confirm whether that is acceptable.
    overlap_frames = frame_length

    current_frame = 0
    pbar = tqdm(
        total=total_samples if total_samples > 0 else 1,
        desc="Audio Profile",
        unit="samples",
    )

    # `total_samples <= 0` keeps streaming sources with unknown length going;
    # those exit via the EOF/empty-read breaks below.
    while current_frame < total_samples or total_samples <= 0:
        read_count = chunk_frames + (overlap_frames if current_frame > 0 else 0)
        read_start = max(0, current_frame - overlap_frames)

        try:
            if use_wave_fallback:
                assert wf is not None
                frames_to_read = min(read_count, total_samples - read_start)
                if frames_to_read <= 0:
                    break  # pragma: no cover
                wf.setpos(read_start)
                raw_bytes = wf.readframes(frames_to_read)
                if not raw_bytes:
                    break  # pragma: no cover
                # 16-bit little-endian PCM -> float32 in [-1, 1).
                audio_np = np.frombuffer(raw_bytes, dtype='<i2').astype(np.float32) / 32768.0
                waveform = torch.from_numpy(audio_np).unsqueeze(0)
                sr = sample_rate
            else:
                waveform, sr = torchaudio.load(
                    str(video_path),
                    frame_offset=read_start,
                    num_frames=read_count,
                    normalize=True,
                )
        except Exception:
            logger.error(f"Error reading audio chunk at {read_start}")
            break

        if waveform.shape[1] == 0:
            break  # pragma: no cover

        # Downmix multi-channel audio to mono.
        if waveform.shape[0] > 1:
            waveform = torch.mean(waveform, dim=0, keepdim=True)

        y_cpu = waveform.squeeze(0)
        actual_length = y_cpu.shape[0]

        # Pad very short tails so unfold/STFT always have one full frame.
        if actual_length < frame_length:
            y_cpu = torch.nn.functional.pad(y_cpu, (0, frame_length - actual_length))  # pragma: no cover

        chunk_tensor = y_cpu.to(device)

        # --- RMS (loudness) per frame ---
        windows = chunk_tensor.unfold(0, frame_length, hop_length)
        rms_chunk = torch.sqrt(torch.mean(windows**2, dim=1))
        rms_values.append(rms_chunk)

        # --- STFT (spectral flux) ---
        # Reflect-pad by half a window to emulate center=True framing.
        y_padded = torch.nn.functional.pad(
            chunk_tensor.unsqueeze(0), (frame_length // 2, frame_length // 2), mode="reflect"
        ).squeeze(0)
        stft_chunk = torch.stft(
            y_padded,
            n_fft=frame_length,
            hop_length=hop_length,
            window=window,
            center=False,
            return_complex=True,
        )
        mag_chunk = torch.abs(stft_chunk)

        # Flux = L2 norm of the magnitude difference between adjacent frames,
        # seeded with the carried-over last column of the previous chunk.
        combined = torch.cat([last_mag_col.unsqueeze(1), mag_chunk], dim=1)
        diff = combined[:, 1:] - combined[:, :-1]
        flux_chunk = torch.sqrt(torch.sum(diff**2, dim=0))
        flux_values.append(flux_chunk)

        last_mag_col = mag_chunk[:, -1]

        # Free VRAM held by intermediates before the next chunk.
        del chunk_tensor, windows, y_padded, stft_chunk, mag_chunk, combined, diff

        # Advance by the non-overlapping portion actually read.
        actual_forward = actual_length - (overlap_frames if current_frame > 0 else 0)
        if actual_forward <= 0:
            break  # pragma: no cover

        current_frame += actual_forward
        pbar.update(actual_forward)

        # EOF detection
        if actual_length < read_count:
            break

    pbar.close()

    rms = torch.cat(rms_values) if rms_values else torch.tensor([], device=device)
    spectral_flux = (
        torch.cat(flux_values) if flux_values else torch.tensor([], device=device)
    )

    # --- Post Processing ---
    # RMS and flux frame counts differ slightly per chunk; align by truncation.
    min_len = min(rms.shape[0], spectral_flux.shape[0])
    rms = rms[:min_len]
    spectral_flux = spectral_flux[:min_len]

    # Z-normalize each signal (guard against empty tensors).
    rms_mean = rms.mean() if rms.numel() > 0 else torch.tensor(0.0, device=device)
    rms_std = (
        (rms.std() + 1e-8) if rms.numel() > 0 else torch.tensor(1.0, device=device)
    )
    rms_norm = (rms - rms_mean) / rms_std if rms.numel() > 0 else rms

    flux_mean = (
        spectral_flux.mean()
        if spectral_flux.numel() > 0
        else torch.tensor(0.0, device=device)
    )
    flux_std = (
        (spectral_flux.std() + 1e-8)
        if spectral_flux.numel() > 0
        else torch.tensor(1.0, device=device)
    )
    flux_norm = (
        (spectral_flux - flux_mean) / flux_std
        if spectral_flux.numel() > 0
        else spectral_flux
    )

    def smooth_gpu(x: torch.Tensor, win: int = 21) -> torch.Tensor:
        # Moving-average smoothing via a 1D box convolution on the GPU.
        if x.numel() == 0:
            return x
        if win > x.shape[0]:
            win = x.shape[0]  # pragma: no cover
        if win % 2 == 0:
            win += 1  # pragma: no cover
        padding = win // 2
        kernel = torch.ones(win, device=device) / win
        x_reshaped = x.view(1, 1, -1)
        kernel_reshaped = kernel.view(1, 1, -1)
        out = torch.nn.functional.conv1d(x_reshaped, kernel_reshaped, padding=padding)
        return out.view(-1)

    rms_smooth = smooth_gpu(rms_norm, win=21)
    flux_smooth = smooth_gpu(flux_norm, win=21)

    # Weighted blend; if one signal is empty, fall back to the other.
    score = (
        0.6 * rms_smooth + 0.4 * flux_smooth
        if rms_smooth.numel() > 0 and flux_smooth.numel() > 0
        else (rms_smooth if flux_smooth.numel() == 0 else flux_smooth)
    )

    num_frames_out = score.shape[0]
    times = (
        torch.arange(num_frames_out, device=device) * hop_length / sample_rate
        if num_frames_out > 0
        else torch.tensor([], device=device)
    )

    # Release fallback resources (no-ops when the native path was used).
    if wf:
        wf.close()
    if temp_dir_obj:
        temp_dir_obj.cleanup()

    return times.cpu().numpy(), score.cpu().numpy()

Action Profiling (Video)

Computes a frame-by-frame video "action score" entirely on the GPU.

Uses the GPUVideoStreamer to read frames directly into VRAM, converts them to grayscale, and calculates the mean absolute pixel difference between consecutive frames to quantify motion/action.

Parameters:

Name Type Description Default
video_path Path

Path to the input video file.

required
fps int

Target framerate for subsampling (reduces computational load).

6
downscale_factor int

Factor by which to reduce frame dimensions before computing diffs.

4

Returns:

Type Description
Tuple[ndarray, ndarray]

A tuple containing:
  - times (np.ndarray): Array of timestamps (in seconds) for each evaluated frame.
  - score (np.ndarray): Array of normalized, smoothed action scores.

Source code in src/shorts_maker/analysis/video.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
@torch.no_grad()  # type: ignore
def compute_video_action_profile(
    video_path: Path,
    fps: int = 6,
    downscale_factor: int = 4,
) -> Tuple[np.ndarray, np.ndarray]:
    """Computes a frame-by-frame video "action score" entirely on the GPU.

    Frames are streamed straight into VRAM via GPUVideoStreamer, reduced in
    size, converted to luma, and scored by the mean absolute pixel difference
    between consecutive sampled frames. The raw motion series is then
    z-normalized and box-smoothed.

    Args:
        video_path: Path to the input video file.
        fps: Target framerate for subsampling (reduces computational load).
        downscale_factor: Factor by which to reduce frame dimensions before
            computing diffs.

    Returns:
        A tuple containing:
            - times (np.ndarray): Timestamps (in seconds) for each evaluated frame.
            - score (np.ndarray): Normalized, smoothed action scores.
    """
    # Probe the container for its framerate and dimensions.
    try:
        demuxer = nvc.PyFFmpegDemuxer(str(video_path))
        source_fps = float(demuxer.Framerate())
        width = max(1, demuxer.Width() // downscale_factor)
        height = max(1, demuxer.Height() // downscale_factor)
        del demuxer
    except Exception:
        logger.warning("Failed to load video for action profile.", exc_info=True)
        return np.array([]), np.array([])

    # Never sample faster than the source; guard against a bogus framerate.
    effective_fps = min(float(fps), source_fps)
    if effective_fps <= 0:
        effective_fps = max(1.0, float(fps))

    # Stride (in source frames) between sampled frames.
    step = max(1, int(source_fps / effective_fps))

    motion_chunks = []
    time_chunks = []
    carry_frame = None  # last grayscale frame of the previous batch

    with GPUVideoStreamer(
        video_path, target_width=width, target_height=height
    ) as streamer:
        batch_total = int(np.ceil(streamer.total_frames / (step * 16)))
        progress = tqdm(total=batch_total, desc="Video Action Profile", unit="batch")

        # GPUVideoStreamer natively handles iterating to the end without
        # hanging and yields only batches matching the requested `step`.
        for batch, indices in streamer.stream_batches(batch_size=16, step=step):
            batch = batch.float()

            # BT.601 luma conversion, computed directly on the GPU tensor.
            luma = (
                0.299 * batch[..., 0]
                + 0.587 * batch[..., 1]
                + 0.114 * batch[..., 2]
            )

            # Prepend the carried-over frame (or the batch's own first frame
            # on the very first batch) so diffs span batch boundaries.
            is_first = carry_frame is None
            anchor = luma[0:1] if is_first else carry_frame.unsqueeze(0)
            stacked = torch.cat([anchor, luma])
            frame_diffs = torch.abs(stacked[1:] - stacked[:-1])
            if is_first:
                # The first frame has no predecessor; score it as zero motion.
                frame_diffs[0] = 0.0

            # Per-frame mean absolute difference.
            motion_chunks.append(frame_diffs.mean(dim=(1, 2)))

            # Timestamps from the global frame indices.
            time_chunks.append(
                torch.tensor(indices, device=luma.device).float() / source_fps
            )

            # Carry the last frame forward for the next batch's diff.
            carry_frame = luma[-1]

            del batch, luma, frame_diffs

            progress.update(1)

        progress.close()

    if len(motion_chunks) == 0:
        return np.array([]), np.array([])

    motion = torch.cat(motion_chunks)
    timestamps = torch.cat(time_chunks)

    if motion.numel() == 0:
        return np.array([]), np.array([])  # pragma: no cover
    # Z-normalize unless the signal is perfectly flat.
    if motion.std() == 0:
        normalized = motion
    else:
        normalized = (motion - motion.mean()) / (motion.std() + 1e-8)

    def _box_smooth(signal: torch.Tensor, win: int) -> torch.Tensor:
        # Moving-average smoothing via a 1D box convolution on the GPU.
        if win > signal.shape[0]:
            win = signal.shape[0]
        if win < 2:
            return signal
        box = torch.ones(win, device=signal.device) / win
        smoothed = torch.nn.functional.conv1d(
            signal.view(1, 1, -1), box.view(1, 1, -1), padding=win // 2
        )
        return smoothed.view(-1)[: signal.shape[0]]

    score = _box_smooth(normalized, win=int(effective_fps))

    return timestamps.cpu().numpy(), score.cpu().numpy()