Skip to content

Video Processor

This is the main orchestrator that ties together scene detection, audio/video scoring, and rendering.

Core processor for generating viral shorts from long-form video.

This class orchestrates the entire hardware-accelerated pipeline: scene detection, audio/video action profiling, intelligent clipping (smart cuts), and GPU-based compositing and rendering via NVENC.

Attributes:

Name Type Description
config ProcessingConfig

Configuration settings for the generation pipeline.

Source code in src/shorts_maker/core/processor.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
class VideoProcessor:
    """Core processor for generating viral shorts from long-form video.

    This class orchestrates the entire hardware-accelerated pipeline: scene detection,
    audio/video action profiling, intelligent clipping (smart cuts), and GPU-based
    compositing and rendering via NVENC.

    Attributes:
        config (ProcessingConfig): Configuration settings for the generation pipeline.
    """

    def __init__(self, config: ProcessingConfig):
        """Initializes the video processor with the given configuration.

        Args:
            config: A ProcessingConfig object containing target aspect ratios,
                scene limits, and duration constraints.
        """
        self.config = config

    @staticmethod
    def _clear_gpu_memory() -> None:
        """Explicitly releases Python garbage and cached CUDA memory.

        Called between heavy pipeline stages (scene detection, audio profiling,
        video profiling) to keep peak GPU memory pressure low.
        """
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()  # pragma: no cover

    def process_video(self, video_file: Path, output_dir: Path) -> None:
        """Processes a single video file to generate multiple short clips.

        Analyzes the video to find high-action scenes using combined audio-visual
        scoring, groups them by length, determines optimal start/end points using a
        smart cut algorithm, and dispatches the rendering process.

        Args:
            video_file: Path to the source gameplay video file.
            output_dir: Directory where the generated short clips will be saved.

        Raises:
            RuntimeError: If the rendering process fails or FFmpeg encounters an error.
        """
        logger.info("\nProcess: %s", video_file.name)

        logger.info("Detecting scenes (GPU)...")
        scene_list = detect_video_scenes_gpu(video_file, threshold=self.config.scene_threshold)

        logger.info("Detected scenes:")
        for i, scene in enumerate(scene_list, start=1):
            duration = scene[1].get_seconds() - scene[0].get_seconds()
            logger.info(
                "Scene %2d: Duration %5.1f s, Start %s / Frame %d, End %s / Frame %d",
                i,
                duration,
                scene[0].get_timecode(),
                scene[0].get_frames(),
                scene[1].get_timecode(),
                scene[1].get_frames(),
            )

        self._clear_gpu_memory()

        logger.info("Computing audio action profile (GPU)...")
        audio_times, audio_score = compute_audio_action_profile(video_file)

        self._clear_gpu_memory()

        logger.info("Computing video action profile (GPU)...")
        video_times, video_score = compute_video_action_profile(
            video_file,
            fps=4,
            downscale_factor=6,
        )

        self._clear_gpu_memory()

        # Pre-calculate video duration for boundary checks
        try:
            dmx = nvc.PyFFmpegDemuxer(str(video_file))
            video_duration = float(dmx.Numframes() / dmx.Framerate())
            del dmx
        except Exception:  # pragma: no cover
            logger.warning("PyNvCodec probe failed, fallback to 0 duration.")  # pragma: no cover
            video_duration = 0.0  # pragma: no cover

        processed_scene_list = combine_scenes(scene_list, self.config)
        processed_scene_list = split_overlong_scenes(processed_scene_list, self.config)

        logger.info("Scenes list with action scores:")
        for i, scene in enumerate(processed_scene_list, start=1):
            duration = scene[1].get_seconds() - scene[0].get_seconds()
            score_val = scene_action_score(
                scene, audio_times, audio_score, video_times, video_score
            )
            logger.info(
                "Scene %2d: Duration %5.1f s, ActionScore %7.3f, Start %s / Frame %d, End %s / Frame %d",
                i,
                duration,
                score_val,
                scene[0].get_timecode(),
                scene[0].get_frames(),
                scene[1].get_timecode(),
                scene[1].get_frames(),
            )

        sorted_processed_scene_list = sorted(
            processed_scene_list,
            key=lambda s: scene_action_score(
                s, audio_times, audio_score, video_times, video_score
            ),
            reverse=True,
        )

        logger.info("Sorted scenes list (by action score):")
        for i, scene in enumerate(sorted_processed_scene_list, start=1):
            duration = scene[1].get_seconds() - scene[0].get_seconds()
            score_val = scene_action_score(
                scene, audio_times, audio_score, video_times, video_score
            )
            logger.info(
                "Scene %2d: ActionScore %7.3f, Duration %5.1f s, Start %s / Frame %d, End %s / Frame %d",
                i,
                score_val,
                duration,
                scene[0].get_timecode(),
                scene[0].get_frames(),
                scene[1].get_timecode(),
                scene[1].get_frames(),
            )

        truncated_list = sorted_processed_scene_list[: self.config.scene_limit]

        if truncated_list:
            for i, scene in enumerate(truncated_list, start=1):
                scene_start = scene[0].get_seconds()
                scene_end = scene[1].get_seconds()
                scene_duration = scene_end - scene_start

                # STRATEGY 1: If scene fits entirely - take it all.
                # We add a small padding (1.5s) to capture the "end scene animation/fade".
                if scene_duration <= self.config.max_short_length:
                    final_start = scene_start
                    padding = 1.5
                    final_end = min(scene_end + padding, video_duration)

                    # Check if padding pushes us over max limit
                    if (final_end - final_start) > self.config.max_short_length:
                        final_end = final_start + self.config.max_short_length  # pragma: no cover

                    final_duration = final_end - final_start
                    logger.info(
                        "Scene %d: Full scene + padding (%.2fs)", i, final_duration
                    )

                # STRATEGY 2: Scene too long, cut best window with smart end.
                else:
                    target_duration = float(self.config.max_short_length)

                    best_start = best_action_window_start(
                        scene,
                        target_duration,
                        audio_times,
                        audio_score,
                        video_times,
                        video_score,
                    )

                    absolute_min_end = best_start + self.config.min_short_length
                    absolute_max_end = min(
                        scene_end, best_start + self.config.max_short_length
                    )

                    final_end = find_smart_end_point(
                        best_start,
                        absolute_min_end,
                        absolute_max_end,
                        audio_times,
                        audio_score,
                        search_window=5.0,
                    )

                    final_start = best_start
                    final_duration = final_end - final_start
                    logger.info(
                        "Scene %d: Smart Cut. Start %.2f, End %.2f (Duration %.2fs)",
                        i,
                        final_start,
                        final_end,
                        final_duration,
                    )

                render_file_name = f"{video_file.stem} scene-{i}{video_file.suffix}"
                render_path = output_dir / render_file_name

                # Prepare render params
                params = get_render_params(
                    video_file, final_start, final_duration, self.config
                )

                # Execute GPU render
                render_video_gpu_isolated(
                    params,
                    render_path,
                    max_error_depth=self.config.max_error_depth,
                    save_ffmpeg_logs=self.config.save_ffmpeg_logs,
                )
        else:
            # No scenes found, fallback to random clip
            short_length = random.randint(
                self.config.min_short_length, self.config.max_short_length
            )

            # Clamp to at least 1 second: video_duration may be 0.0 when the
            # PyNvCodec probe failed, and a zero-length render request is invalid.
            if video_duration < self.config.max_short_length:
                adapted_short_length = max(
                    1, min(math.floor(video_duration), short_length)
                )
            else:
                adapted_short_length = short_length  # pragma: no cover

            # Keep start points non-negative so very short (or unprobed) videos
            # never produce a negative seek offset.
            min_start_point = max(
                0, min(10, math.floor(video_duration) - adapted_short_length)
            )
            max_start_point = max(
                min_start_point, math.floor(video_duration - adapted_short_length)
            )

            start_point = float(
                random.randint(int(min_start_point), int(max_start_point))
            )

            params = get_render_params(
                video_file,
                start_point,
                float(adapted_short_length),
                self.config,
            )

            render_video_gpu_isolated(
                params,
                output_dir / video_file.name,
                max_error_depth=self.config.max_error_depth,
                save_ffmpeg_logs=self.config.save_ffmpeg_logs,
            )

