@bufferx
Last active December 16, 2015 20:39
Async Pipelined HTTP Requests Based On Tornado.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2012 Zhang ZY<http://idupx.blogspot.com/>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import logging
import time
from tornado.gen import engine as gen_engine
from tornado.gen import Task as gen_Task
from tornado.httpclient import HTTPRequest
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop
from tornado.options import options
from tornado.options import parse_command_line
AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient")
formatter = logging.Formatter('%(asctime)s - %(levelname)s - '
                              '%(module)s.%(funcName)s:%(lineno)d - %(message)s')
logging.basicConfig(level=logging.DEBUG)
g_logger = logging.getLogger('root.main')
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
g_logger.addHandler(ch)
g_logger.propagate = False
log_time = lambda f, t: 'Function[%s] Consume %.3fms' % (f, (time.time() - t) * 1000)
def time_it(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        r = func(*args, **kwargs)
        g_logger.info(log_time(func.__name__, start_time))
        return r
    return wrapper
@time_it
@gen_engine
def async_fetch(urls, callback):
    assert isinstance(urls, tuple)
    g_logger.debug('call async_fetch')

    http_client = AsyncHTTPClient(max_clients=100)

    task_list = []
    for url in urls:
        request = HTTPRequest(url,
                              method='GET',
                              connect_timeout=2,
                              request_timeout=5,
                              user_agent='PYTORNADO_CLIENT',
                              )
        task_list.append(gen_Task(http_client.fetch, request))
        g_logger.debug('gen task: %s', url)

    # Yielding the list of Tasks issues every fetch concurrently and resumes
    # here once all responses have arrived.
    response_list = yield task_list
    callback(response_list)
def process_response(response_list):
    for response in response_list:
        g_logger.info('%s\t%s\t%s\t%.3fs', response.effective_url, response.code,
                      response.headers.get('Content-Length', 0),
                      response.request_time)
    IOLoop.instance().stop()
@time_it
def main():
    URLS = ('http://www.baidu.com',
            'http://www.example.com',
            'http://www.facebook.com',
            )

    async_fetch(URLS, process_response)
    IOLoop.instance().start()


if __name__ == '__main__':
    parse_command_line()
    main()
@liushuaikobe

I don't think you need to wrap the fetch on line 76 in gen.Task, since AsyncHTTPClient is already asynchronous by itself.
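
For comparison, here is a minimal sketch of the callback-only style this comment suggests, with no gen.Task involved. It is an illustration only: the handle_response helper and the pending counter are assumptions, not part of the original gist, and it relies on the pre-6.0 Tornado fetch(callback=...) signature the gist itself targets.

# Hypothetical callback-only variant: each fetch reports to handle_response,
# and the IOLoop is stopped once every response has been handled.
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop

URLS = ('http://www.baidu.com', 'http://www.example.com')
pending = {'count': len(URLS)}

def handle_response(response):
    print(response.effective_url, response.code)
    pending['count'] -= 1
    if pending['count'] == 0:
        IOLoop.instance().stop()

client = AsyncHTTPClient()
for url in URLS:
    client.fetch(url, callback=handle_response)
IOLoop.instance().start()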

@bufferx (Author) commented Oct 22, 2013

The point of this demo is to show a pipelined operation: coordinating multiple asynchronous requests with synchronous-looking logic. It is also one practical application of gen.engine/gen.Task.
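
A condensed sketch of the pattern this reply describes may make the point clearer: yielding a list of gen.Task objects lets all fetches run concurrently while the code still reads top-down. The fetch_all name and the URL list are placeholders, and the sketch assumes the same pre-6.0 Tornado API used by the gist.

# Hypothetical condensed form of the gen.engine/gen.Task pipeline pattern.
from tornado import gen
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop

@gen.engine
def fetch_all(urls):
    client = AsyncHTTPClient()
    # Yield a list of Tasks: the requests are issued together and the
    # generator resumes once every response is available.
    responses = yield [gen.Task(client.fetch, url) for url in urls]
    for r in responses:
        print(r.effective_url, r.code)
    IOLoop.instance().stop()

fetch_all(['http://www.example.com', 'http://www.baidu.com'])
IOLoop.instance().start()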
