Module torpido.io

This file contains function to separate out video and audio using ffmpeg. It consists of two functions to split and merge video and audio using ffmpeg.

Expand source code
"""
This file contains function to separate out video and audio using ffmpeg.
It consists of two functions to split and merge video and audio
using ffmpeg.
"""

import gc

from torpido.config import *
from torpido.exceptions import AudioStreamMissingException, FFmpegProcessException
from torpido.progress import Progress
from torpido.tools import split, merge


class FFMPEG:
    """
    FFMPEG helper function to make use of the subprocess module to run command
    to trim and split video files. The std out logs are parsed to generate a progress
    bar to show the progress of the command that is being executed.

    Attributes
    ----------
    __inputFileName : str
        input video file name
    __inputFilePath : str
        input video file name
    __outputVideoFileName : str
        output video file name
    __inputAudioFileName : str
        original splatted audio file name
    __outputAudioFileName : str
        de-noised audio file name
    __outputFilePath : str
        same as the input file path
    __progressBar : tqdm
        object of tqdm to display a progress bar
    """
    def __init__(self):
        self.__inputFileName = None
        self.__inputFilePath = None
        self.__outputVideoFileName = None
        self.__inputAudioFileName = None
        self.__outputAudioFileName = None
        self.__outputFilePath = None
        self.__progressBar = None

    def getInputFileNamePath(self):
        """
        Returns file name that was used for processing

        Returns
        --------
        str
            final name and path of the input video file
        """
        if self.__inputFileName is not None:
            return os.path.join(self.__inputFilePath, self.__inputFileName)
        return None

    def getOutputFileNamePath(self):
        """
        Returns output file name generated from input
        file name

        Returns
        -------
        str
            output file name and path of the video file
        """
        if self.__outputVideoFileName is not None:
            return os.path.join(self.__outputFilePath, self.__outputVideoFileName)
        return None

    def getInputAudioFileNamePath(self):
        """
        Returns name and path of the input audio file that is split from the input video file

        Returns
        -------
        str
            input audio file name and path ready to de-noise
        """
        if self.__inputAudioFileName is not None:
            return os.path.join(self.__outputFilePath, self.__inputAudioFileName)
        return None

    def getOutputAudioFileNamePath(self):
        """
        Returns the output audio file name and path that is de-noised

        Returns
        -------
        str
            returns the output audio file
        """
        if self.__outputAudioFileName is not None:
            return os.path.join(self.__outputFilePath, self.__outputAudioFileName)
        return None

    def splitVideoAudio(self, inputFile):
        """
        Function to split the input video file into audio file using FFmpeg. Progress bar is
        updated as the command is run

        Note : No new video file is created, only audio file is created

        Parameters
        ----------
        inputFile : str
            input video file

        Returns
        -------
        bool
            returns True if success else Error

        """
        if os.path.isfile(inputFile) is False:
            Log.e("File does not exists")
            return False

        # storing all the references
        self.__inputFilePath = os.path.dirname(inputFile)
        self.__outputFilePath = self.__inputFilePath
        self.__inputFileName = os.path.basename(inputFile)
        self.__outputVideoFileName = os.path.splitext(self.__inputFileName)[0] + OUT_VIDEO_FILE
        self.__inputAudioFileName = os.path.splitext(self.__inputFileName)[0] + IN_AUDIO_FILE
        self.__outputAudioFileName = os.path.splitext(self.__inputFileName)[0] + OUT_AUDIO_FILE

        # call ffmpeg tool to do the splitting
        try:
            Log.i("Splitting the video file.")
            self.__progressBar = Progress()
            for log in split(inputFile,
                             os.path.join(self.__outputFilePath, self.__inputAudioFileName)):
                self.__progressBar.displayProgress(log)

            if not os.path.isfile(os.path.join(self.__outputFilePath, self.__inputAudioFileName)):
                raise AudioStreamMissingException

            self.__progressBar.complete()
            print("----------------------------------------------------------")
            return True

        # no audio in the video
        except AudioStreamMissingException:
            Log.e(AudioStreamMissingException.cause)
            self.__progressBar.clear()
            return False

    def mergeAudioVideo(self, timestamps):
        """
        Function to merge the processed files using FFmpeg. The timestamps are used the trim
        the original video file and the audio stream is replaced with the de-noised audio
        file created by `Auditory`

        Parameters
        ----------
        timestamps : list
            list of start and end timestamps

        Returns
        -------
        bool
            True id success else error is raised
        """
        if self.__inputFileName is None or self.__outputAudioFileName is None:
            Log.e("Files not found for merging")
            return False

        # call ffmpeg tool to merge the files
        try:
            self.__progressBar = Progress()
            Log.i("Writing the output video file.")
            for log in merge(os.path.join(self.__outputFilePath, self.__inputFileName),
                             os.path.join(self.__outputFilePath, self.__outputAudioFileName),
                             os.path.join(self.__outputFilePath, self.__outputVideoFileName),
                             timestamps):
                self.__progressBar.displayProgress(log)

            if not os.path.isfile(os.path.join(self.__outputFilePath, self.__outputVideoFileName)):
                raise FFmpegProcessException

            self.__progressBar.complete()
            print("----------------------------------------------------------")
            return True

        except FFmpegProcessException:
            Log.e(FFmpegProcessException.cause)
            self.__progressBar.clear()
            return False

    def cleanUp(self):
        """
        Deletes extra files created while processing, deletes the ranking files
        cache, etc.
        """
        # well something is wrong here then bye
        if self.__inputFileName is None:
            return

        # original audio split from the video file
        if os.path.isfile(os.path.join(self.__outputFilePath, self.__inputAudioFileName)):
            os.unlink(os.path.join(self.__outputFilePath, self.__inputAudioFileName))

        # de-noised audio file output of Auditory
        if os.path.isfile(os.path.join(self.__outputFilePath, self.__outputAudioFileName)):
            os.unlink(os.path.join(self.__outputFilePath, self.__outputAudioFileName))

        # motion ranking file
        if os.path.isfile(os.path.join(MODEL_DIR, RANK_OUT_MOTION)):
            os.unlink(os.path.join(MODEL_DIR, RANK_OUT_MOTION))

        # blur ranking file
        if os.path.isfile(os.path.join(MODEL_DIR, RANK_OUT_BLUR)):
            os.unlink(os.path.join(MODEL_DIR, RANK_OUT_BLUR))

        # audio ranking file
        if os.path.isfile(os.path.join(MODEL_DIR, RANK_OUT_AUDIO)):
            os.unlink(os.path.join(MODEL_DIR, RANK_OUT_AUDIO))

        # text ranking file
        if os.path.isfile(os.path.join(MODEL_DIR, RANK_OUT_TEXT)):
            os.unlink(os.path.join(MODEL_DIR, RANK_OUT_TEXT))

        # cache storage file
        if os.path.isfile(os.path.join(CACHE_DIR, CACHE_NAME)):
            os.unlink(os.path.join(CACHE_DIR, CACHE_NAME))

        # removing the model directory
        if os.path.isdir(MODEL_DIR):
            os.rmdir(MODEL_DIR)

        del self.__progressBar
        Log.d(f"Garbage collected :: {gc.collect()}")
        Log.d("Clean up completed.")

