Skip to content

Instantly share code, notes, and snippets.

@justynroberts
Created February 2, 2025 16:08
Show Gist options
  • Save justynroberts/82396aa578ffed81a5cb1251192844ac to your computer and use it in GitHub Desktop.
Save justynroberts/82396aa578ffed81a5cb1251192844ac to your computer and use it in GitHub Desktop.
Simple Webcam
import argparse
import asyncio
import json
import logging
import ssl
import uuid
import datetime
import socket
import cv2
from aiohttp import web
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack
from aiortc.contrib.media import MediaStreamError
from av import VideoFrame
# Configure logging
import sys
logging.basicConfig(level=logging.WARNING, stream=sys.stdout, force=True)
logger = logging.getLogger("pc")
pcs = set()
# HTML content to be served to clients
HTML_CONTENT = """
<!DOCTYPE html>
<html>
<head>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
width: 100vw;
height: 100vh;
overflow: hidden;
background: black;
}
video {
width: 100vw;
height: 100vh;
object-fit: cover;
}
.error {
position: fixed;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
color: red;
font-weight: bold;
background: rgba(0, 0, 0, 0.8);
padding: 20px;
border-radius: 5px;
display: none;
}
.error.active {
display: block;
}
</style>
</head>
<body>
<video id="video" autoplay playsinline></video>
<div id="error" class="error"></div>
<script>
const pc = new RTCPeerConnection();
const video = document.getElementById('video');
const error = document.getElementById('error');
pc.ontrack = function(event) {
console.log('Received track:', event.track.kind);
if (!video.srcObject) {
video.srcObject = new MediaStream();
}
video.srcObject.addTrack(event.track);
};
pc.onconnectionstatechange = function() {
console.log('Connection state:', pc.connectionState);
if (pc.connectionState === 'failed') {
error.textContent = 'Connection failed. Please check your camera and refresh the page.';
error.classList.add('active');
}
};
async function negotiate() {
try {
const offer = await pc.createOffer({ offerToReceiveVideo: true });
await pc.setLocalDescription(offer);
const response = await fetch('/offer', {
method: 'POST',
body: JSON.stringify({
sdp: pc.localDescription.sdp,
type: pc.localDescription.type,
}),
headers: {
'Content-Type': 'application/json',
},
});
const answer = await response.json();
if (answer.error) {
error.textContent = answer.error;
error.classList.add('active');
return;
}
await pc.setRemoteDescription(answer);
} catch (e) {
error.textContent = 'Failed to establish connection: ' + e;
error.classList.add('active');
}
}
// Request fullscreen on click
video.addEventListener('click', () => {
if (video.requestFullscreen) {
video.requestFullscreen();
} else if (video.webkitRequestFullscreen) {
video.webkitRequestFullscreen();
} else if (video.msRequestFullscreen) {
video.msRequestFullscreen();
}
});
negotiate();
</script>
</body>
</html>
"""
class VideoTransformTrack(VideoStreamTrack):
"""
A video stream track that captures frames from the webcam,
overlays the current time in green at the bottom-right corner,
and returns the modified frames.
"""
def __init__(self):
super().__init__()
logger.info("Initializing VideoTransformTrack...")
# Try different camera indices
self.cap = None
for index in range(2): # Try first two camera indices
logger.info(f"Attempting to open camera at index {index}")
cap = cv2.VideoCapture(index)
if cap.isOpened():
logger.info(f"Successfully opened camera at index {index}")
self.cap = cap
# Try to read a test frame
ret, frame = self.cap.read()
if ret:
logger.info(f"Successfully read test frame from camera {index}")
break
else:
logger.error(f"Could not read frame from camera {index}")
cap.release()
else:
logger.error(f"Failed to open camera at index {index}")
if self.cap is None or not self.cap.isOpened():
logger.error("Failed to initialize any camera")
raise MediaStreamError("Could not initialize any camera")
# Set camera properties
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920) # Full HD resolution
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
logger.info("Camera initialization complete")
async def recv(self):
logger.info("Attempting to capture frame")
if not self.cap.isOpened():
logger.error("Camera is not opened in recv()")
raise MediaStreamError("Camera is not opened")
ret, frame = self.cap.read()
if not ret:
logger.error("Failed to read frame")
raise MediaStreamError("Failed to read frame from camera")
logger.info("Successfully captured frame")
# Get current time
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
# Define text properties
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1
font_color = (0, 255, 0) # Green in BGR
thickness = 2
text_size = cv2.getTextSize(timestamp, font, font_scale, thickness)[0]
# Position at bottom-right corner
text_x = frame.shape[1] - text_size[0] - 10
text_y = frame.shape[0] - 10
# Overlay the time on the frame
cv2.putText(frame, timestamp, (text_x, text_y), font, font_scale, font_color, thickness)
# Convert BGR to RGB
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# Create a VideoFrame from the numpy array
new_frame = VideoFrame.from_ndarray(frame, format="rgb24")
new_frame.pts, new_frame.time_base = await self.next_timestamp()
return new_frame
def stop(self):
logger.info("Stopping VideoTransformTrack")
if self.cap and self.cap.isOpened():
self.cap.release()
logger.info("Camera released")
super().stop()
async def index(request):
return web.Response(content_type="text/html", text=HTML_CONTENT)
async def offer(request):
params = await request.json()
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
pc = RTCPeerConnection()
pcs.add(pc)
logger.info("Creating new RTCPeerConnection")
# Create video track before setting remote description
try:
local_video = VideoTransformTrack()
logger.info("Successfully created VideoTransformTrack")
# Add track and get transceiver
transceiver = pc.addTransceiver(local_video, direction="sendonly")
logger.info("Added video track to peer connection with sendonly direction")
except MediaStreamError as e:
logger.error(f"Failed to initialize camera: {e}")
return web.Response(
content_type="application/json",
text=json.dumps({"error": str(e)})
)
@pc.on("connectionstatechange")
async def on_connectionstatechange():
logger.info(f"Connection state changed to: {pc.connectionState}")
if pc.connectionState == "failed":
await pc.close()
pcs.discard(pc)
# Set remote description first
await pc.setRemoteDescription(offer)
# Create answer
answer = await pc.createAnswer()
# Set local description
await pc.setLocalDescription(answer)
return web.Response(
content_type="application/json",
text=json.dumps(
{
"sdp": pc.localDescription.sdp,
"type": pc.localDescription.type
}
)
)
async def on_shutdown(app):
coros = [pc.close() for pc in pcs]
await asyncio.gather(*coros)
pcs.clear()
def find_free_port(start_port, end_port=65535):
"""Find a free port in the given range."""
for port in range(start_port, end_port + 1):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', port))
s.close()
return port
except OSError:
continue
raise OSError("No free ports available in the specified range")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="WebRTC video stream with timestamp overlay")
parser.add_argument("--cert-file", help="SSL certificate file (for HTTPS)")
parser.add_argument("--key-file", help="SSL key file (for HTTPS)")
parser.add_argument("--host", default="0.0.0.0", help="Host for HTTP server")
parser.add_argument("--port", type=int, default=8010, help="Port for HTTP server")
parser.add_argument("--port-range-end", type=int, default=8020,
help="End of port range to try if the specified port is in use")
args = parser.parse_args()
ssl_context = None
if args.cert_file:
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(args.cert_file, args.key_file)
try:
port = find_free_port(args.port, args.port_range_end)
logger.info(f"Using port {port}")
app = web.Application()
app.on_shutdown.append(on_shutdown)
app.router.add_get("/", index)
app.router.add_post("/offer", offer)
web.run_app(app, host=args.host, port=port, ssl_context=ssl_context)
except OSError as e:
logger.error(f"Failed to start server: {e}")
logger.error(f"Please try a different port range or free up port {args.port}")
sys.exit(1)
@justynroberts
Copy link
Author

All you need to stream a webcam..
Not secure. Dont Care.

pip3 install aiohttp aiortc opencv-python av numpy

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment