Skip to content

Instantly share code, notes, and snippets.

@arganzheng
Created June 18, 2015 02:35

Revisions

  1. arganzheng created this gist Jun 18, 2015.
    275 changes: 275 additions & 0 deletions PoolingHttpClient
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,275 @@
    package org.opentsdb.client;

    import static com.google.common.base.Preconditions.checkArgument;

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    import org.apache.commons.lang.StringUtils;
    import org.apache.http.HeaderElement;
    import org.apache.http.HeaderElementIterator;
    import org.apache.http.HttpEntity;
    import org.apache.http.HttpResponse;
    import org.apache.http.client.config.RequestConfig;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.client.methods.HttpUriRequest;
    import org.apache.http.conn.ConnectionKeepAliveStrategy;
    import org.apache.http.conn.HttpClientConnectionManager;
    import org.apache.http.entity.StringEntity;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
    import org.apache.http.message.BasicHeaderElementIterator;
    import org.apache.http.protocol.HTTP;
    import org.apache.http.protocol.HttpContext;
    import org.apache.http.util.EntityUtils;
    import org.opentsdb.client.response.SimpleHttpResponse;

    /**
    *
    * @author arganzheng
    *
    */
    public class PoolingHttpClient {

    private static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 200;

    private static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = DEFAULT_MAX_TOTAL_CONNECTIONS;

    private static final int DEFAULT_CONNECTION_TIMEOUT_MILLISECONDS = (10 * 1000);
    private static final int DEFAULT_READ_TIMEOUT_MILLISECONDS = (10 * 1000);
    private static final int DEFAULT_WAIT_TIMEOUT_MILLISECONDS = (10 * 1000);

    private static final int DEFAULT_KEEP_ALIVE_MILLISECONDS = (5 * 60 * 1000);

    private static final String DEFAULT_CHARSET = "UTF-8";

    private static final int DEFAULT_RETRY_COUNT = 2;

    private int keepAlive = DEFAULT_KEEP_ALIVE_MILLISECONDS;

    private int maxTotalConnections = DEFAULT_MAX_TOTAL_CONNECTIONS;
    private int maxConnectionsPerRoute = DEFAULT_MAX_CONNECTIONS_PER_ROUTE;

    private int connectTimeout = DEFAULT_CONNECTION_TIMEOUT_MILLISECONDS;
    private int readTimeout = DEFAULT_READ_TIMEOUT_MILLISECONDS;
    private int waitTimeout = DEFAULT_WAIT_TIMEOUT_MILLISECONDS;

    private int retries = DEFAULT_RETRY_COUNT;

    private PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();

    private CloseableHttpClient httpClient = null;

    private ConnectionKeepAliveStrategy keepAliveStrategy = new ConnectionKeepAliveStrategy() {
    @Override
    public long getKeepAliveDuration(HttpResponse response,
    HttpContext context) {
    HeaderElementIterator it = new BasicHeaderElementIterator(
    response.headerIterator(HTTP.CONN_KEEP_ALIVE));
    while (it.hasNext()) {
    HeaderElement he = it.nextElement();
    String param = he.getName();
    String value = he.getValue();
    if (value != null && param.equalsIgnoreCase("timeout")) {
    return Long.parseLong(value) * 1000;
    }
    }
    return keepAlive;
    }
    };

    public PoolingHttpClient() {
    // Increase max total connection
    connManager.setMaxTotal(maxTotalConnections);
    // Increase default max connection per route
    connManager.setDefaultMaxPerRoute(maxConnectionsPerRoute);

    // config timeout
    RequestConfig config = RequestConfig.custom()
    .setConnectTimeout(connectTimeout)
    .setConnectionRequestTimeout(waitTimeout)
    .setSocketTimeout(readTimeout).build();

    httpClient = HttpClients.custom()
    .setKeepAliveStrategy(keepAliveStrategy)
    .setConnectionManager(connManager)
    .setDefaultRequestConfig(config).build();

    // detect idle and expired connections and close them
    IdleConnectionMonitorThread staleMonitor = new IdleConnectionMonitorThread(
    connManager);
    staleMonitor.start();
    }

    public SimpleHttpResponse doPost(String url, String data)
    throws IOException {
    StringEntity requestEntity = new StringEntity(data);
    HttpPost postMethod = new HttpPost(url);
    postMethod.setEntity(requestEntity);

    HttpResponse response = execute(postMethod);
    int statusCode = response.getStatusLine().getStatusCode();

    SimpleHttpResponse simpleResponse = new SimpleHttpResponse();
    simpleResponse.setStatusCode(statusCode);

    HttpEntity entity = response.getEntity();
    if (entity != null) {
    // should return: application/json; charset=UTF-8
    String ctype = entity.getContentType().getValue();
    String charset = getResponseCharset(ctype);
    String content = EntityUtils.toString(entity, charset);
    simpleResponse.setContent(content);
    }

    return simpleResponse;
    }

    private static String getResponseCharset(String ctype) {
    String charset = DEFAULT_CHARSET;

    if (!StringUtils.isEmpty(ctype)) {
    String[] params = ctype.split(";");
    for (String param : params) {
    param = param.trim();
    if (param.startsWith("charset")) {
    String[] pair = param.split("=", 2);
    if (pair.length == 2) {
    if (!StringUtils.isEmpty(pair[1])) {
    charset = pair[1].trim();
    }
    }
    break;
    }
    }
    }

    return charset;
    }

    public HttpResponse execute(HttpUriRequest request) throws IOException {
    HttpResponse response;

    int tries = ++retries;
    while (true) {
    tries--;
    try {
    response = httpClient.execute(request);
    break;
    } catch (IOException e) {
    if (tries < 1)
    throw e;
    }
    }

    return response;
    }

    public void shutdown() throws IOException {
    httpClient.close();
    }

    public int getKeepAlive() {
    return keepAlive;
    }

    public void setKeepAlive(int keepAlive) {
    this.keepAlive = keepAlive;
    }

    public int getMaxTotalConnections() {
    return maxTotalConnections;
    }

    public void setMaxTotalConnections(int maxTotalConnections) {
    this.maxTotalConnections = maxTotalConnections;
    }

    public int getMaxConnectionsPerRoute() {
    return maxConnectionsPerRoute;
    }

    public void setMaxConnectionsPerRoute(int maxConnectionsPerRoute) {
    this.maxConnectionsPerRoute = maxConnectionsPerRoute;
    }

    public int getConnectTimeout() {
    return connectTimeout;
    }

    public void setConnectTimeout(int connectTimeout) {
    this.connectTimeout = connectTimeout;
    }

    public int getReadTimeout() {
    return readTimeout;
    }

    public void setReadTimeout(int readTimeout) {
    this.readTimeout = readTimeout;
    }

    public int getWaitTimeout() {
    return waitTimeout;
    }

    public void setWaitTimeout(int waitTimeout) {
    this.waitTimeout = waitTimeout;
    }

    public PoolingHttpClientConnectionManager getConnManager() {
    return connManager;
    }

    public void setConnManager(PoolingHttpClientConnectionManager connManager) {
    this.connManager = connManager;
    }

    public int getRetryCount() {
    return retries;
    }

    public void setRetryCount(int retries) {
    checkArgument(retries >= 0);
    this.retries = retries;
    }

    public static class IdleConnectionMonitorThread extends Thread {

    private final HttpClientConnectionManager connMgr;
    private volatile boolean shutdown;

    public IdleConnectionMonitorThread(HttpClientConnectionManager connMgr) {
    super();
    this.connMgr = connMgr;
    }

    @Override
    public void run() {
    try {
    while (!shutdown) {
    synchronized (this) {
    wait(5000);
    // Close expired connections
    connMgr.closeExpiredConnections();
    // Optionally, close connections
    // that have been idle longer than 60 sec
    connMgr.closeIdleConnections(60, TimeUnit.SECONDS);
    }
    }
    } catch (InterruptedException ex) {
    // terminate
    shutdown();
    }
    }

    public void shutdown() {
    shutdown = true;
    synchronized (this) {
    notifyAll();
    }
    }

    }
    }