Classes

class FFMPEG

FFMPEG helper function to make use of the subprocess module to run command to trim and split video files. The std out logs are parsed to generate a progress bar to show the progress of the command that is being executed.

Attributes

__inputFileName : str
input video file name
__inputFilePath : str
input video file name
__outputVideoFileName : str
output video file name
__inputAudioFileName : str
original splatted audio file name
__outputAudioFileName : str
de-noised audio file name
__outputFilePath : str
same as the input file path
__progressBar : tqdm
object of tqdm to display a progress bar
Expand source code
class FFMPEG:
    """
    FFMPEG helper function to make use of the subprocess module to run command
    to trim and split video files. The std out logs are parsed to generate a progress
    bar to show the progress of the command that is being executed.

    Attributes
    ----------
    __inputFileName : str
        input video file name
    __inputFilePath : str
        input video file name
    __outputVideoFileName : str
        output video file name
    __inputAudioFileName : str
        original splatted audio file name
    __outputAudioFileName : str
        de-noised audio file name
    __outputFilePath : str
        same as the input file path
    __progressBar : tqdm
        object of tqdm to display a progress bar
    """
    def __init__(self):
        self.__inputFileName = None
        self.__inputFilePath = None
        self.__outputVideoFileName = None
        self.__inputAudioFileName = None
        self.__outputAudioFileName = None
        self.__outputFilePath = None
        self.__progressBar = None

    def getInputFileNamePath(self):
        """
        Returns file name that was used for processing

        Returns
        --------
        str
            final name and path of the input video file
        """
        if self.__inputFileName is not None:
            return os.path.join(self.__inputFilePath, self.__inputFileName)
        return None

    def getOutputFileNamePath(self):
        """
        Returns output file name generated from input
        file name

        Returns
        -------
        str
            output file name and path of the video file
        """
        if self.__outputVideoFileName is not None:
            return os.path.join(self.__outputFilePath, self.__outputVideoFileName)
        return None

    def getInputAudioFileNamePath(self):
        """
        Returns name and path of the input audio file that is split from the input video file

        Returns
        -------
        str
            input audio file name and path ready to de-noise
        """
        if self.__inputAudioFileName is not None:
            return os.path.join(self.__outputFilePath, self.__inputAudioFileName)
        return None

    def getOutputAudioFileNamePath(self):
        """
        Returns the output audio file name and path that is de-noised

        Returns
        -------
        str
            returns the output audio file
        """
        if self.__outputAudioFileName is not None:
            return os.path.join(self.__outputFilePath, self.__outputAudioFileName)
        return None

    def splitVideoAudio(self, inputFile):
        """
        Function to split the input video file into audio file using FFmpeg. Progress bar is
        updated as the command is run

        Note : No new video file is created, only audio file is created

        Parameters
        ----------
        inputFile : str
            input video file

        Returns
        -------
        bool
            returns True if success else Error

        """
        if os.path.isfile(inputFile) is False:
            Log.e("File does not exists")
            return False

        # storing all the references
        self.__inputFilePath = os.path.dirname(inputFile)
        self.__outputFilePath = self.__inputFilePath
        self.__inputFileName = os.path.basename(inputFile)
        self.__outputVideoFileName = os.path.splitext(self.__inputFileName)[0] + OUT_VIDEO_FILE
        self.__inputAudioFileName = os.path.splitext(self.__inputFileName)[0] + IN_AUDIO_FILE
        self.__outputAudioFileName = os.path.splitext(self.__inputFileName)[0] + OUT_AUDIO_FILE

        # call ffmpeg tool to do the splitting
        try:
            Log.i("Splitting the video file.")
            self.__progressBar = Progress()
            for log in split(inputFile,
                             os.path.join(self.__outputFilePath, self.__inputAudioFileName)):
                self.__progressBar.displayProgress(log)

            if not os.path.isfile(os.path.join(self.__outputFilePath, self.__inputAudioFileName)):
                raise AudioStreamMissingException

            self.__progressBar.complete()
            print("----------------------------------------------------------")
            return True

        # no audio in the video
        except AudioStreamMissingException:
            Log.e(AudioStreamMissingException.cause)
            self.__progressBar.clear()
            return False

    def mergeAudioVideo(self, timestamps):
        """
        Function to merge the processed files using FFmpeg. The timestamps are used the trim
        the original video file and the audio stream is replaced with the de-noised audio
        file created by `Auditory`

        Parameters
        ----------
        timestamps : list
            list of start and end timestamps

        Returns
        -------
        bool
            True id success else error is raised
        """
        if self.__inputFileName is None or self.__outputAudioFileName is None:
            Log.e("Files not found for merging")
            return False

        # call ffmpeg tool to merge the files
        try:
            self.__progressBar = Progress()
            Log.i("Writing the output video file.")
            for log in merge(os.path.join(self.__outputFilePath, self.__inputFileName),
                             os.path.join(self.__outputFilePath, self.__outputAudioFileName),
                             os.path.join(self.__outputFilePath, self.__outputVideoFileName),
                             timestamps):
                self.__progressBar.displayProgress(log)

            if not os.path.isfile(os.path.join(self.__outputFilePath, self.__outputVideoFileName)):
                raise FFmpegProcessException

            self.__progressBar.complete()
            print("----------------------------------------------------------")
            return True

        except FFmpegProcessException:
            Log.e(FFmpegProcessException.cause)
            self.__progressBar.clear()
            return False

    def cleanUp(self):
        """
        Deletes extra files created while processing, deletes the ranking files
        cache, etc.
        """
        # well something is wrong here then bye
        if self.__inputFileName is None:
            return

        # original audio split from the video file
        if os.path.isfile(os.path.join(self.__outputFilePath, self.__inputAudioFileName)):
            os.unlink(os.path.join(self.__outputFilePath, self.__inputAudioFileName))

        # de-noised audio file output of Auditory
        if os.path.isfile(os.path.join(self.__outputFilePath, self.__outputAudioFileName)):
            os.unlink(os.path.join(self.__outputFilePath, self.__outputAudioFileName))

        # motion ranking file
        if os.path.isfile(os.path.join(MODEL_DIR, RANK_OUT_MOTION)):
            os.unlink(os.path.join(MODEL_DIR, RANK_OUT_MOTION))

        # blur ranking file
        if os.path.isfile(os.path.join(MODEL_DIR, RANK_OUT_BLUR)):
            os.unlink(os.path.join(MODEL_DIR, RANK_OUT_BLUR))

        # audio ranking file
        if os.path.isfile(os.path.join(MODEL_DIR, RANK_OUT_AUDIO)):
            os.unlink(os.path.join(MODEL_DIR, RANK_OUT_AUDIO))

        # text ranking file
        if os.path.isfile(os.path.join(MODEL_DIR, RANK_OUT_TEXT)):
            os.unlink(os.path.join(MODEL_DIR, RANK_OUT_TEXT))

        # cache storage file
        if os.path.isfile(os.path.join(CACHE_DIR, CACHE_NAME)):
            os.unlink(os.path.join(CACHE_DIR, CACHE_NAME))

        # removing the model directory
        if os.path.isdir(MODEL_DIR):
            os.rmdir(MODEL_DIR)

        del self.__progressBar
        Log.d(f"Garbage collected :: {gc.collect()}")
        Log.d("Clean up completed.")

