💻 Code & Documentation


All source code is developed collaboratively using Redmine and documented with Doxygen.
This section collects the project's main source files, along with references to important code snippets, flow explanations, and system logic.


📁 Repository Links


🧩 Core Modules
| Module           | Description                              |
| Image Processing | Core blending and correction logic       |
| Calibration      | Overlap detection and adjustment         |
| UI / CLI         | Interface for testing and visualization  |


📘 Code Listings

ConfigReader.py

import configparser

class ConfigReader:
    """ 
    @brief Configuration file reader class

    @details This class provides methods to read, parse and retrieve 
    configuration parameters from INI-style configuration files.
    It uses Python's configparser module internally.
    """ 
    def __init__(self, config_path):
        """
        @brief Constructor for ConfigReader class

        @param config_path Path to the configuration file
        @exception FileNotFoundError If configuration file is not found
        """
        self.config_path = config_path
        self.config = configparser.ConfigParser()
        # configparser.read() silently ignores missing files, so raise
        # explicitly to match the documented exception.
        if not self.config.read(self.config_path):
            raise FileNotFoundError(f"Configuration file not found: {config_path}")

    def get_value(self, section, key, fallback=None):
        """ 
        @brief Get value from configuration file

        @param section Configuration section name
        @param key Configuration key name
        @param fallback Default value if key not found
        @return Configuration value as string or fallback value
        """ 
        try:
            return self.config.get(section, key)
        except (configparser.NoSectionError, configparser.NoOptionError):
            return fallback

    def get_linear_parameter(self):
        """
        @brief Get linear parameter from configuration

        @return Linear parameter as float value
        """
        # Fall back to 1.0 (complementary linear curves) if the key is
        # missing; the fallback value is an assumption, not a project default.
        return float(self.get_value('Parameters', 'linear_parameter', fallback=1.0))

    def get_image_path(self):
        """ 
        @brief Get image path from configuration

        @return Image file path as string
        """ 
        return self.get_value('Parameters', 'image_path')

    def get_video_path(self):
        """
        @brief Get video path from configuration

        @return Video file path as string
        """
        return self.get_value('Parameters', 'video_path')

    def get_overlap(self):
        """ 
        @brief Get overlap parameter from configuration

        @return Overlap value as string
        """ 
        return self.get_value('Parameters', 'overlap')

    def get_blend_mode(self):
        """
        @brief Get blend mode from configuration

        @return Blend mode as string
        """
        return self.get_value('Parameters', 'blend_mode')

    def save_config(self):
        """ 
        @brief Save current configuration to file

        @details Writes the current configuration state back to the file.
        This can be used to persist changes made to configuration values.
        """ 
        with open(self.config_path, 'w') as configfile:
            self.config.write(configfile)
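
A minimal config.ini matching the keys ConfigReader reads might look like the sketch below. The values are illustrative assumptions rather than project defaults, although input.png and input.mp4 are the filenames the other modules expect:

[Parameters]
linear_parameter = 1.0
image_path = input.png
video_path = input.mp4
overlap = 75
blend_mode = linear

Since get_value() always returns strings, callers convert numeric values themselves, e.g. (hypothetical usage):

import config_reader

cfg = config_reader.ConfigReader("config.ini")
overlap = int(cfg.get_overlap() or 75)   # values come back as strings
blend = cfg.get_blend_mode() or "linear"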

ui.py

import tkinter as tk
from tkinter import ttk, messagebox
from PIL import Image, ImageTk
import datetime
import os
import traceback
import altmain
import video_processing
import cv2
import config_reader

class ImageDisplayApp:
    """ 
    @brief Main GUI application class for image processing and display

    @details This class provides a graphical interface for splitting images,
    processing videos, and displaying results with various blending options.
    """ 
    def __init__(self, root):
        """ 
        @brief Constructor for ImageDisplayApp class

        @param root Tkinter root window object
        """ 
        self.root = root
        self.root.title("Image Split & Update GUI")
        self.root.geometry("1000x750")
        self.root.configure(bg="#f5f5f5")
        self.cfg = config_reader.ConfigReader("config.ini")

        self.image_paths = {
            "main": self.cfg.get_image_path(),
            "left": "left.png",
            "right": "right.png" 
        }

        self.image_frame = ttk.Frame(self.root, padding=10)
        self.image_frame.pack(pady=20)
        self.processor = altmain.ProjectionSplit()
        self.video_processor = video_processing.VideoProcessing("input.mp4")
        self.fps = 60
        self.is_running = True

        self.labels = {}
        for key in self.image_paths:
            lbl = ttk.Label(self.image_frame)
            lbl.pack(side=tk.LEFT, padx=10)
            self.labels[key] = lbl

        control_frame = ttk.Frame(self.root, padding=10)
        control_frame.pack(pady=10)

        ttk.Label(control_frame, text="Overlap (pixels):").grid(row=0, column=0, padx=5)
        self.param_var = tk.IntVar(value=int(self.cfg.get_overlap() or 75))  # config values are strings
        entry = ttk.Entry(control_frame, textvariable=self.param_var, width=8)
        entry.grid(row=0, column=1, padx=5)

        ttk.Label(control_frame, text="Blend Type:").grid(row=0, column=2, padx=5)
        self.blend_var = tk.StringVar(value=self.cfg.get_blend_mode())
        blend_box = ttk.Combobox(
            control_frame,
            textvariable=self.blend_var,
            values=["linear", "quadratic", "gaussian"],
            state="readonly",
            width=12
        )
        blend_box.grid(row=0, column=3, padx=5)

        entry.bind("<FocusOut>", lambda e: self.debounced_update())
        blend_box.bind("<<ComboboxSelected>>", lambda e: self.debounced_update())

        timer_frame = ttk.Frame(self.root, padding=10)
        timer_frame.pack(pady=5)

        ttk.Label(timer_frame, text="Start Time (HH:MM:SS):").pack(side=tk.LEFT, padx=5)
        self.start_time_var = tk.StringVar(value="14:30:00")  # default example
        ttk.Entry(timer_frame, textvariable=self.start_time_var, width=10).pack(side=tk.LEFT, padx=5)

        ttk.Button(
            timer_frame,
            text="Start at Time",
            command=self.schedule_start
        ).pack(side=tk.LEFT, padx=10)

        fullscreen_frame = ttk.Frame(self.root, padding=10)
        fullscreen_frame.pack(pady=5)

        ttk.Button(
            fullscreen_frame,
            text="Show Left Fullscreen",
            command=lambda: self.show_fullscreen("left")
        ).pack(side=tk.LEFT, padx=10)

        ttk.Button(
            fullscreen_frame,
            text="Show Right Fullscreen",
            command=lambda: self.show_fullscreen("right")
        ).pack(side=tk.LEFT, padx=10)
        ttk.Button(
            fullscreen_frame,
            text="Run Video",
            command=self.run_video
        ).pack(side=tk.LEFT, padx=10)

        self._update_after_id = None

    def debounced_update(self):
        """ 
        @brief Debounced update to prevent rapid consecutive calls

        @details Cancels pending updates and schedules a new one after delay
        to prevent multiple rapid updates when parameters change.
        """ 
        if self._update_after_id is not None:
            self.root.after_cancel(self._update_after_id)
        self._update_after_id = self.root.after(500, self.run_and_update)

    def show_fullscreen(self, key):
        """ 
        @brief Display image in fullscreen mode

        @param key Image identifier ("left" or "right")
        @exception FileNotFoundError If image file does not exist
        """ 
        path = self.image_paths.get(key)
        if not os.path.exists(path):
            messagebox.showwarning("Missing Image", f"{path} not found.")
            return

        fullscreen = tk.Toplevel(self.root)
        fullscreen.attributes("-fullscreen", True)
        fullscreen.configure(bg="black")
        fullscreen.bind("<Escape>", lambda e: fullscreen.destroy())

        lbl = ttk.Label(fullscreen, background="black")
        lbl.pack(expand=True)

        def update_fullscreen():
            """ 
            @brief Update fullscreen display with current image

            @details Continuously updates the fullscreen display with
            the current image, resizing to fit screen while maintaining aspect ratio.
            """ 
            if not os.path.exists(path):
                return
            try:
                img = Image.open(path)
                screen_w = fullscreen.winfo_screenwidth()
                screen_h = fullscreen.winfo_screenheight()

                img_ratio = img.width / img.height
                screen_ratio = screen_w / screen_h

                if img_ratio > screen_ratio:
                    new_w = screen_w
                    new_h = int(screen_w / img_ratio)
                else:
                    new_h = screen_h
                    new_w = int(screen_h * img_ratio)

                img = img.resize((new_w, new_h), Image.LANCZOS)
                photo = ImageTk.PhotoImage(img)
                lbl.configure(image=photo)
                lbl.image = photo
            except Exception as e:
                print("Fullscreen update error:", e)

            fullscreen.after(100, update_fullscreen)

        update_fullscreen()

    def _update_image_widget(self, key, np_image):
        """ 
        @brief Update individual image widget with new image data

        @param key Widget identifier
        @param np_image Numpy array containing image data
        """ 
        if np_image is None:
            self.labels[key].configure(text="No image data", image="")
            self.labels[key].image = None
            return

        if np_image.shape[2] == 4:
            np_image = cv2.cvtColor(np_image, cv2.COLOR_BGRA2RGBA)
        else:
            np_image = cv2.cvtColor(np_image, cv2.COLOR_BGR2RGB)

        img = Image.fromarray(np_image)
        img = img.resize((300, 300))

        photo = ImageTk.PhotoImage(img)

        self.labels[key].configure(image=photo, text="")
        self.labels[key].image = photo

    def run_and_update(self):
        """ 
        @brief Process images and update display

        @details Applies current parameters to process images and
        updates all image displays with the results.

        @exception Exception Catches and displays any processing errors
        """ 
        try:
            overlap_value = int(self.param_var.get())
            blend_type = self.blend_var.get()
            self.processor.process_images(overlap_value, blend_type)
            self.update_images_from_arrays({"left": self.processor.image_left, "right": self.processor.image_right,
                                            "main": self.processor.image_main})
        except Exception as e:
            traceback.print_exc()
            messagebox.showerror("Error", repr(e))

    def run_video(self):
        """ 
        @brief Process and display next video frame

        @details Retrieves next frame from video, processes it with
        current parameters, and updates the display.
        """ 

        if not self.is_running:
            return

        image = self.video_processor.get_next_frame()
        if image is None:
            print("End of video.")
            self.is_running = False
            return

        try:
            overlap_value = int(self.param_var.get())
            blend_type = self.blend_var.get()
            self.processor.process_frame(image, overlap_value, blend_type)
            self.update_images_from_arrays({
                "left": self.processor.image_left,
                "right": self.processor.image_right,
                "main": self.processor.image_main
            })
        except Exception as e:
            traceback.print_exc()
            messagebox.showerror("Error", repr(e))
            self.is_running = False
            return

        delay = int(1000 / self.fps)
        self.root.after(delay, self.run_video)

    def start_video(self):
        """ 
        @brief Start video processing and display
        """ 
        self.is_running = True
        self.run_video()

    def stop_video(self):
        """ 
        @brief Stop video processing and release resources
        """ 
        self.is_running = False
        if self.video_processor.cap is not None:
            self.video_processor.release()

    def update_images_from_arrays(self, images: dict):
        """ 
        @brief Update all image widgets from numpy arrays

        @param images Dictionary of image identifiers and numpy arrays
        """ 
        for key, np_img in images.items():
            self._update_image_widget(key, np_img)

    def schedule_start(self):
        """ 
        @brief Schedule video to start at specified time

        @details Parses time string and starts checking loop to begin
        video playback at the specified time.
        """ 

        target_str = self.start_time_var.get().strip()
        try:
            target_time = datetime.datetime.strptime(target_str, "%H:%M:%S").time()
            print(f"[Timer] Scheduled video start at {target_time}.")
            self._check_time_loop(target_time)
        except ValueError:
            print("[Timer] Invalid time format. Please use HH:MM:SS (e.g., 14:30:00).")

    def _check_time_loop(self, target_time):
        """ 
        @brief Internal method to check if target time has been reached

        @param target_time Target time to start video
        """ 

        now = datetime.datetime.now().time()

        if now >= target_time:
            print("[Timer] Target time reached — starting video.")
            self.start_video()
            return

        self.root.after(1000, lambda: self._check_time_loop(target_time))

if __name__ == "__main__":
    """ 
    @brief Main entry point for the application

    @details Creates Tkinter root window and starts the GUI application
    """ 
    root = tk.Tk()
    app = ImageDisplayApp(root)
    root.mainloop()
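
Note: launching ui.py assumes config.ini and input.mp4 are present in the working directory (both are opened in the constructor), and input.png once a static-image update is triggered via process_images().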

altmain.py

""" 
@file altmain.py
@brief Projection Splitter with Overlap Blending using PyTorch.

@details
This module provides a projection image processing system designed for 
multi-projector setups or panoramic displays. It takes an input image, 
splits it into two halves (left and right), and applies an overlap blending 
region between them to ensure seamless projection alignment.

It supports multiple blending modes (linear, quadratic, and Gaussian) 
and performs GPU-accelerated computation via PyTorch for high efficiency.
""" 

import cv2
import numpy as np
from scipy.special import erf
import torch
import config_reader

class ProjectionSplit:
    """ 
    @brief Handles image splitting and blending for projection alignment.

    @details
    The ProjectionSplit class processes both static images and individual 
    frames from video sources. It divides the input image into two parts 
    with a configurable overlap and applies smooth blending transitions 
    using selected mathematical models.

    The blending operations are optimized with PyTorch and use the Metal (MPS)
    backend for GPU acceleration on macOS, falling back to CUDA or CPU on
    other platforms.
    """

    def __init__(self):
        """@brief Initialize the ProjectionSplit object and configuration.

        @details
        This constructor initializes placeholders for the left, right, 
        and main images, and loads configuration parameters (e.g., 
        blending coefficients) from the `config.ini` file via ConfigReader.
        """ 
        self.image_left = None
        self.image_right = None
        self.image_main = None
        self.cfg = config_reader.ConfigReader("config.ini")

    def process_frame(self, image, overlap: int = 75, blend_type: str = "linear"):
        """@brief Process a single input frame into left and right projections with overlap blending.

        @details
        This method divides an input image frame into two halves and applies 
        a blending function to the overlapping region between them. The 
        blending type determines the transition smoothness between projectors.

        Available blend types:
        - **linear**: Simple linear transition.
        - **quadratic**: Smoother parabolic blending.
        - **gaussian**: Natural soft transition curve based on Gaussian distribution.

        The blending is executed with PyTorch, on GPU where a backend is available.

        @param image The input image (NumPy array) in BGR or BGRA format.
        @param overlap Integer specifying the pixel width of the overlapping area. Default: 75.
        @param blend_type String specifying the blending function ("linear", "quadratic", "gaussian").
        @throws FileNotFoundError If the image is None or not found.
        @throws ValueError If an invalid blend_type is specified.
        """ 
        if image is None:
            raise FileNotFoundError("Error: input frame is None or could not be loaded.")
        self.image_main = image

        # Ensure alpha channel
        if image.shape[2] == 3:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2BGRA)

        height, width = image.shape[:2]
        split_x = width // 2

        # Define overlapping regions
        left_end = split_x + overlap // 2
        right_start = split_x - overlap // 2

        left_img = image[:, :left_end].copy()
        right_img = image[:, right_start:].copy()

        # Generate normalized overlap vector
        x = np.linspace(0, 1, overlap)

        # Select blending function
        if blend_type == "linear":
            alpha_left_curve = 1 - self.cfg.get_linear_parameter() * x
            alpha_right_curve = 1 - self.cfg.get_linear_parameter() + self.cfg.get_linear_parameter() * x
        elif blend_type == "quadratic":
            alpha_left_curve = (1 - x) ** 2
            alpha_right_curve = x ** 2
        elif blend_type == "gaussian":
            sigma = 0.25
            g = 0.5 * (1 + erf((x - 0.5) / (sigma * np.sqrt(2))))
            alpha_left_curve = 1 - g
            alpha_right_curve = g
        else:
            raise ValueError(f"Unknown blend_type '{blend_type}'")

        # GPU accelerated blending using PyTorch (fall back to CPU if no GPU backend is available)
        device = "mps" if torch.backends.mps.is_available() else ("cuda" if torch.cuda.is_available() else "cpu")
        left_img_t = torch.from_numpy(left_img).to(device, dtype=torch.float32)
        right_img_t = torch.from_numpy(right_img).to(device, dtype=torch.float32)
        alpha_left_t = torch.from_numpy(alpha_left_curve).to(device, dtype=torch.float32)
        alpha_right_t = torch.from_numpy(alpha_right_curve).to(device, dtype=torch.float32)

        # Expand alpha for broadcast along image width
        alpha_left_2d = alpha_left_t.unsqueeze(0).unsqueeze(-1)
        alpha_right_2d = alpha_right_t.unsqueeze(0).unsqueeze(-1)

        # Apply blending on alpha channel
        left_img_t[:, -overlap:, 3] *= alpha_left_2d.squeeze(-1)
        right_img_t[:, :overlap, 3] *= alpha_right_2d.squeeze(-1)

        # Convert back to CPU for saving
        left_img = left_img_t.cpu().numpy().astype(np.uint8)
        right_img = right_img_t.cpu().numpy().astype(np.uint8)

        self.image_left = left_img
        self.image_right = right_img

        cv2.imwrite("left.png", left_img)
        cv2.imwrite("right.png", right_img)

    def process_images(self, overlap: int = 75, blend_type: str = "linear"):
        """@brief Process a static image file and generate blended left/right outputs.

        @details
        Reads 'input.png' from the current working directory, applies 
        image splitting and overlap blending, and saves the processed 
        halves as 'left.png' and 'right.png'.

        This function uses the same internal logic as process_frame() 
        but is intended for static image files instead of real-time frames.

        @param overlap Integer pixel width of the overlapping region. Default: 75.
        @param blend_type String specifying blending mode ("linear", "quadratic", "gaussian").
        @throws FileNotFoundError If 'input.png' is not found.
        @throws ValueError If an invalid blending type is selected.
        """ 
        image = cv2.imread("input.png", cv2.IMREAD_UNCHANGED)
        if image is None:
            raise FileNotFoundError("Error: input.png not found.")
        self.image_main = image

        # Ensure image has alpha channel
        if image.shape[2] == 3:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2BGRA)

        height, width = image.shape[:2]
        split_x = width // 2

        left_end = split_x + overlap // 2
        right_start = split_x - overlap // 2

        left_img = image[:, :left_end].copy()
        right_img = image[:, right_start:].copy()

        # Create blend curve
        x = np.linspace(0, 1, overlap)
        if blend_type == "linear":
            alpha_left_curve = 1 - self.cfg.get_linear_parameter() * x
            alpha_right_curve = 1 - self.cfg.get_linear_parameter() + self.cfg.get_linear_parameter() * x
        elif blend_type == "quadratic":
            alpha_left_curve = (1 - x) ** 2
            alpha_right_curve = x ** 2
        elif blend_type == "gaussian":
            sigma = 0.25
            g = 0.5 * (1 + erf((x - 0.5) / (sigma * np.sqrt(2))))
            alpha_left_curve = 1 - g
            alpha_right_curve = g
        else:
            raise ValueError(f"Unknown blend_type '{blend_type}'")

        # GPU blending with PyTorch (fall back to CPU if no GPU backend is available)
        device = "mps" if torch.backends.mps.is_available() else ("cuda" if torch.cuda.is_available() else "cpu")
        left_img_t = torch.from_numpy(left_img).to(device, dtype=torch.float32)
        right_img_t = torch.from_numpy(right_img).to(device, dtype=torch.float32)
        alpha_left_t = torch.from_numpy(alpha_left_curve).to(device, dtype=torch.float32)
        alpha_right_t = torch.from_numpy(alpha_right_curve).to(device, dtype=torch.float32)

        alpha_left_2d = alpha_left_t.unsqueeze(0).unsqueeze(-1)
        alpha_right_2d = alpha_right_t.unsqueeze(0).unsqueeze(-1)

        left_img_t[:, -overlap:, 3] *= alpha_left_2d.squeeze(-1)
        right_img_t[:, :overlap, 3] *= alpha_right_2d.squeeze(-1)

        left_img = left_img_t.cpu().numpy().astype(np.uint8)
        right_img = right_img_t.cpu().numpy().astype(np.uint8)

        self.image_left = left_img
        self.image_right = right_img

        cv2.imwrite("left.png", left_img)
        cv2.imwrite("right.png", right_img)

video_processing.py

import cv2

class VideoProcessing:
    """@brief A class for loading, reading, and managing video streams using OpenCV.

    @details
    This class provides basic functionality for working with video files:
    - Loading a video from a file path.
    - Reading frames sequentially.
    - Releasing system resources after use.

    It is designed for modular integration with image-processing workflows, 
    enabling real-time or frame-by-frame analysis from video inputs.
    """ 

    def __init__(self, video_path=None):
        """@brief Initialize the VideoProcessing object.

        @details
        Optionally takes a video file path at initialization. If provided, 
        the constructor automatically loads the video for reading.

        @param video_path (optional) The file path to the video file to load.
        """ 
        self.cap = None
        if video_path:
            self.load_video(video_path)

    def load_video(self, video_path):
        """@brief Load a video file for processing.

        @details
        Opens a video file using OpenCV’s `VideoCapture`. If another video is 
        already loaded, it is safely released before opening the new one.

        @param video_path The file path to the video file to load.

        @throws ValueError If the video cannot be opened successfully.
        """ 
        if self.cap is not None:
            self.cap.release()

        self.cap = cv2.VideoCapture(video_path)

        if not self.cap.isOpened():
            raise ValueError(f"Cannot open video file: {video_path}")

    def get_next_frame(self):
        """@brief Retrieve the next frame from the video stream.

        @details
        Reads the next available frame from the video. If the end of the video
        is reached or the video is not properly loaded, returns `None`.

        @return The next video frame as a NumPy array, or `None` if no frame is available.

        @throws RuntimeError If no video is loaded before calling this function.
        """ 
        if self.cap is None:
            raise RuntimeError("No video loaded. Call load_video() first.")

        ret, frame = self.cap.read()
        if not ret:
            return None  # End of video or error

        return frame

    def release(self):
        """@brief Release the video capture resource.

        @details
        Safely releases the OpenCV `VideoCapture` object to free up system resources.
        This should always be called when video processing is complete.
        """ 
        if self.cap is not None:
            self.cap.release()
            self.cap = None
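
A minimal frame-reading loop using VideoProcessing; input.mp4 is the same file ui.py opens and is assumed to exist in the working directory:

import video_processing

vp = video_processing.VideoProcessing("input.mp4")
frames = 0
while True:
    frame = vp.get_next_frame()
    if frame is None:  # end of stream or read error
        break
    frames += 1
vp.release()
print(f"Read {frames} frames.")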
