"""Webcam object-detection demo.

Frames are read from the default camera, pushed onto a shared input queue,
consumed by two worker processes (a delayed pass-through relay and a
TensorFlow detector), and whatever lands on the output queue is displayed.
"""
import cv2
import multiprocessing
import time
import os
import numpy as np
import tensorflow as tf

from utils import FPS
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util

CWD_PATH = os.getcwd()

# Path to frozen detection graph. This is the actual model that is used for
# the object detection.
MODEL_NAME = 'ssd_mobilenet_v1_coco_11_06_2017'
PATH_TO_CKPT = os.path.join(CWD_PATH, 'object_detection', MODEL_NAME,
                            'frozen_inference_graph.pb')

# Label map used to attach a readable class name to each detected box.
PATH_TO_LABELS = os.path.join(CWD_PATH, 'object_detection', 'data',
                              'mscoco_label_map.pbtxt')

NUM_CLASSES = 90

# Loading label map
label_map = label_map_util.load_labelmap(PATH_TO_LABELS)
categories = label_map_util.convert_label_map_to_categories(
    label_map, max_num_classes=NUM_CLASSES, use_display_name=True)
category_index = label_map_util.create_category_index(categories)


def detect_objects(image_np, sess, detection_graph):
    """Run the detector on one frame and draw the detections onto it.

    Args:
        image_np: Frame to analyse. It is fed to the graph's
            ``image_tensor:0`` input after a batch axis is prepended.
        sess: TensorFlow session bound to ``detection_graph``.
        detection_graph: Graph holding the imported frozen model.

    Returns:
        ``image_np`` itself (the same object that was passed in), after it
        has been handed to the visualisation helper.
    """
    # Expand dimensions since the model expects images to have shape:
    # [1, None, None, 3]
    image_np_expanded = np.expand_dims(image_np, axis=0)
    image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')

    # Each box represents a part of the image where a particular object was
    # detected.
    boxes = detection_graph.get_tensor_by_name('detection_boxes:0')

    # Each score represents the level of confidence for one detected object.
    # The score is shown on the result image, together with the class label.
    scores = detection_graph.get_tensor_by_name('detection_scores:0')
    classes = detection_graph.get_tensor_by_name('detection_classes:0')
    num_detections = detection_graph.get_tensor_by_name('num_detections:0')

    # Actual detection.
    (boxes, scores, classes, num_detections) = sess.run(
        [boxes, scores, classes, num_detections],
        feed_dict={image_tensor: image_np_expanded})

    # Visualization of the results of a detection. The helper's return value
    # is discarded, so this relies on it drawing into image_np in place.
    vis_util.visualize_boxes_and_labels_on_image_array(
        image_np,
        np.squeeze(boxes),
        np.squeeze(classes).astype(np.int32),
        np.squeeze(scores),
        category_index,
        use_normalized_coordinates=True,
        line_thickness=8)
    return image_np


def blend_non_transparent(face_img, overlay_img):
    """Composite the non-black pixels of ``overlay_img`` over ``face_img``.

    Args:
        face_img: Background image.
        overlay_img: Image whose non-black pixels are treated as foreground.
            It is converted with ``COLOR_BGR2GRAY``, so it is expected to be
            a 3-channel BGR image.

    Returns:
        The blended image as a ``uint8`` array.
    """
    # Let's find a mask covering all the non-black (foreground) pixels
    # NB: We need to do this on grayscale version of the image
    gray_overlay = cv2.cvtColor(overlay_img, cv2.COLOR_BGR2GRAY)
    overlay_mask = cv2.threshold(gray_overlay, 1, 255, cv2.THRESH_BINARY)[1]

    # Let's shrink and blur it a little to make the transitions smoother...
    overlay_mask = cv2.erode(
        overlay_mask, cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3)))
    overlay_mask = cv2.blur(overlay_mask, (3, 3))

    # And the inverse mask, that covers all the black (background) pixels
    background_mask = 255 - overlay_mask

    # Turn the masks into three channel, so we can use them as weights
    overlay_mask = cv2.cvtColor(overlay_mask, cv2.COLOR_GRAY2BGR)
    background_mask = cv2.cvtColor(background_mask, cv2.COLOR_GRAY2BGR)

    # Create a masked out face image, and masked out overlay
    # We convert the images to floating point in range 0.0 - 1.0
    face_part = (face_img * (1 / 255.0)) * (background_mask * (1 / 255.0))
    overlay_part = (overlay_img * (1 / 255.0)) * (overlay_mask * (1 / 255.0))

    # And finally just add them together, and rescale it back to an 8bit
    # integer image
    return np.uint8(
        cv2.addWeighted(face_part, 255.0, overlay_part, 255.0, 0.0))


def main_process(input, output):
    """Relay frames from ``input`` to ``output`` unchanged, forever.

    Each iteration sleeps for 0.5 seconds before taking the next frame, so
    this worker forwards at most about two frames per second.

    Args:
        input: Queue to take frames from (blocking ``get``).
        output: Queue to put the same frames onto.
    """
    while True:
        time.sleep(0.5)
        image = input.get()
        output.put(image)


def child_process(input, output):
    """Load the frozen model, then annotate frames from ``input`` forever.

    Args:
        input: Queue to take frames from (blocking ``get``).
        output: Queue that receives the processed frames.
    """
    # Load a (frozen) Tensorflow model into memory.
    detection_graph = tf.Graph()
    with detection_graph.as_default():
        od_graph_def = tf.GraphDef()
        with tf.gfile.GFile(PATH_TO_CKPT, 'rb') as fid:
            serialized_graph = fid.read()
            od_graph_def.ParseFromString(serialized_graph)
            tf.import_graph_def(od_graph_def, name='')

        sess = tf.Session(graph=detection_graph)

    while True:
        image = input.get()
        # NOTE(review): detect_objects returns the array it was given, so
        # `image` and `image2` are the same object here and the blend below
        # combines the annotated frame with itself. Pass `image.copy()` to
        # detect_objects if blending over the untouched frame was intended.
        image2 = detect_objects(image, sess, detection_graph)
        result = blend_non_transparent(image, image2)
        output.put(result)


def main():
    """Start the worker processes and run the capture/display loop.

    The loop ends when 'q' is pressed in the preview window or when the
    camera stops delivering frames; the camera, the window and the worker
    processes are then cleaned up so the program can exit.
    """
    input_queue = multiprocessing.Queue(5)
    output_queue = multiprocessing.Queue(5)

    # The Process handles get their own names. Rebinding `main_process` /
    # `child_process` would make the target functions unpicklable under the
    # "spawn" start method, which looks them up by module-level name.
    relay_proc = multiprocessing.Process(
        target=main_process, args=(input_queue, output_queue))
    relay_proc.daemon = True
    detector_proc = multiprocessing.Process(
        target=child_process, args=(input_queue, output_queue))
    detector_proc.daemon = False
    relay_proc.start()
    detector_proc.start()

    video_capture = cv2.VideoCapture(0)
    video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, 480)
    video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 360)

    try:
        while True:
            grabbed, frame = video_capture.read()
            if not grabbed:
                # No frame available (camera missing or disconnected); do
                # not feed an invalid frame to the workers.
                break
            input_queue.put(frame)
            cv2.imshow('Video', output_queue.get())
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        video_capture.release()
        cv2.destroyAllWindows()
        # Do not let a frame still buffered for the workers block exit.
        input_queue.cancel_join_thread()
        # The detector is non-daemonic and loops forever; without an
        # explicit stop the interpreter would wait on it indefinitely.
        detector_proc.terminate()
        detector_proc.join()


if __name__ == '__main__':
    main()