Methods

def cleanUp(self)

Deletes extra files created while processing, deletes the ranking files cache, etc.

Expand source code
def cleanUp(self):
    """
    Deletes extra files created while processing, deletes the ranking files
    cache, etc.
    """
    # well something is wrong here then bye
    if self.__inputFileName is None:
        return

    # original audio split from the video file
    if os.path.isfile(os.path.join(self.__outputFilePath, self.__inputAudioFileName)):
        os.unlink(os.path.join(self.__outputFilePath, self.__inputAudioFileName))

    # de-noised audio file output of Auditory
    if os.path.isfile(os.path.join(self.__outputFilePath, self.__outputAudioFileName)):
        os.unlink(os.path.join(self.__outputFilePath, self.__outputAudioFileName))

    # motion ranking file
    if os.path.isfile(os.path.join(MODEL_DIR, RANK_OUT_MOTION)):
        os.unlink(os.path.join(MODEL_DIR, RANK_OUT_MOTION))

    # blur ranking file
    if os.path.isfile(os.path.join(MODEL_DIR, RANK_OUT_BLUR)):
        os.unlink(os.path.join(MODEL_DIR, RANK_OUT_BLUR))

    # audio ranking file
    if os.path.isfile(os.path.join(MODEL_DIR, RANK_OUT_AUDIO)):
        os.unlink(os.path.join(MODEL_DIR, RANK_OUT_AUDIO))

    # text ranking file
    if os.path.isfile(os.path.join(MODEL_DIR, RANK_OUT_TEXT)):
        os.unlink(os.path.join(MODEL_DIR, RANK_OUT_TEXT))

    # cache storage file
    if os.path.isfile(os.path.join(CACHE_DIR, CACHE_NAME)):
        os.unlink(os.path.join(CACHE_DIR, CACHE_NAME))

    # removing the model directory
    if os.path.isdir(MODEL_DIR):
        os.rmdir(MODEL_DIR)

    del self.__progressBar
    Log.d(f"Garbage collected :: {gc.collect()}")
    Log.d("Clean up completed.")
