apache HttpClient 源码分析
这里简单地记录了 org.apache.commons.httpclient.HttpClient 如何与 org.apache.commons.httpclient.MultiThreadedHttpConnectionManager 相结合,构建 http 连接池,从而减少因反复建立连接而产生的不必要的时间消耗,提高请求效率. 这里还有个很重要的类,org.apache.commons.httpclient.methods.PostMethod,它是整个http请求的业务处理类;
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager: 负责连接的管理; 以服务器的 HostConfiguration(ip、端口号、协议等)为key; 存在两个池,一个用于存放 服务器集合,另一个用于存放各服务器所对应的 连接集合
org.apache.commons.httpclient.methods.PostMethod: 负责http业务调度; 1:进行请求头的拼装 2:调用connection连接,进行数据的请求 3:响应的处理,解析响应头信息
org.apache.commons.httpclient.HttpClient : 封装类,把上述的 MultiThreadedHttpConnectionManager 与 PostMethod 整合,供外部使用.
http仅仅只是一个超文本传输协议,并非传输技术本身,作用于应用层; 它依赖于传输层的 tcp/ip 协议,进行两端的信息交互; 而tcp/ip 也只是一种交互协议,socket 则是对 tcp/ip 的编程接口(在 java 中即 java.net.Socket 等类). 因此客户端与服务端间进行http协议交互,实际上是通过下层的socket,进行 tcp/ip 交互.
这里也顺便提一下平常用到的jdbc,它是直接走 传输层,利用 tcp/ip 协议与数据库进行交互,实际上还是通过socket技术与数据库进行二进制的协议数据交互.(有空可以百度了解 二进制协议数据结构)
首先开始我们以 HttpClientUtil 测试类进行开篇,简单地使用 HttpClient 与server进行数据请求.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
import org.apache.commons.httpclient.Header;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.NameValuePair;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.chainsaw.Main;
import com.g3net.tool.HttpUtils;
/**
 * Utility for sending HTTP POST requests with Commons HttpClient 3.x through one shared,
 * multi-threaded connection pool.
 */
public class HttpClientUtil {
    /** Logger for this class. */
    private static final Log log = LogFactory.getLog(HttpClientUtil.class);
    /** Socket read timeout handed to the connection manager, in milliseconds. */
    private final static int SOCKET_TIMEOUT = 10000;
    /** Connection-establishment timeout handed to the connection manager, in milliseconds. */
    private final static int CONNECTION_TIMEOUT = 10000;
    /** Maximum number of pooled connections per host. */
    private final static int MAX_CONN_PRE_HOST = 20;
    /** Maximum number of connections in the whole pool. */
    private final static int MAX_CONN = 60;
    /** Connection pool shared by every request issued through this class. */
    private final static HttpConnectionManager httpConnectionManager;

    static {
        httpConnectionManager = new MultiThreadedHttpConnectionManager();
        HttpConnectionManagerParams params = httpConnectionManager.getParams();
        params.setConnectionTimeout(CONNECTION_TIMEOUT);
        params.setSoTimeout(SOCKET_TIMEOUT);
        params.setDefaultMaxConnectionsPerHost(MAX_CONN_PRE_HOST);
        params.setMaxTotalConnections(MAX_CONN);
    }

