Created
February 2, 2025 16:08
-
-
Save justynroberts/82396aa578ffed81a5cb1251192844ac to your computer and use it in GitHub Desktop.
Simple Webcam
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import asyncio | |
import json | |
import logging | |
import ssl | |
import uuid | |
import datetime | |
import socket | |
import cv2 | |
from aiohttp import web | |
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack | |
from aiortc.contrib.media import MediaStreamError | |
from av import VideoFrame | |
# Configure logging | |
import sys | |
logging.basicConfig(level=logging.WARNING, stream=sys.stdout, force=True) | |
logger = logging.getLogger("pc") | |
pcs = set() | |
# HTML content to be served to clients | |
HTML_CONTENT = """ | |
<!DOCTYPE html> | |
<html> | |
<head> | |
<style> | |
* { | |
margin: 0; | |
padding: 0; | |
box-sizing: border-box; | |
} | |
body { | |
width: 100vw; | |
height: 100vh; | |
overflow: hidden; | |
background: black; | |
} | |
video { | |
width: 100vw; | |
height: 100vh; | |
object-fit: cover; | |
} | |
.error { | |
position: fixed; | |
top: 50%; | |
left: 50%; | |
transform: translate(-50%, -50%); | |
color: red; | |
font-weight: bold; | |
background: rgba(0, 0, 0, 0.8); | |
padding: 20px; | |
border-radius: 5px; | |
display: none; | |
} | |
.error.active { | |
display: block; | |
} | |
</style> | |
</head> | |
<body> | |
<video id="video" autoplay playsinline></video> | |
<div id="error" class="error"></div> | |
<script> | |
const pc = new RTCPeerConnection(); | |
const video = document.getElementById('video'); | |
const error = document.getElementById('error'); | |
pc.ontrack = function(event) { | |
console.log('Received track:', event.track.kind); | |
if (!video.srcObject) { | |
video.srcObject = new MediaStream(); | |
} | |
video.srcObject.addTrack(event.track); | |
}; | |
pc.onconnectionstatechange = function() { | |
console.log('Connection state:', pc.connectionState); | |
if (pc.connectionState === 'failed') { | |
error.textContent = 'Connection failed. Please check your camera and refresh the page.'; | |
error.classList.add('active'); | |
} | |
}; | |
async function negotiate() { | |
try { | |
const offer = await pc.createOffer({ offerToReceiveVideo: true }); | |
await pc.setLocalDescription(offer); | |
const response = await fetch('/offer', { | |
method: 'POST', | |
body: JSON.stringify({ | |
sdp: pc.localDescription.sdp, | |
type: pc.localDescription.type, | |
}), | |
headers: { | |
'Content-Type': 'application/json', | |
}, | |
}); | |
const answer = await response.json(); | |
if (answer.error) { | |
error.textContent = answer.error; | |
error.classList.add('active'); | |
return; | |
} | |
await pc.setRemoteDescription(answer); | |
} catch (e) { | |
error.textContent = 'Failed to establish connection: ' + e; | |
error.classList.add('active'); | |
} | |
} | |
// Request fullscreen on click | |
video.addEventListener('click', () => { | |
if (video.requestFullscreen) { | |
video.requestFullscreen(); | |
} else if (video.webkitRequestFullscreen) { | |
video.webkitRequestFullscreen(); | |
} else if (video.msRequestFullscreen) { | |
video.msRequestFullscreen(); | |
} | |
}); | |
negotiate(); | |
</script> | |
</body> | |
</html> | |
""" | |
class VideoTransformTrack(VideoStreamTrack): | |
""" | |
A video stream track that captures frames from the webcam, | |
overlays the current time in green at the bottom-right corner, | |
and returns the modified frames. | |
""" | |
def __init__(self): | |
super().__init__() | |
logger.info("Initializing VideoTransformTrack...") | |
# Try different camera indices | |
self.cap = None | |
for index in range(2): # Try first two camera indices | |
logger.info(f"Attempting to open camera at index {index}") | |
cap = cv2.VideoCapture(index) | |
if cap.isOpened(): | |
logger.info(f"Successfully opened camera at index {index}") | |
self.cap = cap | |
# Try to read a test frame | |
ret, frame = self.cap.read() | |
if ret: | |
logger.info(f"Successfully read test frame from camera {index}") | |
break | |
else: | |
logger.error(f"Could not read frame from camera {index}") | |
cap.release() | |
else: | |
logger.error(f"Failed to open camera at index {index}") | |
if self.cap is None or not self.cap.isOpened(): | |
logger.error("Failed to initialize any camera") | |
raise MediaStreamError("Could not initialize any camera") | |
# Set camera properties | |
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920) # Full HD resolution | |
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080) | |
logger.info("Camera initialization complete") | |
async def recv(self): | |
logger.info("Attempting to capture frame") | |
if not self.cap.isOpened(): | |
logger.error("Camera is not opened in recv()") | |
raise MediaStreamError("Camera is not opened") | |
ret, frame = self.cap.read() | |
if not ret: | |
logger.error("Failed to read frame") | |
raise MediaStreamError("Failed to read frame from camera") | |
logger.info("Successfully captured frame") | |
# Get current time | |
timestamp = datetime.datetime.now().strftime("%H:%M:%S") | |
# Define text properties | |
font = cv2.FONT_HERSHEY_SIMPLEX | |
font_scale = 1 | |
font_color = (0, 255, 0) # Green in BGR | |
thickness = 2 | |
text_size = cv2.getTextSize(timestamp, font, font_scale, thickness)[0] | |
# Position at bottom-right corner | |
text_x = frame.shape[1] - text_size[0] - 10 | |
text_y = frame.shape[0] - 10 | |
# Overlay the time on the frame | |
cv2.putText(frame, timestamp, (text_x, text_y), font, font_scale, font_color, thickness) | |
# Convert BGR to RGB | |
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
# Create a VideoFrame from the numpy array | |
new_frame = VideoFrame.from_ndarray(frame, format="rgb24") | |
new_frame.pts, new_frame.time_base = await self.next_timestamp() | |
return new_frame | |
def stop(self): | |
logger.info("Stopping VideoTransformTrack") | |
if self.cap and self.cap.isOpened(): | |
self.cap.release() | |
logger.info("Camera released") | |
super().stop() | |
async def index(request): | |
return web.Response(content_type="text/html", text=HTML_CONTENT) | |
async def offer(request): | |
params = await request.json() | |
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"]) | |
pc = RTCPeerConnection() | |
pcs.add(pc) | |
logger.info("Creating new RTCPeerConnection") | |
# Create video track before setting remote description | |
try: | |
local_video = VideoTransformTrack() | |
logger.info("Successfully created VideoTransformTrack") | |
# Add track and get transceiver | |
transceiver = pc.addTransceiver(local_video, direction="sendonly") | |
logger.info("Added video track to peer connection with sendonly direction") | |
except MediaStreamError as e: | |
logger.error(f"Failed to initialize camera: {e}") | |
return web.Response( | |
content_type="application/json", | |
text=json.dumps({"error": str(e)}) | |
) | |
@pc.on("connectionstatechange") | |
async def on_connectionstatechange(): | |
logger.info(f"Connection state changed to: {pc.connectionState}") | |
if pc.connectionState == "failed": | |
await pc.close() | |
pcs.discard(pc) | |
# Set remote description first | |
await pc.setRemoteDescription(offer) | |
# Create answer | |
answer = await pc.createAnswer() | |
# Set local description | |
await pc.setLocalDescription(answer) | |
return web.Response( | |
content_type="application/json", | |
text=json.dumps( | |
{ | |
"sdp": pc.localDescription.sdp, | |
"type": pc.localDescription.type | |
} | |
) | |
) | |
async def on_shutdown(app): | |
coros = [pc.close() for pc in pcs] | |
await asyncio.gather(*coros) | |
pcs.clear() | |
def find_free_port(start_port, end_port=65535): | |
"""Find a free port in the given range.""" | |
for port in range(start_port, end_port + 1): | |
try: | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
s.bind(('', port)) | |
s.close() | |
return port | |
except OSError: | |
continue | |
raise OSError("No free ports available in the specified range") | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description="WebRTC video stream with timestamp overlay") | |
parser.add_argument("--cert-file", help="SSL certificate file (for HTTPS)") | |
parser.add_argument("--key-file", help="SSL key file (for HTTPS)") | |
parser.add_argument("--host", default="0.0.0.0", help="Host for HTTP server") | |
parser.add_argument("--port", type=int, default=8010, help="Port for HTTP server") | |
parser.add_argument("--port-range-end", type=int, default=8020, | |
help="End of port range to try if the specified port is in use") | |
args = parser.parse_args() | |
ssl_context = None | |
if args.cert_file: | |
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) | |
ssl_context.load_cert_chain(args.cert_file, args.key_file) | |
try: | |
port = find_free_port(args.port, args.port_range_end) | |
logger.info(f"Using port {port}") | |
app = web.Application() | |
app.on_shutdown.append(on_shutdown) | |
app.router.add_get("/", index) | |
app.router.add_post("/offer", offer) | |
web.run_app(app, host=args.host, port=port, ssl_context=ssl_context) | |
except OSError as e: | |
logger.error(f"Failed to start server: {e}") | |
logger.error(f"Please try a different port range or free up port {args.port}") | |
sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
All you need to stream a webcam..
Not secure. Dont Care.
pip3 install aiohttp aiortc opencv-python av numpy