def getInputAudioFileNamePath(self)

Returns name and path of the input audio file that is split from the input video file

Returns

str
input audio file name and path ready to de-noise
Expand source code
def getInputAudioFileNamePath(self):
    """
    Returns name and path of the input audio file that is split from the input video file

    Returns
    -------
    str
        input audio file name and path ready to de-noise
    """
    if self.__inputAudioFileName is not None:
        return os.path.join(self.__outputFilePath, self.__inputAudioFileName)
    return None
def getInputFileNamePath(self)

Returns file name that was used for processing

Returns

str
final name and path of the input video file
Expand source code
def getInputFileNamePath(self):
    """
    Returns file name that was used for processing

    Returns
    --------
    str
        final name and path of the input video file
    """
    if self.__inputFileName is not None:
        return os.path.join(self.__inputFilePath, self.__inputFileName)
    return None
def getOutputAudioFileNamePath(self)

Returns the output audio file name and path that is de-noised

Returns

str
returns the output audio file
Expand source code
def getOutputAudioFileNamePath(self):
    """
    Returns the output audio file name and path that is de-noised

    Returns
    -------
    str
        returns the output audio file
    """
    if self.__outputAudioFileName is not None:
        return os.path.join(self.__outputFilePath, self.__outputAudioFileName)
    return None