    /**
     * Executes the given POST method and returns the response body as text.
     * <p>
     * Lines of the body are concatenated without their line terminators. Failures are not
     * propagated: they are logged and converted into the XML error document produced by
     * {@link #returnError(String)}. The connection is always released back to the pool.
     *
     * @param post the fully configured method to execute
     * @param code the charset name used to decode the response body
     * @return the response body, an empty string when the response carries no body, or an
     *         XML error document when the request failed
     */
    public static String doHttpRequest(PostMethod post, String code) {
        HttpClient httpClient = new HttpClient(httpConnectionManager);
        BufferedReader in = null;
        String resultString = "";
        try {
            httpClient.executeMethod(post);
            // The body stream is null when the response has no body; guard against it
            // instead of letting InputStreamReader throw a NullPointerException.
            java.io.InputStream body = post.getResponseBodyAsStream();
            if (body != null) {
                in = new BufferedReader(new InputStreamReader(body, code));
                StringBuilder buffer = new StringBuilder();
                String line;
                while ((line = in.readLine()) != null) {
                    buffer.append(line);
                }
                resultString = buffer.toString();
            }
        } catch (SocketTimeoutException e) {
            // Pass the exception itself to the logger so the stack trace is kept.
            log.error("连接超时", e);
            resultString = returnError("连接超时");
        } catch (HttpException e) {
            log.error("读取外部服务器数据失败", e);
            resultString = returnError("读取外部服务器数据失败");
        } catch (UnknownHostException e) {
            log.error("请求的主机地址无效", e);
            resultString = returnError("请求的主机地址无效");
        } catch (IOException e) {
            log.error("向外部接口发送数据失败", e);
            resultString = returnError("向外部接口发送数据失败");
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    // Best effort: the result has already been computed at this point.
                    log.warn("关闭响应流失败", e);
                }
            }
            post.releaseConnection();
        }
        return resultString;
    }

    /**
     * Builds the generic XML error document returned when a request fails.
     * <p>
     * The reason is inserted verbatim (no XML escaping is applied).
     *
     * @param reason human-readable failure reason
     * @return the XML error document
     */
    public static String returnError(String reason) {
        StringBuilder buffer = new StringBuilder();
        buffer.append("<?xml version=\"1.0\" encoding=\"GBK\"?>");
        buffer.append("<Response>");
        buffer.append("<Success>false</Success>");
        buffer.append("<reason>");
        buffer.append(reason);
        buffer.append("</reason>");
        buffer.append("</Response>");
        return buffer.toString();
    }

    /** Name of the header used to carry the client IP. */
    public final static String REQUEST_HEADER = "x-forwarded-for";

    /**
     * Registers the given IP as a default {@code x-forwarded-for} header on the client's host
     * configuration.
     * <p>
     * This makes requests appear to originate from an arbitrary IP; use with care.
     *
     * @param client the client whose host configuration receives the default header
     * @param ip     the IP address to put into the header
     */
    public static void resetRequestHeader(HttpClient client, String ip) {
        List<Header> headers = new ArrayList<Header>();
        headers.add(new Header(REQUEST_HEADER, ip));
        client.getHostConfiguration().getParams().setParameter(
                "http.default-headers", headers);
    }

    /**
     * Demo entry point: posts to a sample URL and prints the response body.
     *
     * @param args unused
     * @throws Exception declared for convenience; failures inside the request are already
     *                   converted into an error document by {@link #doHttpRequest}
     */
    public static void main(String[] args) throws Exception {
        String url = "http://192.168.162.38/applib/webcontent/interface/searchnew.jsp?key=theme&ty=0&n=100";
        PostMethod postMethod = new PostMethod(url); // target address
        // Request headers
        postMethod.setRequestHeader("Host", "www.baidu.com");
        postMethod.setRequestHeader("Connection", "keep-alive");
        postMethod.setRequestHeader("User-Agent", "Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.4 (KHTML, like Gecko) Chrome/22.0.1229.92 Safari/537.4");
        postMethod.setRequestHeader("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8");
        // No "Accept-Encoding: gzip,deflate,sdch" header: doHttpRequest decodes the raw body
        // stream as text and never decompresses it, so a compressed response would print as garbage.
        postMethod.setRequestHeader("Accept-Language", "zh-CN,zh;q=0.8");
        postMethod.setRequestHeader("Accept-Charset", "GBK,utf-8;q=0.7,*;q=0.3");
        String result = HttpClientUtil.doHttpRequest(postMethod, "utf-8");
        System.out.println(result);
    }
}
httpClient.executeMethod(post);
/**
 * Executes the given method by delegating to the three-argument overload with a null host
 * configuration and a null state.
 *
 * @param method the HTTP method to execute
 * @return the value returned by the three-argument overload
 * @throws IOException   propagated from the delegate
 * @throws HttpException propagated from the delegate
 */
public int executeMethod(HttpMethod method)
    throws IOException, HttpException
{
    LOG.trace("enter HttpClient.executeMethod(HttpMethod)");
    final int statusCode = executeMethod(null, method, null);
    return statusCode;
}
在HttpClient 中,我们发现,它把 连接池以及服务的请求处理,封装到了 org.apache.commons.httpclient.HttpMethodDirector.
/**
 * Executes the method against the given (or default) host configuration and state.
 * <p>
 * The actual work - obtaining a connection and sending the request - is delegated to a
 * freshly created {@code HttpMethodDirector}.
 *
 * @param hostconfig host configuration to use; the client's own configuration when null
 * @param method     the HTTP method to execute; must not be null
 * @param state      state to use; the client's own state when null
 * @return the status code reported by the method after execution
 * @throws IllegalArgumentException if {@code method} is null
 * @throws IOException              propagated from the director
 * @throws HttpException            propagated from the director
 */
