Created
April 27, 2011 20:41
-
-
Save vsajip/945157 to your computer and use it in GitHub Desktop.
Easy to use socket servers for testing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (C) 2011 Vinay Sajip. All rights reserved. | |
# Encapsulate socket servers into an easy-to-use package | |
try: # Python 2.X | |
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler | |
from SocketServer import ThreadingUDPServer, DatagramRequestHandler, \ | |
ThreadingTCPServer, StreamRequestHandler | |
except ImportError: # Python 3.X | |
from http.server import HTTPServer, BaseHTTPRequestHandler | |
from socketserver import ThreadingUDPServer, DatagramRequestHandler, \ | |
ThreadingTCPServer, StreamRequestHandler | |
raw_input = input | |
import threading | |
import time | |
class ControlMixin(object):
    """
    This mixin is used to start a server on a separate thread, and
    shut it down programmatically. Request handling is simplified - instead
    of needing to derive a suitable RequestHandler subclass, you just
    provide a callable which will be passed each received request to be
    processed.

    It must be combined with a ``socketserver`` server class which supplies
    ``serve_forever()`` and ``shutdown()``.

    :param handler: A handler callable which will be called with a
                    single parameter - the request - in order to
                    process the request. With a plain (non-threading)
                    server class the handler runs on the server thread,
                    so requests are processed serially. With a server
                    class that mixes in ``ThreadingMixIn`` each request
                    is handled on its own thread, so the handler may be
                    invoked concurrently.
    :param poll_interval: The polling interval in seconds, passed to
                          ``serve_forever()``.
    """
    def __init__(self, handler, poll_interval):
        self._thread = None
        self.poll_interval = poll_interval
        self._handler = handler
        # Set once the serving loop has been entered, cleared by stop().
        self._ready = threading.Event()

    def start(self):
        """
        Create a daemon thread to run the server, and start it.

        Calling this while the server is already started does nothing:
        a second serving thread on the same socket could never be stopped
        by :meth:`stop`.
        """
        if self._thread is not None:
            return
        t = threading.Thread(target=self.serve_forever,
                             args=(self.poll_interval,))
        # The ``daemon`` attribute replaces the deprecated setDaemon().
        t.daemon = True
        self._thread = t
        t.start()

    def serve_forever(self, poll_interval):
        """
        Flag the server as ready, then run the underlying serving loop.

        :param poll_interval: The polling interval in seconds.
        """
        self._ready.set()
        super(ControlMixin, self).serve_forever(poll_interval)

    def stop(self):
        """
        Tell the server thread to stop, and wait for it to do so.

        Does nothing if the server was not started with :meth:`start`;
        calling ``shutdown()`` with no serving loop running would block
        forever.
        """
        t = self._thread
        if t is None:
            return
        self.shutdown()
        t.join()
        self._thread = None
        self._ready.clear()
class EasyHTTPServer(ControlMixin, HTTPServer):
    """
    An HTTP server which is controllable using :class:`ControlMixin`.

    Every HTTP method (``do_GET``, ``do_POST``, ...) is routed to the
    supplied handler callable.

    :param addr: A tuple with the IP address and port to listen on.
    :param handler: A handler callable which will be called with a
                    single parameter - the request - in order to
                    process the request.
    :param poll_interval: The polling interval in seconds.
    :param bind_and_activate: If True (the default), binds the server and
                              starts it listening. If False, you need to
                              call :meth:`server_bind` and
                              :meth:`server_activate` at some later time
                              before calling :meth:`start`, so that the
                              server will set up the socket and listen
                              on it.
    """
    def __init__(self, addr, handler, poll_interval=0.5,
                 bind_and_activate=True):
        class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
            def __getattr__(self, name):
                # BaseHTTPRequestHandler looks up 'do_<METHOD>' by name;
                # answer every such lookup with the delegating method.
                if name.startswith('do_'):
                    return self.process_request
                raise AttributeError(name)

            def process_request(self):
                # Pass the request handler object to the user's callable.
                self.server._handler(self)

        HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler,
                            bind_and_activate)
        ControlMixin.__init__(self, handler, poll_interval)
class EasyTCPServer(ControlMixin, ThreadingTCPServer):
    """
    A threading TCP server controlled through :class:`ControlMixin`.

    :param addr: A tuple with the IP address and port to listen on.
    :param handler: A callable invoked with a single argument - the
                    stream request handler for the connection - in order
                    to process the request.
    :param poll_interval: The polling interval in seconds.
    :param bind_and_activate: If True (the default), binds the server and
                              starts it listening. If False, you need to
                              call :meth:`server_bind` and
                              :meth:`server_activate` at some later time
                              before calling :meth:`start`, so that the
                              server will set up the socket and listen
                              on it.
    """
    def __init__(self, addr, handler, poll_interval=0.5, bind_and_activate=True):
        class DelegatingTCPRequestHandler(StreamRequestHandler):
            def handle(self):
                # Forward the connection to the user-supplied callable.
                callback = self.server._handler
                callback(self)

        ThreadingTCPServer.__init__(
            self,
            addr,
            DelegatingTCPRequestHandler,
            bind_and_activate=bind_and_activate,
        )
        ControlMixin.__init__(self, handler=handler,
                              poll_interval=poll_interval)
class EasyUDPServer(ControlMixin, ThreadingUDPServer):
    """
    A threading UDP server controlled through :class:`ControlMixin`.

    :param addr: A tuple with the IP address and port to listen on.
    :param handler: A callable invoked with a single argument - the
                    datagram request handler for the packet - in order
                    to process the request.
    :param poll_interval: The polling interval for shutdown requests,
                          in seconds.
    :param bind_and_activate: If True (the default), binds the server and
                              starts it listening. If False, you need to
                              call :meth:`server_bind` and
                              :meth:`server_activate` at some later time
                              before calling :meth:`start`, so that the
                              server will set up the socket and listen
                              on it.
    """
    def __init__(self, addr, handler, poll_interval=0.5, bind_and_activate=True):
        class DelegatingUDPRequestHandler(DatagramRequestHandler):
            def handle(self):
                # Forward the datagram to the user-supplied callable.
                callback = self.server._handler
                callback(self)

        ThreadingUDPServer.__init__(
            self,
            addr,
            DelegatingUDPRequestHandler,
            bind_and_activate=bind_and_activate,
        )
        ControlMixin.__init__(self, handler=handler,
                              poll_interval=poll_interval)
def main():
    """
    Demonstrate the servers: start an HTTP, a TCP and a UDP server on
    ports 8090, 8091 and 8092, each replying with the current time, and
    stop them when the user presses enter.
    """
    def deleg(request):
        payload = ('%.2f' % time.time()).encode('utf-8')
        if isinstance(request, BaseHTTPRequestHandler):
            # An HTTP client needs a status line and headers before the
            # body; writing the bare payload is a malformed response.
            request.send_response(200)
            request.send_header('Content-Type', 'text/plain')
            request.send_header('Content-Length', str(len(payload)))
            request.end_headers()
        request.wfile.write(payload)

    servers = [
        EasyHTTPServer(('', 8090), deleg, 0.001),
        EasyTCPServer(('', 8091), deleg, 0.001),
        EasyUDPServer(('', 8092), deleg, 0.001),
    ]
    started = []
    try:
        for server in servers:
            server.start()
            started.append(server)
        raw_input('Servers started, press enter to stop:')
    finally:
        # Stop in reverse start order, even if the prompt was interrupted
        # (Ctrl-C, closed stdin) or a later server failed to start.
        for server in reversed(started):
            server.stop()
    print('All done.')


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment