Project

General

Profile

Files » video_processor.py

Zhi Jie YEW, 11/06/2025 02:43 PM

 
1
import cv2
2
import numpy as np
3
import os
4
import time
5

    
6
class VideoProcessor:
    """
    Divide a video into a left and a right part and fade the shared edge.

    Consolidates logic from divide_video.py, apply_alpha_blending_on_video.py,
    and Video_utility.py.

    Attributes are plain settings that may be overridden through the optional
    ``config`` dictionary passed to the constructor:

    * ``input_video_path`` -- path of the video to process.
    * ``output_dir`` -- directory that receives the result files.
    * ``blend_width`` -- width, in pixels, of the faded edge region.
    * ``blend_method`` -- ``"linear"`` or ``"cosine"``.
    * ``divide_ratio`` -- fraction of the frame width given to the left part.
    """

    def __init__(self, config=None):
        """Initialize the processor with default or provided settings.

        Args:
            config: Optional dictionary. Recognized keys are
                ``input_video_path``, ``output_dir``, ``blend_width``,
                ``blend_method`` and ``divide_ratio``; missing keys keep
                their default value.
        """
        # Set default parameters
        self.input_video_path = ""
        self.output_dir = "VideoResults"
        self.blend_width = 100
        self.blend_method = "linear"
        self.divide_ratio = 2/3

        # Overwrite defaults with a configuration dictionary if provided
        if config:
            self.input_video_path = config.get("input_video_path", self.input_video_path)
            self.output_dir = config.get("output_dir", self.output_dir)
            self.blend_width = config.get("blend_width", self.blend_width)
            self.blend_method = config.get("blend_method", self.blend_method)
            self.divide_ratio = config.get("divide_ratio", self.divide_ratio)

    def _create_alpha_gradient(self, blend_width, side, method):
        """Create a 1D alpha gradient for blending.

        Args:
            blend_width: Number of samples in the gradient.
            side: ``'right'`` produces a fade-out (1 -> 0) gradient; any other
                value leaves the fade-in (0 -> 1) gradient unchanged.
            method: ``'linear'`` or ``'cosine'``.

        Returns:
            A 1D float array of length ``blend_width`` with values in [0, 1].

        Raises:
            ValueError: If ``method`` is not a supported blend method.
        """
        if method == 'linear':
            alpha_gradient = np.linspace(0, 1, blend_width)
        elif method == 'cosine':
            t = np.linspace(0, np.pi, blend_width)
            alpha_gradient = (1 - np.cos(t)) / 2
        else:
            raise ValueError(f"Invalid blend method: {method}")

        if side == 'right':
            alpha_gradient = 1 - alpha_gradient  # Create a fade-out gradient
        return alpha_gradient

    def _blend_image_edge(self, image, blend_width, side, method):
        """Apply the alpha gradient to one edge of a single frame.

        The input frame is not modified; a blended copy is returned.

        Args:
            image: Frame as an array of shape (height, width) or
                (height, width, channels).
            blend_width: Requested width of the faded region in pixels. It is
                clamped to the frame width so narrow frames do not fail.
            side: ``'left'`` fades in from the left edge, ``'right'`` fades
                out towards the right edge.
            method: ``'linear'`` or ``'cosine'``.

        Returns:
            A new array with the same shape and dtype as ``image``.

        Raises:
            ValueError: If ``method`` or ``side`` is not supported.
        """
        frame_width = image.shape[1]
        # A blend region wider than the frame cannot be sliced consistently,
        # so fade across the whole frame in that case.
        effective_width = min(blend_width, frame_width)
        alpha_gradient = self._create_alpha_gradient(effective_width, side, method)

        if side == 'right':
            columns = slice(frame_width - effective_width, frame_width)
        elif side == 'left':
            columns = slice(0, effective_width)
        else:
            raise ValueError("Side must be 'left' or 'right'")

        # Shape the gradient as (1, width[, 1]) and let broadcasting stretch it
        # over rows and channels; this avoids building a full-size float copy
        # per frame and works for any channel count.
        weights = alpha_gradient[np.newaxis, :]
        if image.ndim == 3:
            weights = weights[:, :, np.newaxis]

        blended_image = image.copy()
        region = blended_image[:, columns]
        blended_image[:, columns] = (region * weights).astype(image.dtype)
        return blended_image

    @staticmethod
    def _format_progress(label, frame_count, total_frames):
        """Build a progress message that is safe when the total is unknown.

        Returns ``"<label>... <percent>%"`` when ``total_frames`` is positive
        (clamped to 100), otherwise a frame-count message, because the
        container may not report a usable frame count.
        """
        if total_frames > 0:
            progress = min(100, int((frame_count / total_frames) * 100))
            return f"{label}... {progress}%"
        return f"{label}... {frame_count} frames"

    @staticmethod
    def _open_writer(path, fourcc, fps, size):
        """Open a ``cv2.VideoWriter`` and fail loudly if it cannot be opened."""
        writer = cv2.VideoWriter(path, fourcc, fps, size)
        if not writer.isOpened():
            writer.release()
            raise IOError(f"Could not open video writer for: {path}")
        return writer

    def _divide_video(self, input_path, output_left_path, output_right_path, status_callback):
        """Split a video into a left and a right part based on ``divide_ratio``.

        Args:
            input_path: Path of the video to split.
            output_left_path: Destination of the left part.
            output_right_path: Destination of the right part.
            status_callback: Optional callable receiving progress strings
                (called every 30 frames).

        Raises:
            FileNotFoundError: If the input video cannot be opened.
            ValueError: If ``divide_ratio`` leaves one part without any columns.
            IOError: If an output writer cannot be opened.
        """
        cap = cv2.VideoCapture(input_path)
        out_left = None
        out_right = None
        try:
            if not cap.isOpened():
                raise FileNotFoundError(f"Could not open video file: {input_path}")

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            midpoint = int(width * self.divide_ratio)
            if not 0 < midpoint < width:
                raise ValueError(
                    f"divide_ratio {self.divide_ratio} yields an empty part "
                    f"for a frame width of {width}"
                )

            out_left = self._open_writer(output_left_path, fourcc, fps, (midpoint, height))
            out_right = self._open_writer(output_right_path, fourcc, fps, (width - midpoint, height))

            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            frame_count = 0

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                frame_count += 1
                if status_callback and frame_count % 30 == 0:  # Update status every 30 frames
                    status_callback(self._format_progress("Dividing video", frame_count, total_frames))

                out_left.write(frame[:, :midpoint])
                out_right.write(frame[:, midpoint:])
        finally:
            # Release handles even when reading, writing or the callback fails,
            # otherwise the partially written files stay locked/unflushed.
            cap.release()
            if out_left is not None:
                out_left.release()
            if out_right is not None:
                out_right.release()

    def _apply_alpha_blending_to_video(self, input_path, output_path, side, status_callback):
        """Apply alpha blending to one edge of every frame of a video.

        Uses ``self.blend_width`` and ``self.blend_method``.

        Args:
            input_path: Path of the video to read.
            output_path: Destination of the blended video.
            side: ``'left'`` or ``'right'`` edge to fade.
            status_callback: Optional callable receiving progress strings
                (called every 30 frames).

        Raises:
            FileNotFoundError: If the input video cannot be opened.
            IOError: If the output writer cannot be opened.
            ValueError: If ``side`` or the blend method is not supported.
        """
        cap = cv2.VideoCapture(input_path)
        out = None
        try:
            if not cap.isOpened():
                raise FileNotFoundError(f"Could not open video for blending: {input_path}")

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            out = self._open_writer(output_path, fourcc, fps, (width, height))

            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            frame_count = 0

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                frame_count += 1
                if status_callback and frame_count % 30 == 0:
                    status_callback(self._format_progress(f"Blending {side} video", frame_count, total_frames))

                blended_frame = self._blend_image_edge(frame, self.blend_width, side, self.blend_method)
                out.write(blended_frame)
        finally:
            # Always release so a failure mid-video does not leak the handles.
            cap.release()
            if out is not None:
                out.release()

    def run(self, status_callback=None):
        """Execute the full pipeline: divide, blend both parts, clean up.

        Writes ``final_left.mp4`` and ``final_right.mp4`` into
        ``self.output_dir``. Intermediate files are removed on success and,
        best-effort, on failure.

        Args:
            status_callback: Optional callable receiving status strings.

        Returns:
            ``(True, message)`` on success, ``(False, error_text)`` on failure.
        """
        temp_paths = []
        try:
            start_time = time.time()
            os.makedirs(self.output_dir, exist_ok=True)

            # Define intermediate and final file paths
            temp_left_path = os.path.join(self.output_dir, "temp_left.mp4")
            temp_right_path = os.path.join(self.output_dir, "temp_right.mp4")
            final_left_path = os.path.join(self.output_dir, "final_left.mp4")
            final_right_path = os.path.join(self.output_dir, "final_right.mp4")
            temp_paths = [temp_left_path, temp_right_path]

            if status_callback:
                status_callback("Starting to divide video...")
            self._divide_video(self.input_video_path, temp_left_path, temp_right_path, status_callback)

            # The left part is faded on its right edge and vice versa, i.e.
            # each part is faded along the seam where the frame was split.
            if status_callback:
                status_callback("Starting to blend left video...")
            self._apply_alpha_blending_to_video(temp_left_path, final_left_path, "right", status_callback)

            if status_callback:
                status_callback("Starting to blend right video...")
            self._apply_alpha_blending_to_video(temp_right_path, final_right_path, "left", status_callback)

            if status_callback:
                status_callback("Cleaning up temporary files...")
            os.remove(temp_left_path)
            os.remove(temp_right_path)

            duration = time.time() - start_time
            message = f"Video processing complete in {duration:.2f}s. Files saved in '{self.output_dir}'."
            if status_callback:
                status_callback(message)
            return (True, message)

        except Exception as e:
            # Top-level boundary: report the failure to the caller instead of
            # propagating, as the return contract is a (success, message) tuple.
            if status_callback:
                status_callback(f"Error: {e}")
            return (False, str(e))

        finally:
            # Best-effort removal of intermediates left behind by a failure;
            # a no-op after a successful run.
            for leftover in temp_paths:
                try:
                    if os.path.exists(leftover):
                        os.remove(leftover)
                except OSError:
                    pass
(6-6/8)