public int executeMethod(HostConfiguration hostconfig, HttpMethod method, HttpState state)
    throws IOException, HttpException
{
    LOG.trace("enter HttpClient.executeMethod(HostConfiguration,HttpMethod,HttpState)");
    if (method == null) {
        throw new IllegalArgumentException("HttpMethod parameter may not be null");
    }

    // Resolve the server information (host, port, protocol, ...) for this request.
    final HostConfiguration clientDefault = getHostConfiguration();
    HostConfiguration target = (hostconfig != null) ? hostconfig : clientDefault;
    final URI requestUri = method.getURI();

    // Work on a copy whenever the shared default would be used or the URI overrides the host.
    if (target == clientDefault || requestUri.isAbsoluteURI()) {
        target = (HostConfiguration) target.clone();
        if (requestUri.isAbsoluteURI()) {
            target.setHost(requestUri);
        }
    }

    // The director bundles connection acquisition and request execution together.
    HttpMethodDirector director = new HttpMethodDirector(
        getHttpConnectionManager(), target, params, state != null ? state : getState());
    director.executeMethod(method);
    return method.getStatusCode();
}
在 org.apache.commons.httpclient.HttpMethodDirector ,通过 executeMethod 进行业务请求处理
/**
 * Runs the given method on a connection obtained from the connection manager, repeating the
 * request while a redirect or an authentication challenge asks for a retry.
 * <p>
 * NOTE(review): this listing is decompiler output. {@code break MISSING_BLOCK_LABEL_583} and
 * the bare {@code exception;} statement are not valid Java; they look like the remains of a
 * try/finally whose cleanup was emitted once for the normal exit and once for the exceptional
 * exit - confirm against the library source.
 *
 * @param method the HTTP method to execute; must not be null
 * @throws IllegalArgumentException if {@code method} is null
 * @throws RedirectException        when the redirect count reaches the configured maximum
 * @throws IOException              propagated from connection handling or execution
 * @throws HttpException            propagated from execution
 */
public void executeMethod(HttpMethod method)
throws IOException, HttpException
{
if(method == null)
throw new IllegalArgumentException("Method may not be null");
// Chain the parameter defaults: host parameters fall back to the director's params, and
// method parameters fall back to the host parameters.
hostConfiguration.getParams().setDefaults(params);
method.getParams().setDefaults(hostConfiguration.getParams());
// Add every header registered under "http.default-headers" to the request.
Collection defaults = (Collection)hostConfiguration.getParams().getParameter("http.default-headers");
if(defaults != null)
{
for(Iterator i = defaults.iterator(); i.hasNext(); method.addRequestHeader((Header)i.next()));
}
// Redirect limit; 100 is used when the parameter is not set.
int maxRedirects = params.getIntParameter("http.protocol.max-redirects", 100);
int redirectCount = 0;
do
{
// A connection held from a previous iteration that no longer matches the host
// configuration is released so a matching one can be fetched below.
if(conn != null && !hostConfiguration.hostEquals(conn))
{
conn.setLocked(false);
conn.releaseConnection();
conn = null;
}
if(conn == null)
{
// Obtain a connection from the pool through connectionManager.
conn = connectionManager.getConnectionWithTimeout(hostConfiguration, params.getConnectionManagerTimeout());
conn.setLocked(true);
if(params.isAuthenticationPreemptive() || state.isAuthenticationPreemptive())
{
LOG.debug("Preemptively sending default basic credentials");
method.getHostAuthState().setPreemptive();
method.getHostAuthState().setAuthAttempted(true);
// Proxy credentials are pre-set only for a proxied, non-secure connection.
if(conn.isProxied() && !conn.isSecure())
{
method.getProxyAuthState().setPreemptive();
method.getProxyAuthState().setAuthAttempted(true);
}
}
}
authenticate(method);
executeWithRetry(method);
// NOTE(review): presumably connectMethod is set when a CONNECT attempt did not succeed
// and its response is handed back through fakeResponse - confirm.
if(connectMethod != null)
{
fakeResponse(method);
break;
}
boolean retry = false;
if(isRedirectNeeded(method) && processRedirectResponse(method))
{
retry = true;
if(++redirectCount >= maxRedirects)
{
LOG.error("Narrowly avoided an infinite loop in execute");
throw new RedirectException("Maximum redirects (" + maxRedirects + ") exceeded");
}
if(LOG.isDebugEnabled())
LOG.debug("Execute redirect " + redirectCount + " of " + maxRedirects);
}
if(isAuthenticationNeeded(method) && processAuthenticationResponse(method))
{
LOG.debug("Retry authentication");
retry = true;
}
if(!retry)
break;
// Close the body of the response that is being discarded before trying again.
if(method.getResponseBodyAsStream() != null)
method.getResponseBodyAsStream().close();
} while(true);
// Cleanup on the normal exit: unlock, and release the connection when a release was
// requested or there is no response body left to read.
if(conn != null)
conn.setLocked(false);
if((releaseConnection || method.getResponseBodyAsStream() == null) && conn != null)
conn.releaseConnection();
break MISSING_BLOCK_LABEL_583;
// Same cleanup on the exceptional exit, followed by a rethrow (decompiler rendering).
Exception exception;
exception;
if(conn != null)
conn.setLocked(false);
if((releaseConnection || method.getResponseBodyAsStream() == null) && conn != null)
conn.releaseConnection();
throw exception;
}
/**
 * Executes the method on {@code conn}, repeating the attempt after an I/O failure for as long
 * as the retry handlers allow it.
 * <p>
 * NOTE(review): this listing is decompiler output. The {@code _L2} label with
 * {@code goto _L2} stands for the retry loop, and {@code break MISSING_BLOCK_LABEL_452}
 * appears to be the loop exit after a successful execution - confirm against the library
 * source.
 *
 * @param method the HTTP method to execute
 * @throws IOException   when execution fails and no retry is granted
 * @throws HttpException rethrown unchanged, without any retry
 */
private void executeWithRetry(HttpMethod method)
throws IOException, HttpException
{
// Number of attempts made so far.
int execCount = 0;
_L2:
execCount++;
if(LOG.isTraceEnabled())
LOG.trace("Attempt number " + execCount + " to process request");
if(conn.getParams().isStaleCheckingEnabled())
conn.closeIfStale();
if(!conn.isOpen())
{
conn.open();
// For a secure connection through a proxy, executeConnect() runs first (unless the
// method itself is a ConnectMethod); when it returns false nothing more is executed.
if(conn.isProxied() && conn.isSecure() && !(method instanceof ConnectMethod) && !executeConnect())
return;
}
try
{
try
{
applyConnectionParams(method);
// method is the request-processing object holding request/response headers and so on;
// it needs conn as the bridge to communicate with the server.
method.execute(state, conn);
break MISSING_BLOCK_LABEL_452;
}
catch(HttpException e)
{
// HttpException is never retried.
throw e;
}
catch(IOException e)
{
LOG.debug("Closing the connection.");
conn.close();
HttpMethodRetryHandler handler;
// First consult the retry handler carried by an HttpMethodBase instance, if any.
if(method instanceof HttpMethodBase)
{
handler = ((HttpMethodBase)method).getMethodRetryHandler();
if(handler != null && !handler.retryMethod(method, conn, new HttpRecoverableException(e.getMessage()), execCount, method.isRequestSent()))
{
LOG.debug("Method retry handler returned false. Automatic recovery will not be attempted");
throw e;
}
}
// Then consult the handler configured under "http.method.retry-handler", falling back
// to a DefaultHttpMethodRetryHandler when none is set.
handler = (HttpMethodRetryHandler)method.getParams().getParameter("http.method.retry-handler");
if(handler == null)
handler = new DefaultHttpMethodRetryHandler();
if(!handler.retryMethod(method, e, execCount))
{
LOG.debug("Method retry handler returned false. Automatic recovery will not be attempted");
throw e;
}
if(LOG.isInfoEnabled())
LOG.info("I/O exception (" + e.getClass().getName() + ") caught when processing request: " + e.getMessage());
if(LOG.isDebugEnabled())
LOG.debug(e.getMessage(), e);
LOG.info("Retrying request");
}
}
catch(IOException e)
{
// Terminal failure: close the connection if still open and flag it for release.
if(conn.isOpen())
{
LOG.debug("Closing the connection.");
conn.close();
}
releaseConnection = true;
throw e;
}
catch(RuntimeException e)
{
// Same cleanup for unchecked failures.
if(conn.isOpen())
{
LOG.debug("Closing the connection.");
conn.close();
}
releaseConnection = true;
throw e;
}
// Decompiler rendering of "loop back for the next attempt".
if(true) goto _L2; else goto _L1
_L1:
}
上述干了许多活儿,到最后还是调用了最开始的 org.apache.commons.httpclient.methods.PostMethod 中的 execute 方法(该方法定义于其父类 HttpMethodBase),进行http请求
/**
 * Performs one request/response exchange over the given connection.
 *
 * @param state the HTTP state passed on to the request and response steps
 * @param conn  the connection used to talk to the server
 * @return the status code of the response status line
 * @throws HttpException propagated from the steps invoked here
 * @throws IOException   propagated from the steps invoked here
 */
public int execute(HttpState state, HttpConnection conn)
    throws HttpException, IOException
{
    LOG.trace("enter HttpMethodBase.execute(HttpState, HttpConnection)");

    // Remember the connection and reset the per-execution state.
    responseConnection = conn;
    checkExecuteConditions(state, conn);
    statusLine = null;
    connectionCloseForced = false;
    conn.setLastResponseInputStream(null);
    if (effectiveVersion == null) {
        effectiveVersion = params.getVersion();
    }

    // Send the request (request line, headers, body, ...).
    writeRequest(state, conn);
    requestSent = true;

    // Read the response.
    readResponse(state, conn);
    used = true;

    final int statusCode = statusLine.getStatusCode();
    return statusCode;
}
/**
 * Writes the request to the connection: request line, headers, an empty line, an optional
 * "Expect: 100-continue" handshake, and finally the request body.
 * <p>
 * NOTE(review): this listing is decompiler output. The {@code break MISSING_BLOCK_LABEL_*}
 * statements and the bare {@code e;} / {@code exception;} statements are not valid Java; they
 * look like the remains of a try/catch/finally around the handshake that restores the socket
 * timeout on every exit - confirm against the library source.
 *
 * @param state the HTTP state passed on to the individual write/read steps
 * @param conn  the connection to write to
 * @throws IOException   propagated from the connection
 * @throws HttpException propagated from the individual steps
 */
protected void writeRequest(HttpState state, HttpConnection conn)
throws IOException, HttpException
{
// Holds the connection's configured socket timeout while the handshake overrides it.
int readTimeout;
LOG.trace("enter HttpMethodBase.writeRequest(HttpState, HttpConnection)");
// Send the content according to the HTTP protocol: request line and request headers first.
writeRequestLine(state, conn);
writeRequestHeaders(state, conn);
// Empty line that ends the header section.
conn.writeLine();
if(Wire.HEADER_WIRE.enabled())
Wire.HEADER_WIRE.output("\r\n");
HttpVersion ver = getParams().getVersion();
Header expectheader = getRequestHeader("Expect");
String expectvalue = null;
if(expectheader != null)
expectvalue = expectheader.getValue();
// No "Expect: 100-continue" header: skip the handshake.
if(expectvalue == null || expectvalue.compareToIgnoreCase("100-continue") != 0)
break MISSING_BLOCK_LABEL_265;
// Protocol version below HTTP/1.1: skip the handshake (see the log message near the end).
if(!ver.greaterEquals(HttpVersion.HTTP_1_1))
break MISSING_BLOCK_LABEL_247;
conn.flushRequestOutputStream();
// Wait for the interim response with a socket timeout of 3000 (presumably milliseconds -
// confirm); the configured timeout is restored afterwards.
readTimeout = conn.getParams().getSoTimeout();
conn.setSocketTimeout(3000);
readStatusLine(state, conn);
processStatusLine(state, conn);
readResponseHeaders(state, conn);
processResponseHeaders(state, conn);
if(statusLine.getStatusCode() == 100)
{
// Status 100 received: clear the status line so the real response can be read later.
statusLine = null;
LOG.debug("OK to continue received");
break MISSING_BLOCK_LABEL_184;
}
// Any other status: restore the timeout and return without writing the body.
conn.setSocketTimeout(readTimeout);
return;
conn.setSocketTimeout(readTimeout);
break MISSING_BLOCK_LABEL_265;
// Handler for InterruptedIOException: anything that is not a socket timeout is rethrown;
// on a timeout the Expect header is removed and sending resumes.
InterruptedIOException e;
e;
if(!ExceptionUtil.isSocketTimeoutException(e))
throw e;
removeRequestHeader("Expect");
LOG.info("100 (continue) read timeout. Resume sending the request");
conn.setSocketTimeout(readTimeout);
break MISSING_BLOCK_LABEL_265;
// Any other exception: restore the timeout and rethrow.
Exception exception;
exception;
conn.setSocketTimeout(readTimeout);
throw exception;
removeRequestHeader("Expect");
LOG.info("'Expect: 100-continue' handshake is only supported by HTTP/1.1 or higher");
// Send the request body and flush everything to the connection.
writeRequestBody(state, conn);
conn.flushRequestOutputStream();
return;
}
/**
 * Reads the response from the connection: status line and headers first, then the body.
 * <p>
 * Responses whose status is in the range 100-199 are discarded and the next response is read.
 *
 * @param state the HTTP state passed on to the individual read/process steps
 * @param conn  the connection to read from
 * @throws IOException   propagated from the individual steps
 * @throws HttpException propagated from the individual steps
 */
