Module torpido.textual
This module calculates the textual ranking for the video; the ranking is done by detecting text in the video frames. Text detection is achieved with the EAST Text Detection model of OpenCV, which can detect text and return confidences and geometry for the regions that contain it. Combined with text extraction (OCR), it could also recover the text itself from the image.
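A minimal usage sketch, assuming the TEXT_EAST_MODEL_PATH configuration points at a valid EAST model file and that video.mp4 is a hypothetical input path:

from torpido.textual import Textual

# the EAST model is loaded at construction time; if TEXT_EAST_MODEL_PATH
# is not set, EastModelEnvironmentMissing is raised
textual = Textual()

# rank the video; pass display=True to preview the detected text boxes.
# the per-second ranks are dumped to RANK_DIR/RANK_OUT_TEXT
textual.startProcessing("video.mp4", display=False)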
Expand source code
"""
This file is for calculating the textual ranking for the video.
The textual ranking is done by detecting text in the video.
Text detection is achieved by using the EAST Text Detection model
of OpenCV; this model can detect text and return confidences and
geometry for the sections that contain the text.
If combined with text extraction it can give text from the image.
"""
import time
import cv2
import numpy as np
from torpido.config import *
from torpido.exceptions import EastModelEnvironmentMissing
from torpido.util import image
from torpido.video import VideoGet
class Textual:
"""
Class to perform Textual analysis on the input video file. This class creates its own
video reader and handles the frames independent of the `Visual`. The EAST model of
OpenCV is used to detect text in the video.
Since the model can be very slow depending on the system it is running on, some of
the frames are skipped; `TEXT_SKIP_FRAMES` determines the number of frames to skip.
Attributes
----------
__fps : float
video fps
__frameCount : int
number of frames in the video
__textRanks : list
list of the ranks
__videoGetter : VideoGet
object of the video getter to read the video through a thread
__minConfidence : int
minimum confidence to determine if the video contains text
__WIDTH : int, default=320
the EAST model requires the frame size to be a multiple of 32x32
__HEIGHT : int, default=320
height of the frame
__skipFrames : int
number of frames to skip
__textRankPath : str
the constants file defines where to store the ranks
__net : object
loaded EAST model
__textDetectLayerName
layer name that returns the text confidences
__textDisplayLayerNames
layers that return the confidences and the coordinates of the detected text boxes
"""
def __init__(self):
cv2.setUseOptimized(True)
self.__fps = None
self.__frameCount = None
self.__textRanks = None
self.__videoGetter = None
self.__minConfidence = TEXT_MIN_CONFIDENCE
self.__WIDTH = 320 # this val should be multiple of 32
self.__HEIGHT = 320 # same thing for this
self.__skipFrames = TEXT_SKIP_FRAMES
self.__textRankPath = os.path.join(os.getcwd(), RANK_DIR, RANK_OUT_TEXT)
# initializing the model
# reading the model in the memory
if TEXT_EAST_MODEL_PATH is not None:
self.__net = cv2.dnn.readNet(TEXT_EAST_MODEL_PATH)
else:
raise EastModelEnvironmentMissing
# adding output layer to only return confidence for text
self.__textDetectLayerName = ["feature_fusion/Conv_7/Sigmoid"]
# adding output layers to the model with text detected boxes
self.__textDisplayLayerNames = ["feature_fusion/Conv_7/Sigmoid",
"feature_fusion/concat_3"]
def startProcessing(self, inputFile, display=False):
"""
Function to perform the Textual Processing on the input video file.
The video can be displayed as the processing is going on.
Parameters
----------
inputFile : str
input video file
display : bool
True to display the video while processing
"""
if not os.path.isfile(inputFile):
Log.e(f"File {inputFile} does not exist")
return
self.__videoGetter = VideoGet(str(inputFile)).start()
myClip = self.__videoGetter.stream
if self.__videoGetter.getQueueSize() == 0:
time.sleep(0.5)
Log.d("Waiting for the buffer to fill up.")
self.__fps = myClip.get(cv2.CAP_PROP_FPS)
self.__frameCount = myClip.get(cv2.CAP_PROP_FRAME_COUNT)
self.__skipFrames = int(self.__fps * self.__skipFrames)
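# note: since __skipFrames is multiplied by the fps, TEXT_SKIP_FRAMES is
# effectively in seconds; e.g. with fps=30 and TEXT_SKIP_FRAMES=2 every
# 60th frame is analysed and its rank is extended over the 60 frames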
# maintaining the ranks for text detection
count = 0
self.__textRanks = []
while self.__videoGetter.more():
frame = self.__videoGetter.read()
if frame is None:
break
# resizing the frame to the model input size (a multiple of 32 x 32)
original = frame
(H, W) = frame.shape[:2]
rW = W / float(self.__WIDTH)
rH = H / float(self.__HEIGHT)
frame = cv2.resize(frame, (self.__WIDTH, self.__HEIGHT))
count += 1
if count % self.__skipFrames == 0:
# making the image blob
blob = cv2.dnn.blobFromImage(frame,
1.0,
(self.__WIDTH, self.__HEIGHT),
(123.68, 116.78, 103.94),
swapRB=True, crop=False)
# run text detection
if display:
detectedText = self.__runTextDetectDisplay(blob, (rW, rH), original)
else:
detectedText = self.__runTextDetect(blob)
# if text is detected
if detectedText:
self.__textRanks.extend([RANK_TEXT] * int(self.__skipFrames))
Log.d("Text detected.")
else:
self.__textRanks.extend([0] * int(self.__skipFrames))
Log.d("No text detected.")
# clearing the memory
myClip.release()
self.__videoGetter.stop()
cv2.destroyAllWindows()
# calling the normalization of ranking
self.__timedRankingNormalize()
def __runTextDetect(self, blob):
"""
Function to detect text without displaying it. Gets the scores and determines
whether the image contains any text
Parameters
----------
blob : blob
blob of the image
Returns
-------
bool
True denotes text detected
"""
self.__net.setInput(blob)
scores = self.__net.forward(self.__textDetectLayerName)
numRows, numCols = np.asarray(scores).shape[3: 5]
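# note: forward() called with a list of layer names returns a list of
# arrays, so np.asarray(scores) has shape (1, 1, 1, 80, 80) and
# shape[3:5] picks out the 80x80 score grid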
confidences = []
# since image is 320x320 the output is 80x80 (scores)
for x in range(0, numRows):
scoreData = scores[0][0][0][x]
for y in range(0, numCols):
if scoreData[y] < self.__minConfidence:
continue
confidences.append(scoreData[y])
# if confidences contain some value
if len(confidences) > 0:
return True
return False
def __runTextDetectDisplay(self, blob, rSize, original):
"""
Function to detect text using the layers that also return the rectangles,
which are drawn on the frame for display
Parameters
----------
blob : blob
blob of the image
rSize : tuple
width and height ratios used to rescale the boxes to the original frame
original : image array
un-resized image to display
Returns
-------
bool
True denotes text detected
"""
# running the model
self.__net.setInput(blob=blob)
scores, geometry = self.__net.forward(self.__textDisplayLayerNames)
numRows, numCols = scores.shape[2:4]
rect = []
confidences = []
# since image is 320x320 the output is 80x80 (scores)
for y in range(0, numRows):
scoresData = scores[0, 0, y]
xData0 = geometry[0, 0, y]
xData1 = geometry[0, 1, y]
xData2 = geometry[0, 2, y]
xData3 = geometry[0, 3, y]
anglesData = geometry[0, 4, y]
for x in range(0, numCols):
# if our score does not have sufficient probability, ignore it
if scoresData[x] < self.__minConfidence:
continue
# compute the offset factor as our resulting feature maps will
# be 4x smaller than the input image
(offsetX, offsetY) = (x * 4, y * 4)
# extract the rotation angle for the prediction and then
# compute the sin and cosine
angle = anglesData[x]
cos = np.cos(angle)
sin = np.sin(angle)
# use the geometry volume to derive the width and height of
# the bounding box
h = xData0[x] + xData2[x]
w = xData1[x] + xData3[x]
# compute both the starting and ending (x, y)-coordinates for
# the text prediction bounding box
endX = int(offsetX + (cos * xData1[x]) + (sin * xData2[x]))
endY = int(offsetY - (sin * xData1[x]) + (cos * xData2[x]))
startX = int(endX - w)
startY = int(endY - h)
# add the bounding box coordinates and probability score to
# our respective lists
rect.append((startX, startY, endX, endY))
confidences.append(scoresData[x])
# non-maximum suppression to merge the overlapping boxes
boxes = image.nonMaxSuppression(np.array(rect), probs=confidences)
rW, rH = rSize
for startX, startY, endX, endY in boxes:
startX = int(startX * rW)
startY = int(startY * rH)
endX = int(endX * rW)
endY = int(endY * rH)
# draw the bounding box on the image
cv2.rectangle(original, (startX, startY), (endX, endY), (0, 255, 0), 2)
cv2.imshow("Text Detection", original)
cv2.waitKey(1) & 0xFF
if len(confidences) > 0:
return True
return False
def __timedRankingNormalize(self):
"""
Ranking is assigned per frame, but video frames are duration * fps while
audio frames are duration * rate, so the two ranking systems need to be
generalized.
Solution: rank each second of the video and audio by slicing the rank
list into 1-second chunks of frames and taking the mean of each chunk as
the rank for that second.
Since the per-frame ranking is 0 or 1, the mean gives finer-grained,
more versatile results.
"""
textNormalize = []
for i in range(0, int(self.__frameCount), int(self.__fps)):
if len(self.__textRanks) >= (i + int(self.__fps)):
textNormalize.append(np.mean(self.__textRanks[i: i + int(self.__fps)]))
else:
break
# saving the processed ranks
dump(textNormalize, self.__textRankPath)
Log.d(f"Textual rank length {len(textNormalize)}")
Log.i("Textual ranking saved .............")
def __del__(self):
"""
Clean up.
"""
del self.__net
del self.__videoGetter
Log.d("Cleaning up.")
Classes
class Textual
Class to perform Textual analysis on the input video file. This class creates its own video reader and handles the frames independent of the `Visual`. The EAST model of OpenCV is used to detect text in the video. Since the model can be very slow depending on the system it is running on, some of the frames are skipped; `TEXT_SKIP_FRAMES` determines the number of frames to skip.
Attributes
__fps : float
video fps
__frameCount : int
number of frames in the video
__textRanks : list
list of the ranks
__videoGetter : VideoGet
object of the video getter to read the video through a thread
__minConfidence : int
minimum confidence to determine if the video contains text
__WIDTH : int, default=320
the EAST model requires the frame size to be a multiple of 32x32
__HEIGHT : int, default=320
height of the frame
__skipFrames : int
number of frames to skip
__textRankPath : str
the constants file defines where to store the ranks
__net : object
loaded EAST model
__textDetectLayerName
layer name that returns the text confidences
__textDisplayLayerNames
layers that return the confidences and the coordinates of the detected text boxes
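To make these two output layers concrete, here is a minimal standalone sketch of the EAST forward pass, assuming frozen_east_text_detection.pb and sample.jpg are hypothetical local paths to the model file and a test image:

import cv2

# load the EAST model (hypothetical local copy of the .pb file)
net = cv2.dnn.readNet("frozen_east_text_detection.pb")
frame = cv2.imread("sample.jpg")
# 320x320 blob (multiple of 32), mean-subtracted exactly as in Textual
blob = cv2.dnn.blobFromImage(frame, 1.0, (320, 320),
                             (123.68, 116.78, 103.94),
                             swapRB=True, crop=False)
net.setInput(blob)
scores, geometry = net.forward(["feature_fusion/Conv_7/Sigmoid",
                                "feature_fusion/concat_3"])
# the feature maps are 4x smaller than the 320x320 input, hence 80x80
print(scores.shape)    # (1, 1, 80, 80) per-cell text confidences
print(geometry.shape)  # (1, 5, 80, 80) four box distances plus an angle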
Methods
def startProcessing(self, inputFile, display=False)
Function to perform the Textual Processing on the input video file. The video can be displayed as the processing is going on.
Parameters
inputFile : str
input video file
display : bool
True to display the video while processing
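The per-second normalization that startProcessing triggers at the end can be pictured in isolation; a minimal sketch of the same averaging, using a made-up 0/1 rank list and frame rate:

import numpy as np

fps = 4                      # hypothetical frame rate
frameRanks = [0, 0, 1, 1,    # second 1 -> mean 0.5
              1, 1, 1, 1,    # second 2 -> mean 1.0
              0, 0, 0, 0]    # second 3 -> mean 0.0

# slice the per-frame ranks into 1-second chunks and average each chunk,
# mirroring what __timedRankingNormalize does before dumping the ranks
perSecond = [float(np.mean(frameRanks[i: i + fps]))
             for i in range(0, len(frameRanks), fps)]
print(perSecond)  # [0.5, 1.0, 0.0]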