__init__(config)

Initializes the video processor with the given configuration.

Parameters:

Name Type Description Default
config ProcessingConfig

A ProcessingConfig object containing target aspect ratios, scene limits, and duration constraints.

required
Source code in src/shorts_maker/core/processor.py
39
40
41
42
43
44
45
46
def __init__(self, config: ProcessingConfig):
    """Set up the processor with its pipeline configuration.

    Args:
        config: ProcessingConfig holding target aspect ratios, scene
            limits, and duration constraints for generated shorts.
    """
    self.config = config

process_video(video_file, output_dir)

Processes a single video file to generate multiple short clips.

Analyzes the video to find high-action scenes using combined audio-visual scoring, groups them by length, determines optimal start/end points using a smart cut algorithm, and dispatches the rendering process.

Parameters:

Name Type Description Default
video_file Path

Path to the source gameplay video file.

required
output_dir Path

Directory where the generated short clips will be saved.

required

Raises:

Type Description
RuntimeError

If the rendering process fails or FFmpeg encounters an error.

Source code in src/shorts_maker/core/processor.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def process_video(self, video_file: Path, output_dir: Path) -> None:
    """Processes a single video file to generate multiple short clips.

    Analyzes the video to find high-action scenes using combined audio-visual 
    scoring, groups them by length, determines optimal start/end points using a 
    smart cut algorithm, and dispatches the rendering process.

    Args:
        video_file: Path to the source gameplay video file.
        output_dir: Directory where the generated short clips will be saved.

    Raises:
        RuntimeError: If the rendering process fails or FFmpeg encounters an error.
    """
    logger.info("\nProcess: %s", video_file.name)

    logger.info("Detecting scenes (GPU)...")
    scene_list = detect_video_scenes_gpu(video_file, threshold=self.config.scene_threshold)

    logger.info("Detected scenes:")
    for i, scene in enumerate(scene_list, start=1):
        duration = scene[1].get_seconds() - scene[0].get_seconds()
        logger.info(
            "Scene %2d: Duration %5.1f s, Start %s / Frame %d, End %s / Frame %d",
            i,
            duration,
            scene[0].get_timecode(),
            scene[0].get_frames(),
            scene[1].get_timecode(),
            scene[1].get_frames(),
        )

    # Explicitly clear memory
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()  # pragma: no cover

    logger.info("Computing audio action profile (GPU)...")
    audio_times, audio_score = compute_audio_action_profile(video_file)

    # Explicitly clear memory
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()  # pragma: no cover

    logger.info("Computing video action profile (GPU)...")
    video_times, video_score = compute_video_action_profile(
        video_file,
        fps=4,
        downscale_factor=6,
    )

    # Explicitly clear memory
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()  # pragma: no cover

    # Pre-calculate video duration for boundary checks
    try:
        dmx = nvc.PyFFmpegDemuxer(str(video_file))
        video_duration = float(dmx.Numframes() / dmx.Framerate())
        del dmx
    except Exception:  # pragma: no cover
        logger.warning("PyNvCodec probe failed, fallback to 0 duration.")  # pragma: no cover
        video_duration = 0.0  # pragma: no cover

    processed_scene_list = combine_scenes(scene_list, self.config)
    processed_scene_list = split_overlong_scenes(processed_scene_list, self.config)

    logger.info("Scenes list with action scores:")
    for i, scene in enumerate(processed_scene_list, start=1):
        duration = scene[1].get_seconds() - scene[0].get_seconds()
        score_val = scene_action_score(
            scene, audio_times, audio_score, video_times, video_score
        )
        logger.info(
            "Scene %2d: Duration %5.1f s, ActionScore %7.3f, Start %s / Frame %d, End %s / Frame %d",
            i,
            duration,
            score_val,
            scene[0].get_timecode(),
            scene[0].get_frames(),
            scene[1].get_timecode(),
            scene[1].get_frames(),
        )

    sorted_processed_scene_list = sorted(
        processed_scene_list,
        key=lambda s: scene_action_score(
            s, audio_times, audio_score, video_times, video_score
        ),
        reverse=True,
    )

    logger.info("Sorted scenes list (by action score):")
    for i, scene in enumerate(sorted_processed_scene_list, start=1):
        duration = scene[1].get_seconds() - scene[0].get_seconds()
        score_val = scene_action_score(
            scene, audio_times, audio_score, video_times, video_score
        )
        logger.info(
            "Scene %2d: ActionScore %7.3f, Duration %5.1f s, Start %s / Frame %d, End %s / Frame %d",
            i,
            score_val,
            duration,
            scene[0].get_timecode(),
            scene[0].get_frames(),
            scene[1].get_timecode(),
            scene[1].get_frames(),
        )

    truncated_list = sorted_processed_scene_list[: self.config.scene_limit]

    if truncated_list:
        for i, scene in enumerate(truncated_list, start=1):
            scene_start = scene[0].get_seconds()
            scene_end = scene[1].get_seconds()
            scene_duration = scene_end - scene_start

            # STRATEGY 1: If scene fits entirely - take it all.
            # We add a small padding (1.5s) to capture the "end scene animation/fade".
            if scene_duration <= self.config.max_short_length:
                final_start = scene_start
                padding = 1.5
                final_end = min(scene_end + padding, video_duration)

                # Check if padding pushes us over max limit
                if (final_end - final_start) > self.config.max_short_length:
                    final_end = final_start + self.config.max_short_length  # pragma: no cover

                final_duration = final_end - final_start
                logger.info(
                    f"Scene {i}: Full scene + padding ({final_duration:.2f}s)"
                )

            # STRATEGY 2: Scene too long, cut best window with smart end.
            else:
                target_duration = float(self.config.max_short_length)

                best_start = best_action_window_start(
                    scene,
                    target_duration,
                    audio_times,
                    audio_score,
                    video_times,
                    video_score,
                )

                absolute_min_end = best_start + self.config.min_short_length
                absolute_max_end = min(
                    scene_end, best_start + self.config.max_short_length
                )

                final_end = find_smart_end_point(
                    best_start,
                    absolute_min_end,
                    absolute_max_end,
                    audio_times,
                    audio_score,
                    search_window=5.0,
                )

                final_start = best_start
                final_duration = final_end - final_start
                logger.info(
                    f"Scene {i}: Smart Cut. Start {final_start:.2f}, End {final_end:.2f} (Duration {final_duration:.2f}s)"
                )

            render_file_name = f"{video_file.stem} scene-{i}{video_file.suffix}"
            render_path = output_dir / render_file_name

            # Prepare render params
            params = get_render_params(
                video_file, final_start, final_duration, self.config
            )

            # Execute GPU render
            render_video_gpu_isolated(
                params,
                render_path,
                max_error_depth=self.config.max_error_depth,
                save_ffmpeg_logs=self.config.save_ffmpeg_logs,
            )
    else:
        # No scenes found, fallback to random clip
        short_length = random.randint(
            self.config.min_short_length, self.config.max_short_length
        )

        # Clamp to at least 1 second: video_duration may be 0.0 when the
        # PyNvCodec probe failed, and a zero-length render request is invalid.
        if video_duration < self.config.max_short_length:
            adapted_short_length = max(
                1, min(math.floor(video_duration), short_length)
            )
        else:
            adapted_short_length = short_length  # pragma: no cover

        # Keep start points non-negative so very short (or unprobed) videos
        # never produce a negative seek offset.
        min_start_point = max(
            0, min(10, math.floor(video_duration) - adapted_short_length)
        )
        max_start_point = max(
            min_start_point, math.floor(video_duration - adapted_short_length)
        )

        start_point = float(
            random.randint(int(min_start_point), int(max_start_point))
        )

        params = get_render_params(
            video_file,
            start_point,
            float(adapted_short_length),
            self.config,
        )

        render_video_gpu_isolated(
            params,
            output_dir / video_file.name,
            max_error_depth=self.config.max_error_depth,
            save_ffmpeg_logs=self.config.save_ffmpeg_logs,
        )