protected void readResponse(HttpState state, HttpConnection conn)
    throws IOException, HttpException
{
    LOG.trace("enter HttpMethodBase.readResponse(HttpState, HttpConnection)");

    // Keep reading until a status line has been accepted.
    while (statusLine == null) {
        // Read what the server returned, mainly the HTTP response information.
        readStatusLine(state, conn);
        processStatusLine(state, conn);
        readResponseHeaders(state, conn);
        processResponseHeaders(state, conn);

        final int code = statusLine.getStatusCode();
        final boolean informational = code >= 100 && code < 200;
        if (informational) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Discarding unexpected response: " + statusLine.toString());
            }
            // Clearing the status line makes the loop read the next response.
            statusLine = null;
        }
    }

    readResponseBody(state, conn);
    processResponseBody(state, conn);
}
来到这里,我们再回过头来看看,在 org.apache.commons.httpclient.HttpMethodDirector 处理过程中,是如何获取到与服务器间的连接,见org.apache.commons.httpclient.MultiThreadedHttpConnectionManager
/**
 * Obtains a connection for the given host configuration and wraps it in an
 * {@code HttpConnectionAdapter}.
 *
 * @param hostConfiguration the target host configuration; must not be null
 * @param timeout           timeout value handed to {@code doGetConnection}
 * @return the obtained connection, wrapped in an adapter
 * @throws IllegalArgumentException       if {@code hostConfiguration} is null
 * @throws ConnectionPoolTimeoutException propagated from {@code doGetConnection}
 */
public HttpConnection getConnectionWithTimeout(HostConfiguration hostConfiguration, long timeout)
    throws ConnectionPoolTimeoutException
{
    LOG.trace("enter HttpConnectionManager.getConnectionWithTimeout(HostConfiguration, long)");
    if (hostConfiguration == null) {
        throw new IllegalArgumentException("hostConfiguration is null");
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("HttpConnectionManager.getConnection:config = " + hostConfiguration + ", timeout = " + timeout);
    }
    return new HttpConnectionAdapter(doGetConnection(hostConfiguration, timeout));
}
/**
 * Obtains a connection for the given host configuration from the pool, creating one when the
 * limits allow it and otherwise waiting on the pool until one can be obtained or the timeout
 * is used up.
 * <p>
 * NOTE(review): this listing is decompiler output. {@code JVM INSTR monitorenter} stands for
 * entering a monitor on {@code connectionPool}; the {@code _L2} label with {@code goto _L2}
 * stands for the acquisition loop; the bare {@code e;} / {@code exception;} /
 * {@code exception1;} statements and the repeated {@code long endWait} declarations appear to
 * be the remains of a try/catch/finally around the wait - confirm against the library source.
 *
 * @param hostConfiguration the target host configuration
 * @param timeout           maximum time to wait for a connection; a value of 0 or less
 *                          disables the timeout check
 * @return the obtained connection
 * @throws ConnectionPoolTimeoutException when the timeout is used up before a connection
 *                                        could be obtained
 */
