Last active
September 1, 2024 11:13
-
-
Save misaelnieto/2409785 to your computer and use it in GitHub Desktop.
Streaming MJPEG over HTTP with gstreamr and python - WSGI version
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
#based on the ideas from http://synack.me/blog/implementing-http-live-streaming | |
# Updates: | |
# - 2024-04-24: Apply suggestions from @Pin80 | |
# Run this script and then launch the following pipeline: | |
# gst-launch videotestsrc pattern=ball ! video/x-raw-rgb, framerate=15/1, width=640, height=480 ! jpegenc ! multipartmux boundary=spionisto ! tcpclientsink port=9999 | |
#updated command line | |
#gst-launch-1.0 videotestsrc pattern=ball ! videoconvert ! video/x-raw, framerate=15/1, width=640, height=480 ! jpegenc ! multipartmux boundary=spionisto ! #tcpclientsink port=9999 | |
from multiprocessing import Queue | |
from threading import Thread | |
from socket import socket | |
from select import select | |
from wsgiref.simple_server import WSGIServer, make_server, WSGIRequestHandler | |
from socketserver import ThreadingMixIn | |
from wsgiref.validate import validator | |
import time | |
class MyWSGIServer(ThreadingMixIn, WSGIServer): | |
pass | |
def create_server(host, port, app, server_class=MyWSGIServer, | |
handler_class=WSGIRequestHandler): | |
return make_server(host, port, app, server_class, handler_class) | |
INDEX_PAGE = b""" | |
<html> | |
<head> | |
<title>Gstreamer testing</title> | |
</head> | |
<body> | |
<h1>Test page for dummy camera with GStreamer</h1> | |
<img src="http://127.0.0.1:1337/mjpeg_stream"/> | |
<hr /> | |
</body> | |
</html> | |
""" | |
INDEX_PAGE2 = b""" | |
<html> | |
<head> | |
<title>Gstreamer testing</title> | |
</head> | |
<body> | |
<h1>Test page for dummy camera with GStreamer</h1> | |
<source src="http://127.0.0.1:1337/mjpeg_stream"type="video/jpeg"></source> | |
<hr /> | |
</body> | |
</html> | |
""" | |
ERROR_404 = b""" | |
<html> | |
<head> | |
<title>404 - Not Found</title> | |
</head> | |
<body> | |
<h1>404 - Not Found</h1> | |
</body> | |
</html> | |
""" | |
class IPCameraApp(object): | |
queues = [] | |
def __call__(self,environ, start_response): | |
print("env=",environ['PATH_INFO']) | |
if environ['PATH_INFO'] == '/': | |
start_response("200 OK", [ | |
("Content-Type", "text/html"), | |
("Content-Length", str(len(INDEX_PAGE))) | |
]) | |
return iter([INDEX_PAGE]) | |
elif environ['PATH_INFO'] == '/mjpeg_stream': | |
return self.stream(start_response) | |
else: | |
start_response("404 Not Found", [ | |
("Content-Type", "text/html"), | |
("Content-Length", str(len(ERROR_404))) | |
]) | |
return iter([ERROR_404]) | |
def stream(self, start_response): | |
start_response('200 OK', [('Content-type', 'multipart/x-mixed-replace; boundary=--spionisto')]) | |
q = Queue(4096) | |
self.queues.append(q) | |
while True: | |
try: | |
yield q.get() | |
time.sleep(0.001) | |
except: | |
if q in self.queues: | |
self.queues.remove(q) | |
return | |
def input_loop(app): | |
sock = socket() | |
sock.bind(('127.0.0.1', 9999)) | |
sock.listen(1) | |
while True: | |
print('Waiting for input stream') | |
sd, addr = sock.accept() | |
print('Accepted input stream from', addr) | |
data_flag = True | |
while data_flag: | |
readable = select([sd], [], [], 0.1)[0] | |
for s in readable: | |
data = s.recv(1024) | |
if not data: | |
break | |
for q in app.queues: | |
q.put(data) | |
time.sleep(0.001) | |
print('Lost input stream from', addr) | |
if __name__ == '__main__': | |
#Launch an instance of wsgi server | |
app = IPCameraApp() | |
port = 1337 | |
print('Launching camera server on port', port) | |
httpd = create_server('127.0.0.1', port, app) | |
print('Launch input stream thread') | |
t1 = Thread(target=input_loop, args=[app]) | |
#t1.setDaemon(True) | |
t1.daemon = True | |
t1.start() | |
try: | |
print('Httpd serve forever') | |
httpd.serve_forever() | |
except KeyboardInterrupt: | |
httpd.kill() | |
print("Shutdown camera server ...") |
I fixed all bugs, but I haven't watched the picture yet in browser.
#!/usr/bin/python
#based on the ideas from http://synack.me/blog/implementing-http-live-streaming
# Run this script and then launch the following pipeline:
# gst-launch videotestsrc pattern=ball ! video/x-raw-rgb, framerate=15/1, width=640, height=480 ! jpegenc ! multipartmux boundary=spionisto ! tcpclientsink port=9999
#updated command line
#gst-launch-1.0 videotestsrc pattern=ball ! videoconvert ! video/x-raw, framerate=15/1, width=640, height=480 ! jpegenc ! multipartmux boundary=spionisto ! #tcpclientsink port=9999
from multiprocessing import Queue
from threading import Thread
from socket import socket
from select import select
from wsgiref.simple_server import WSGIServer, make_server, WSGIRequestHandler
from socketserver import ThreadingMixIn
from wsgiref.validate import validator
class MyWSGIServer(ThreadingMixIn, WSGIServer):
pass
def create_server(host, port, app, server_class=MyWSGIServer,
handler_class=WSGIRequestHandler):
return make_server(host, port, app, server_class, handler_class)
INDEX_PAGE = b"""
<html>
<head>
<title>Gstreamer testing</title>
</head>
<body>
<h1>Test page for dummy camera with GStreamer</h1>
<img src="/mjpeg_stream"/ >
<hr />
</body>
</html>
"""
ERROR_404 = b"""
<html>
<head>
<title>404 - Not Found</title>
</head>
<body>
<h1>404 - Not Found</h1>
</body>
</html>
"""
class IPCameraApp(object):
queues = []
def __call__(self,environ, start_response):
if environ['PATH_INFO'] == '/':
start_response("200 OK", [
("Content-Type", "text/html"),
("Content-Length", str(len(INDEX_PAGE)))
])
return iter([INDEX_PAGE])
elif environ['PATH_INFO'] == '/mjpeg_stream':
return self.stream(start_response)
else:
start_response("404 Not Found", [
("Content-Type", "text/html"),
("Content-Length", str(len(ERROR_404)))
])
return iter([ERROR_404])
def stream(self, start_response):
print("start mjpegt video")
start_response('200 OK', [('Content-type', 'multipart/x-mixed-replace; boundary=--spionisto')])
q = Queue()
self.queues.append(q)
while True:
try:
yield q.get()
except:
if q in self.queues:
self.queues.remove(q)
return
def input_loop(app):
sock = socket()
sock.bind(('127.0.0.1', 9999))
sock.listen(1)
while True:
print('Waiting for input stream')
sd, addr = sock.accept()
print('Accepted input stream from', addr)
data = True
while data:
readable = select([sd], [], [], 0.1)[0]
for s in readable:
data = s.recv(1024)
if not data:
break
for q in app.queues:
q.put(data)
print('Lost input stream from', addr)
if __name__ == '__main__':
#Launch an instance of wsgi server
app = IPCameraApp()
port = 1337
print('Launching camera server on port', port)
httpd = create_server('127.0.0.1', port, app)
print('Launch input stream thread')
t1 = Thread(target=input_loop, args=[app])
#t1.setDaemon(True)
t1.daemon = True
t1.start()
try:
print('Httpd serve forever')
httpd.serve_forever()
except KeyboardInterrupt:
httpd.kill()
print("Shutdown camera server ...")
Final edition:
#!/usr/bin/python
#based on the ideas from http://synack.me/blog/implementing-http-live-streaming
# Run this script and then launch the following pipeline:
# gst-launch videotestsrc pattern=ball ! video/x-raw-rgb, framerate=15/1, width=640, height=480 ! jpegenc ! multipartmux boundary=spionisto ! tcpclientsink port=9999
#updated command line
#gst-launch-1.0 videotestsrc pattern=ball ! videoconvert ! video/x-raw, framerate=15/1, width=640, height=480 ! jpegenc ! multipartmux boundary=spionisto ! #tcpclientsink port=9999
from multiprocessing import Queue
from threading import Thread
from socket import socket
from select import select
from wsgiref.simple_server import WSGIServer, make_server, WSGIRequestHandler
from socketserver import ThreadingMixIn
from wsgiref.validate import validator
import time
class MyWSGIServer(ThreadingMixIn, WSGIServer):
pass
def create_server(host, port, app, server_class=MyWSGIServer,
handler_class=WSGIRequestHandler):
return make_server(host, port, app, server_class, handler_class)
INDEX_PAGE = b"""
<html>
<head>
<title>Gstreamer testing</title>
</head>
<body>
<h1>Test page for dummy camera with GStreamer</h1>
<img src="http://127.0.0.1:1337/mjpeg_stream"/>
<hr />
</body>
</html>
"""
INDEX_PAGE2 = b"""
<html>
<head>
<title>Gstreamer testing</title>
</head>
<body>
<h1>Test page for dummy camera with GStreamer</h1>
<source src="http://127.0.0.1:1337/mjpeg_stream"type="video/jpeg"></source>
<hr />
</body>
</html>
"""
ERROR_404 = b"""
<html>
<head>
<title>404 - Not Found</title>
</head>
<body>
<h1>404 - Not Found</h1>
</body>
</html>
"""
class IPCameraApp(object):
queues = []
def __call__(self,environ, start_response):
print("env=",environ['PATH_INFO'])
if environ['PATH_INFO'] == '/':
start_response("200 OK", [
("Content-Type", "text/html"),
("Content-Length", str(len(INDEX_PAGE)))
])
return iter([INDEX_PAGE])
elif environ['PATH_INFO'] == '/mjpeg_stream':
return self.stream(start_response)
else:
start_response("404 Not Found", [
("Content-Type", "text/html"),
("Content-Length", str(len(ERROR_404)))
])
return iter([ERROR_404])
def stream(self, start_response):
start_response('200 OK', [('Content-type', 'multipart/x-mixed-replace; boundary=--spionisto')])
q = Queue(4096)
self.queues.append(q)
while True:
try:
yield q.get()
time.sleep(0.001)
except:
if q in self.queues:
self.queues.remove(q)
return
def input_loop(app):
sock = socket()
sock.bind(('127.0.0.1', 9999))
sock.listen(1)
while True:
print('Waiting for input stream')
sd, addr = sock.accept()
print('Accepted input stream from', addr)
data_flag = True
while data_flag:
readable = select([sd], [], [], 0.1)[0]
for s in readable:
data = s.recv(1024)
if not data:
break
for q in app.queues:
q.put(data)
time.sleep(0.001)
print('Lost input stream from', addr)
if __name__ == '__main__':
#Launch an instance of wsgi server
app = IPCameraApp()
port = 1337
print('Launching camera server on port', port)
httpd = create_server('127.0.0.1', port, app)
print('Launch input stream thread')
t1 = Thread(target=input_loop, args=[app])
#t1.setDaemon(True)
t1.daemon = True
t1.start()
try:
print('Httpd serve forever')
httpd.serve_forever()
except KeyboardInterrupt:
httpd.kill()
print("Shutdown camera server ...")
Great job
Great job
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It seems the code is deprecated, I updated code, but errors didn't disappear.
My code proposal:
Errors:
user@usercomp:~/MySoftware/study_gstreamer/123$ python ./live-mjpeg-stream.py
Launching camera server on port 1337
Launch input stream thread
Httpd serve forever
Waiting for input stream
Accepted input stream from ('127.0.0.1', 36966)
Lost input stream from ('127.0.0.1', 36966)
Waiting for input stream
Accepted input stream from ('127.0.0.1', 46856)
Traceback (most recent call last):
File "/usr/lib/python3.10/wsgiref/handlers.py", line 138, in run
self.finish_response()
File "/usr/lib/python3.10/wsgiref/handlers.py", line 184, in finish_response
self.write(data)
File "/usr/lib/python3.10/wsgiref/handlers.py", line 279, in write
assert type(data) is bytes,
AssertionError: write() argument must be a bytes instance
127.0.0.1 - - [20/Mar/2024 12:27:13] "GET / HTTP/1.1" 500 59