def getOutputFileNamePath(self)

Returns output file name generated from input file name

Returns

str
output file name and path of the video file
Expand source code
def getOutputFileNamePath(self):
    """
    Returns output file name generated from input
    file name

    Returns
    -------
    str
        output file name and path of the video file
    """
    if self.__outputVideoFileName is not None:
        return os.path.join(self.__outputFilePath, self.__outputVideoFileName)
    return None
def mergeAudioVideo(self, timestamps)

Function to merge the processed files using FFmpeg. The timestamps are used the trim the original video file and the audio stream is replaced with the de-noised audio file created by Auditory

Parameters

timestamps : list
list of start and end timestamps

Returns

bool
True id success else error is raised
Expand source code
def mergeAudioVideo(self, timestamps):
    """
    Function to merge the processed files using FFmpeg. The timestamps are used the trim
    the original video file and the audio stream is replaced with the de-noised audio
    file created by `Auditory`

    Parameters
    ----------
    timestamps : list
        list of start and end timestamps

    Returns
    -------
    bool
        True id success else error is raised
    """
    if self.__inputFileName is None or self.__outputAudioFileName is None:
        Log.e("Files not found for merging")
        return False

    # call ffmpeg tool to merge the files
    try:
        self.__progressBar = Progress()
        Log.i("Writing the output video file.")
        for log in merge(os.path.join(self.__outputFilePath, self.__inputFileName),
                         os.path.join(self.__outputFilePath, self.__outputAudioFileName),
                         os.path.join(self.__outputFilePath, self.__outputVideoFileName),
                         timestamps):
            self.__progressBar.displayProgress(log)

        if not os.path.isfile(os.path.join(self.__outputFilePath, self.__outputVideoFileName)):
            raise FFmpegProcessException

        self.__progressBar.complete()
        print("----------------------------------------------------------")
        return True

    except FFmpegProcessException:
        Log.e(FFmpegProcessException.cause)
        self.__progressBar.clear()
        return False
def splitVideoAudio(self, inputFile)

Function to split the input video file into audio file using FFmpeg. Progress bar is updated as the command is run

Note : No new video file is created, only audio file is created

Parameters

inputFile : str
input video file

Returns

bool
returns True if success else Error
Expand source code
def splitVideoAudio(self, inputFile):
    """
    Function to split the input video file into audio file using FFmpeg. Progress bar is
    updated as the command is run

    Note : No new video file is created, only audio file is created

    Parameters
    ----------
    inputFile : str
        input video file

    Returns
    -------
    bool
        returns True if success else Error

    """
    if os.path.isfile(inputFile) is False:
        Log.e("File does not exists")
        return False

    # storing all the references
    self.__inputFilePath = os.path.dirname(inputFile)
    self.__outputFilePath = self.__inputFilePath
    self.__inputFileName = os.path.basename(inputFile)
    self.__outputVideoFileName = os.path.splitext(self.__inputFileName)[0] + OUT_VIDEO_FILE
    self.__inputAudioFileName = os.path.splitext(self.__inputFileName)[0] + IN_AUDIO_FILE
    self.__outputAudioFileName = os.path.splitext(self.__inputFileName)[0] + OUT_AUDIO_FILE

    # call ffmpeg tool to do the splitting
    try:
        Log.i("Splitting the video file.")
        self.__progressBar = Progress()
        for log in split(inputFile,
                         os.path.join(self.__outputFilePath, self.__inputAudioFileName)):
            self.__progressBar.displayProgress(log)

        if not os.path.isfile(os.path.join(self.__outputFilePath, self.__inputAudioFileName)):
            raise AudioStreamMissingException

        self.__progressBar.complete()
        print("----------------------------------------------------------")
        return True

    # no audio in the video
    except AudioStreamMissingException:
        Log.e(AudioStreamMissingException.cause)
        self.__progressBar.clear()
        return False