private HttpConnection doGetConnection(HostConfiguration hostConfiguration, long timeout)
throws ConnectionPoolTimeoutException
{
HttpConnection connection;
int maxHostConnections;
connection = null;
maxHostConnections = params.getMaxConnectionsPerHost(hostConfiguration);
int maxTotalConnections = params.getMaxTotalConnections();
ConnectionPool connectionpool = connectionPool;
JVM INSTR monitorenter ;
HostConnectionPool hostPool;
WaitingThread waitingThread;
boolean useTimeout;
long timeToWait;
long startWait;
// Work on a private copy of the host configuration.
hostConfiguration = new HostConfiguration(hostConfiguration);
// Look up the per-host pool for this host configuration inside connectionPool.
// Two pools are involved: connectionPool, which can cover several servers, and hostPool,
// which holds all connections of a single server.
hostPool = connectionPool.getHostPool(hostConfiguration, true);
waitingThread = null;
useTimeout = timeout > 0L;
timeToWait = timeout;
startWait = 0L;
long endWait = 0L;
_L2:
// Loop until a connection has been obtained.
if(connection != null)
break; /* Loop/switch isn't completed */
if(shutdown)
throw new IllegalStateException("Connection factory has been shutdown.");
// Choose how to obtain a connection based on the connection counts in hostPool.
// Case 1: the host has a free connection - reuse it.
if(hostPool.freeConnections.size() > 0)
{
connection = connectionPool.getFreeConnection(hostConfiguration);
continue; /* Loop/switch isn't completed */
}
// Case 2: both the per-host and the total limit leave room - create a new connection.
if(hostPool.numConnections < maxHostConnections && connectionPool.numConnections < maxTotalConnections)
{
connection = connectionPool.createConnection(hostConfiguration);
continue; /* Loop/switch isn't completed */
}
// Case 3: the per-host limit leaves room and the pool has some free connection - delete
// the least used connection to make room, then create a new one for this host.
if(hostPool.numConnections < maxHostConnections && connectionPool.freeConnections.size() > 0)
{
connectionPool.deleteLeastUsedConnection();
connection = connectionPool.createConnection(hostConfiguration);
continue; /* Loop/switch isn't completed */
}
// Case 4: nothing available - fail if the timeout is used up, otherwise wait on the pool.
if(useTimeout && timeToWait <= 0L)
throw new ConnectionPoolTimeoutException("Timeout waiting for connection");
if(LOG.isDebugEnabled())
LOG.debug("Unable to get a connection, waiting..., hostConfig=" + hostConfiguration);
if(waitingThread == null)
{
waitingThread = new WaitingThread();
waitingThread.hostConnectionPool = hostPool;
waitingThread.thread = Thread.currentThread();
} else
{
waitingThread.interruptedByConnectionPool = false;
}
if(useTimeout)
startWait = System.currentTimeMillis();
// Register this thread as waiting in both the host pool and the global pool, then wait.
hostPool.waitingThreads.addLast(waitingThread);
connectionPool.waitingThreads.addLast(waitingThread);
connectionPool.wait(timeToWait);
// After the wait: remove the registration unless the pool interrupted this thread, and
// subtract the time spent waiting from the remaining timeout.
if(!waitingThread.interruptedByConnectionPool)
{
hostPool.waitingThreads.remove(waitingThread);
connectionPool.waitingThreads.remove(waitingThread);
}
if(useTimeout)
{
long endWait = System.currentTimeMillis();
timeToWait -= endWait - startWait;
}
continue; /* Loop/switch isn't completed */
// Handler for InterruptedException: an interrupt that did not come from the connection
// pool is reported as IllegalThreadStateException.
InterruptedException e;
e;
if(!waitingThread.interruptedByConnectionPool)
{
LOG.debug("Interrupted while waiting for connection", e);
throw new IllegalThreadStateException("Interrupted while waiting in MultiThreadedHttpConnectionManager");
}
// Same post-wait bookkeeping as above.
if(!waitingThread.interruptedByConnectionPool)
{
hostPool.waitingThreads.remove(waitingThread);
connectionPool.waitingThreads.remove(waitingThread);
}
if(useTimeout)
{
long endWait = System.currentTimeMillis();
timeToWait -= endWait - startWait;
}
if(true) goto _L2; else goto _L1
_L1:
Exception exception;
break MISSING_BLOCK_LABEL_555;
// Any other exception: same post-wait bookkeeping, then rethrow.
exception;
if(!waitingThread.interruptedByConnectionPool)
{
hostPool.waitingThreads.remove(waitingThread);
connectionPool.waitingThreads.remove(waitingThread);
}
if(useTimeout)
{
long endWait = System.currentTimeMillis();
timeToWait -= endWait - startWait;
}
throw exception;
// NOTE(review): presumably the exceptional exit of the monitor region - confirm.
Exception exception1;
exception1;
throw exception1;
return connection;
}
/**
 * Creates a new connection object for the given host configuration and accounts for it in
 * both the global and the per-host connection counters.
 *
 * @param hostConfiguration the host configuration the connection is created for
 * @return the newly created connection
 */
