Connect to an MJPG stream and compute the general direction of motion (like optic flow, but lighter). The script first crops and downsamples the JPG frames using the JPEGtran library, to allow processing on low-power platforms like the Raspberry Pi.
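The script expects the imports shown below to be available; assuming the usual PyPI package names, these would be requests, numpy, opencv-python (for cv2) and jpegtran-cffi (for the jpegtran module), the latter typically also needing the libjpeg/libjpeg-turbo development headers installed on the system.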
###############################################################
# File: lightweight_camera_motion.py
# Description: Connect to an MJPG stream and compute the
#              general direction of motion.
#              The script first crops and downsamples the
#              JPEG frames using the JPEGtran library,
#              to allow processing in low power platforms
#              like the Raspberry Pi.
#
# Author: Carlos Garcia-Saura (@CarlosGS) - 2018
# Modified by: Author (email) - year
###############################################################
import time
import math
import requests
import cv2
import numpy as np
from jpegtran import JPEGImage
# Fetch a raw JPG frame from an MJPEG stream
def grab_frame(url="http://localhost:8090/?action=snapshot"):
    blob = requests.get(url).content
    return blob

def round_down(num, divisor):
    return int(num - (num % divisor))
# Processes and loads a JPG frame, cropping and scaling down with the specified parameters
def process_frame(blob, zoom=4, newSize=100, quality=95):
    img = JPEGImage(blob=blob)  # Load the JPG image with the JPEGtran library
    # Compute the target dimensions
    halfW = img.width / 2
    halfH = img.height / 2
    side = int(img.width / zoom)
    halfSide = side / 2
    # Compute the starting XY coordinates
    startW = round_down(halfW - halfSide, 16)  # jpegtran requires indexing in multiples of 16
    startH = round_down(halfH - halfSide, 16)
    # Apply the transformation directly to the JPG data
    data = img.crop(startW, startH, side, side).downscale(newSize, newSize, quality).data
    # Convert the JPG data into the OpenCV format
    nparr = np.frombuffer(data, np.uint8)
    frame = cv2.imdecode(nparr, 1)
    #frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # Optional: convert to grayscale
    return frame
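# Quick offline check of the crop/downscale step (assumption: a local JPEG file
# named "test.jpg" exists); uncomment to preview a processed frame without a
# running MJPG stream:
#with open("test.jpg", "rb") as f:
#    preview = process_frame(f.read())
#cv2.imshow('preview', preview)
#cv2.waitKey(0)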
# Grab reference frame (A)
A = grab_frame()
A = process_frame(A)
cv2.imshow('A', A)
while True:
    # Grab latest frame (B)
    B = grab_frame()
    B = process_frame(B)

    # Compute the transformation matrix (partial affine: translation, rotation and scale)
    # Note: cv2.estimateRigidTransform is only available in OpenCV 2.x/3.x; see the note after the script
    warp_matrix = cv2.estimateRigidTransform(A, B, fullAffine=False)

    if warp_matrix is not None:
        # Extract the motion parameters (small-angle approximation of the 2x3 similarity matrix)
        scale = warp_matrix[0,0] - 1
        rotation = math.asin(warp_matrix[0,1])
        tx = warp_matrix[0,2]
        ty = warp_matrix[1,2]
        print(scale)
        print(math.degrees(rotation))
        print(tx)
        print(ty)
        print("")

        # Useful for debugging
        cv2.imshow('B', B)  # Show the latest frame
        Bt = cv2.warpAffine(A, warp_matrix, (A.shape[1], A.shape[0]))
        cv2.imshow('Bt', Bt)  # Show the transformed image (the "A" frame transformed to fit the latest frame)
    else:
        print("Couldn't match the two images!")

    # Also useful for debugging
    key = cv2.waitKey(250) & 0xFF
    if key == ord('q'):
        break

cv2.destroyAllWindows()
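The call to cv2.estimateRigidTransform only exists in OpenCV 2.x/3.x; it was removed in OpenCV 4. One possible replacement (an assumption, not part of the original gist) is to track a few corners from A into B with sparse optical flow and fit the same partial-affine model with cv2.estimateAffinePartial2D, e.g. with a helper like this:

def estimate_warp_cv4(A, B, max_corners=200):
    # Hypothetical helper for OpenCV >= 4: estimateAffinePartial2D works on point
    # correspondences, so first detect corners in A and track them into B.
    grayA = cv2.cvtColor(A, cv2.COLOR_BGR2GRAY)
    grayB = cv2.cvtColor(B, cv2.COLOR_BGR2GRAY)
    ptsA = cv2.goodFeaturesToTrack(grayA, maxCorners=max_corners, qualityLevel=0.01, minDistance=5)
    if ptsA is None:
        return None
    ptsB, status, _err = cv2.calcOpticalFlowPyrLK(grayA, grayB, ptsA, None)
    good = status.ravel() == 1
    if good.sum() < 3:
        return None
    warp_matrix, _inliers = cv2.estimateAffinePartial2D(ptsA[good], ptsB[good])
    return warp_matrix  # 2x3 matrix with the same layout as estimateRigidTransform

With that helper defined, the line in the loop that computes warp_matrix could call estimate_warp_cv4(A, B) instead, and the rest of the script would stay the same.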