public synchronized HttpConnection createConnection(HostConfiguration hostConfiguration)
{
    final HostConnectionPool perHostPool = getHostPool(hostConfiguration, true);
    if (MultiThreadedHttpConnectionManager.LOG.isDebugEnabled()) {
        MultiThreadedHttpConnectionManager.LOG.debug("Allocating new connection, hostConfig=" + hostConfiguration);
    }

    // Create the connection object (an HttpConnection subtype) for this host configuration
    // and attach it to the enclosing connection manager.
    final HttpConnectionWithReference created = new HttpConnectionWithReference(hostConfiguration);
    created.getParams().setDefaults(params);
    created.setHttpConnectionManager(MultiThreadedHttpConnectionManager.this);

    // Count the connection globally and for its host.
    numConnections += 1;
    perHostPool.numConnections += 1;

    MultiThreadedHttpConnectionManager.storeReferenceToConnection(created, hostConfiguration, this);
    return created;
}
/**
 * Creates a connection description for the given host, optionally through a proxy.
 * <p>
 * Only fields are initialised here; no socket is created by this constructor.
 *
 * @param proxyHost proxy host name, stored as given
 * @param proxyPort proxy port, stored as given
 * @param host      target host name; must not be null
 * @param port      target port, resolved through {@code protocol.resolvePort(port)}
 * @param protocol  protocol to use; must not be null
 * @throws IllegalArgumentException if {@code host} or {@code protocol} is null
 */
public HttpConnection(String proxyHost, int proxyPort, String host, int port, Protocol protocol)
{
    // Initial field values.
    hostName = null;
    portNumber = -1;
    proxyHostName = null;
    proxyPortNumber = -1;
    socket = null;
    inputStream = null;
    outputStream = null;
    lastResponseInputStream = null;
    isOpen = false;
    params = new HttpConnectionParams();
    locked = false;
    usingSecureSocket = false;
    tunnelEstablished = false;

    // Argument validation.
    if (host == null) {
        throw new IllegalArgumentException("host parameter is null");
    }
    if (protocol == null) {
        throw new IllegalArgumentException("protocol is null");
    }

    // Target and proxy coordinates.
    proxyHostName = proxyHost;
    proxyPortNumber = proxyPort;
    hostName = host;
    portNumber = protocol.resolvePort(port);
    protocolInUse = protocol;
}
/**
 * Opens this connection: creates the socket if there is none yet, applies the socket options
 * taken from the connection parameters and sets up buffered input and output streams.
 * <p>
 * The socket is opened to the proxy when a proxy host is configured, otherwise to the target
 * host. On an I/O failure the socket and streams are closed and the exception is rethrown.
 *
 * @throws IOException if the socket cannot be created or configured
 */
public void open()
    throws IOException
{
    LOG.trace("enter HttpConnection.open()");

    // Pick the endpoint: proxy if configured, target host otherwise.
    final String host;
    final int port;
    if (proxyHostName != null) {
        host = proxyHostName;
        port = proxyPortNumber;
    } else {
        host = hostName;
        port = portNumber;
    }

    assertNotOpen();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Open connection to " + host + ":" + port);
    }

    try {
        if (socket == null) {
            usingSecureSocket = isSecure() && !isProxied();
            // A secure connection through a proxy starts with the socket factory of the
            // plain "http" protocol; every other case uses the factory of the protocol in use.
            final ProtocolSocketFactory factory = (isSecure() && isProxied())
                ? Protocol.getProtocol("http").getSocketFactory()
                : protocolInUse.getSocketFactory();
            // Establish the socket channel to the server for the data exchange that follows.
            socket = factory.createSocket(host, port, localAddress, 0, params);
        }

        // Socket options; negative values leave the corresponding option untouched.
        socket.setTcpNoDelay(params.getTcpNoDelay());
        socket.setSoTimeout(params.getSoTimeout());
        final int lingerValue = params.getLinger();
        if (lingerValue >= 0) {
            socket.setSoLinger(lingerValue > 0, lingerValue);
        }
        final int requestedSendBuffer = params.getSendBufferSize();
        if (requestedSendBuffer >= 0) {
            socket.setSendBufferSize(requestedSendBuffer);
        }
        final int requestedReceiveBuffer = params.getReceiveBufferSize();
        if (requestedReceiveBuffer >= 0) {
            socket.setReceiveBufferSize(requestedReceiveBuffer);
        }

        // Stream buffer sizes follow the socket's sizes but are capped at 2048; 2048 is also
        // used when the socket reports a non-positive size.
        int outBufferSize = socket.getSendBufferSize();
        if (outBufferSize <= 0 || outBufferSize > 2048) {
            outBufferSize = 2048;
        }
        int inBufferSize = socket.getReceiveBufferSize();
        if (inBufferSize <= 0 || inBufferSize > 2048) {
            inBufferSize = 2048;
        }

        inputStream = new BufferedInputStream(socket.getInputStream(), inBufferSize);
        outputStream = new BufferedOutputStream(socket.getOutputStream(), outBufferSize);
        isOpen = true;
    } catch (IOException e) {
        closeSocketAndStreams();
        throw e;
    }
}
页:
[1]