|
1. 背景
Android推送服务应用非常广泛,一般有轮询、SMS推送、IP推送几种实现方式。由于轮询的即时性不高、SMS推送需要不菲的费用,所以一般采取IP推送。由于google的IP推送平台C2DM国内被屏蔽,国内涌现很多优秀的推送平台如个推、极光推送。由于实现推送服务有一定技术难度,很多移动互联网应用直接租用这些平台服务,达到快速拓展业务的目标。
但是在一些企业和行业应用场合,限制手机接入互联网,不能采用互联网推送平台,就必须实现自己的推送服务。国内流行的方案是采用开源的androidpn,基于XMPP协议。由于XMPP是一个用于IM即时消息的协议,用来做推送服务有很多冗余的东西,显得很不优雅。因此本文提出另一种简洁思路。
2. 采用HTTP的可能性
从服务器向手机推送存在两个限制:1)移动设备IP经常改变,Server端无法通过配置解决;2)设备常在NAT后面(如无线路由器),无法由Server发起建立socket连接。因此必须由手机端发起向服务器发起socket连接,并保持这个连接,服务在这个连接上向手机推送消息。
因此推送服务存在这样的场景:在socket建连时,手机是client端,推送服务器是server端。但是在消息推送时,推送服务器是发起端成了client,手机作为接收端变成了Server。一般的HTTP开源实现,如apache http core,socket server端绑定http server端,socket client端绑定HTTP client端,无法拆分。但是幸运的是,apache http core的NIO版本,却实现可socket层和http层的分离,可以在socket server上发起http client请求。推送服务器因为要保持成千上万的持久连接,NIO成了不二的选择,apache httpcore-nio的NIO模型是理想选择。
3. Demo描述
在公司项目中,服务器端采用apache httpcore-nio,手机端则采用普通的http协议栈,以简化手机的开发难度。因为基于简单协议HTTP、及成熟开源代码,开发量非常小,千把行代码就实现了一个简洁够用的推送服务器,目前该推送服务器已经用于生产环境一年多,性能和稳定性都表现相当不错。
在demo中,手机和服务器端统一采用apache httpcore-nio,基于HTTP通信。通信模型如下图所示:两个手机、一台服务器都运行于localhost,采用不同的端口区分。
Java工程如下图所示。
采用apache httpcore-nio-4.2.4版本。ReverseNHttpServer.java实现推送客户端功能。ReverseNHttpClient.java和ConnectionManager.java实现推送服务器功能,其中ConnectionManager用一个Queue保存已经建立的连接。ReverseHttpTest.java用于测试。
3.1. ReverseNHttpClient
下面是源码。
package com.cangfu.reversehttp;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.http.ContentTooLongException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.entity.ContentBufferEntity;
import org.apache.http.nio.protocol.AbstractAsyncResponseConsumer;
import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
import org.apache.http.nio.protocol.HttpAsyncRequester;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.reactor.ListenerEndpoint;
import org.apache.http.nio.reactor.ListeningIOReactor;
import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.apache.http.nio.util.SimpleInputBuffer;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
public class ReverseNHttpClient {
ListeningIOReactor ioReactor;
HttpParams params;
Thread t;
static ConnectionManager connMgr = new ConnectionManager();
public static ConnectionManager getConnectionManager() {
return connMgr;
}
public ReverseNHttpClient(IOReactorConfig ioconfig) throws IOReactorException {
this.params = new SyncBasicHttpParams();
this.params
.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 3000)
.setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 3000)
.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
.setParameter(CoreProtocolPNames.USER_AGENT, "HttpTest/1.1")
.setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpTest/1.1");
// Create client-side I/O reactor
ioReactor = new DefaultListeningIOReactor(ioconfig);
}
public void start() {
HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor() {
@Override
public void connected(
final NHttpClientConnection conn,
final Object attachment) throws IOException, HttpException {
try {
ReverseNHttpClient.getConnectionManager().putConnection(conn);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(conn + ": connection open in ReverseClient side");
super.connected(conn, attachment);
}
};
// Create client-side I/O event dispatch
final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler, params);
// Run the I/O reactor in a separate thread
t = new Thread(new Runnable() {
public void run() {
try {
// Ready to go!
ioReactor.execute(ioEventDispatch);
} catch (IOException e) {
System.err.println("I/O error: " + e.getMessage());
}
System.out.println("Shutdown");
}
});
// Start the client thread
t.start();
}
public void stop() {
if (t != null)
t.interrupt();
}
public void accept(int port) throws InterruptedException {
ListenerEndpoint request = ioReactor.listen(new InetSocketAddress(port));
request.waitFor();
}
public void HttpExchange(HttpHost target, BasicHttpRequest request) throws InterruptedException {
HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
// Use standard client-side protocol interceptors
new RequestContent(),
new RequestTargetHost(),
new RequestConnControl(),
new RequestUserAgent(),
new RequestExpectContinue()});
HttpAsyncRequester requester = new HttpAsyncRequester(
httpproc, new DefaultConnectionReuseStrategy(), params);
requester.execute(
new BasicAsyncRequestProducer(target, request),
new MyResponseConsumer(),
ReverseNHttpClient.getConnectionManager().takeConnection()
);
}
static class MyResponseConsumer extends AbstractAsyncResponseConsumer {
private volatile HttpResponse response;
private volatile SimpleInputBuffer buf;
public MyResponseConsumer() {
super();
}
@Override
protected void onResponseReceived(final HttpResponse response) throws IOException {
this.response = response;
}
@Override
protected void onEntityEnclosed(
final HttpEntity entity, final ContentType contentType) throws IOException {
long len = entity.getContentLength();
if (len > Integer.MAX_VALUE) {
throw new ContentTooLongException("Entity content is too long: " + len);
}
if (len < 0) {
len = 4096;
}
this.buf = new SimpleInputBuffer((int) len, new HeapByteBufferAllocator());
this.response.setEntity(new ContentBufferEntity(entity, this.buf));
}
@Override
protected void onContentReceived(
final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
if (this.buf == null) {
throw new IllegalStateException("Content buffer is null");
}
this.buf.consumeContent(decoder);
}
@Override
protected void releaseResources() {
this.response = null;
this.buf = null;
}
@Override
protected HttpResponse buildResult(final HttpContext context) throws IOException {
System.out.println();
System.out.println(((BasicHttpResponse)response).toString());
response.getEntity().writeTo(System.out);
System.out.println();
return this.response;
}
}
} 其中,start()方法用于启动httpclient,accept()用于启动socket监听,httpExchange用于发送http请求,MyResponseconsumer是一个内部类,用于处理返回的响应。
3.2. ReverseNHttpServer
下面是源码。
package com.cangfu.reversehttp;
import java.io.File;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.URL;
import java.net.URLDecoder;
import java.security.KeyStore;
import java.util.Locale;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpStatus;
import org.apache.http.MethodNotSupportedException;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultHttpServerIODispatch;
import org.apache.http.impl.nio.DefaultNHttpServerConnection;
import org.apache.http.impl.nio.DefaultNHttpServerConnectionFactory;
import org.apache.http.impl.nio.SSLNHttpServerConnectionFactory;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.NHttpConnectionFactory;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.nio.entity.NFileEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;
import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
import org.apache.http.nio.protocol.HttpAsyncExchange;
import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
import org.apache.http.nio.protocol.HttpAsyncRequestHandlerRegistry;
import org.apache.http.nio.protocol.HttpAsyncService;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.nio.reactor.SessionRequestCallback;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
public class ReverseNHttpServer {
HttpParams params;
final ConnectingIOReactor ioReactor;
Thread t;
static ConnectionManager connMgr = new ConnectionManager();
public static ConnectionManager getConnectionManager() {
return connMgr;
}
public ReverseNHttpServer(IOReactorConfig ioconfig) throws IOReactorException {
this.params = new SyncBasicHttpParams();
this.params
.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 3000)
.setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 3000)
.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
.setParameter(CoreProtocolPNames.USER_AGENT, "HttpTest/1.1")
.setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpTest/1.1");
ioReactor = new DefaultConnectingIOReactor(ioconfig);
}
public void start(String path, boolean ssl) throws Exception {
// Create HTTP protocol processing chain
HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {
// Use standard server-side protocol interceptors
new ResponseDate(),
new ResponseServer(),
new ResponseContent(),
new ResponseConnControl()
});
// Create request handler registry
HttpAsyncRequestHandlerRegistry registry = new HttpAsyncRequestHandlerRegistry();
// Register the default handler for all URIs
registry.register("*", new HttpFileHandler(new File(path)));
// Create server-side HTTP protocol handler
HttpAsyncService protocolHandler = new HttpAsyncService(
httpproc, new DefaultConnectionReuseStrategy(), registry, params) {
@Override
public void connected(final NHttpServerConnection conn) {
System.out.println(conn + ": connection open in ReverseServer side");
try {
ReverseNHttpServer.getConnectionManager().putConnection(conn);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
super.connected(conn);
}
@Override
public void closed(final NHttpServerConnection conn) {
System.out.println(conn + ": connection closed");
super.closed(conn);
}
};
// Create HTTP connection factory
NHttpConnectionFactory connFactory;
if (ssl) {
// Initialize SSL context
ClassLoader cl = ReverseNHttpServer.class.getClassLoader();
URL url = cl.getResource("my.keystore");
if (url == null) {
System.out.println("Keystore not found");
System.exit(1);
}
KeyStore keystore = KeyStore.getInstance("jks");
keystore.load(url.openStream(), "secret".toCharArray());
KeyManagerFactory kmfactory = KeyManagerFactory.getInstance(
KeyManagerFactory.getDefaultAlgorithm());
kmfactory.init(keystore, "secret".toCharArray());
KeyManager[] keymanagers = kmfactory.getKeyManagers();
SSLContext sslcontext = SSLContext.getInstance("TLS");
sslcontext.init(keymanagers, null, null);
connFactory = new SSLNHttpServerConnectionFactory(sslcontext, null, params);
} else {
connFactory = new DefaultNHttpServerConnectionFactory(params);
}
// Create server-side I/O event dispatch
final IOEventDispatch ioEventDispatch = new DefaultHttpServerIODispatch(protocolHandler, connFactory);
// Run the I/O reactor in a separate thread
t = new Thread(new Runnable() {
public void run() {
try {
// Ready to go!
ioReactor.execute(ioEventDispatch);
} catch (IOException e) {
System.err.println("I/O error: " + e.getMessage());
}
System.out.println("Shutdown");
}
});
// Start the client thread
t.start();
}
public void stop() {
if (t != null) {
t.interrupt();
}
}
public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress) throws InterruptedException {
SessionRequest sessionReq = ioReactor.connect(remoteAddress, localAddress, null,
new SessionRequestCallback() {
@Override
public void completed(SessionRequest request) {
System.out.println("connection established succeed in device side!!!");
}
@Override
public void failed(SessionRequest request) {
System.err.println("Device side connection failed!");
System.err.println(request.getException().getMessage());
}
@Override
public void timeout(SessionRequest request) {
System.err.println("Device side connection timeout!");
System.err.println(request.getException().getMessage());
}
@Override
public void cancelled(SessionRequest request) {
System.err.println("Device side connection cancelled!");
System.err.println(request.getException().getMessage());
}
}
);
sessionReq.waitFor();
}
static class HttpFileHandler implements HttpAsyncRequestHandler {
private final File docRoot;
public HttpFileHandler() {
docRoot = new File(".");
}
public HttpFileHandler(final File docRoot) {
super();
this.docRoot = docRoot;
}
public HttpAsyncRequestConsumer proce***equest(
final HttpRequest request,
final HttpContext context) {
// Buffer request content in memory for simplicity
return new BasicAsyncRequestConsumer();
}
public void handle(
final HttpRequest request,
final HttpAsyncExchange httpexchange,
final HttpContext context) throws HttpException, IOException {
HttpResponse response = httpexchange.getResponse();
handleInternal(request, response, context);
httpexchange.submitResponse(new BasicAsyncResponseProducer(response));
}
private void handleInternal(
final HttpRequest request,
final HttpResponse response,
final HttpContext context) throws HttpException, IOException {
String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);
if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
throw new MethodNotSupportedException(method + " method not supported");
}
String target = request.getRequestLine().getUri();
final File file = new File(this.docRoot, URLDecoder.decode(target, "UTF-8"));
if (!file.exists()) {
response.setStatusCode(HttpStatus.SC_NOT_FOUND);
NStringEntity entity = new NStringEntity(
"File" + file.getPath() +
" not found",
ContentType.create("text/html", "UTF-8"));
response.setEntity(entity);
System.out.println("File " + file.getPath() + " not found");
} else if (!file.canRead() || file.isDirectory()) {
response.setStatusCode(HttpStatus.SC_FORBIDDEN);
NStringEntity entity = new NStringEntity(
"Access denied",
ContentType.create("text/html", "UTF-8"));
response.setEntity(entity);
System.out.println("Cannot read file " + file.getPath());
} else {
NHttpConnection conn = (NHttpConnection) context.getAttribute(
ExecutionContext.HTTP_CONNECTION);
response.setStatusCode(HttpStatus.SC_OK);
NFileEntity body = new NFileEntity(file, ContentType.create("text/html"));
response.setEntity(body);
System.out.println(conn + ": serving file " + file.getPath());
}
}
}
} 其中,start()方法用于启动httpserver,connect()用于发起socket连接。HttpFileHandler()是一个处理http请求的内部钩子类,它从本地读取一个文本文件,通过response发送到服务器。
3.3. ConnectionManager
源码。
package com.cangfu.reversehttp;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ConnectionManager {
private final BlockingQueue connections;
public ConnectionManager() {
connections = new LinkedBlockingQueue();
}
public void putConnection(E conn) throws InterruptedException {
this.connections.put(conn);
}
public E takeConnection() throws InterruptedException {
return this.connections.take();
}
} 3.4. ReverseHttpTest
源码如下。
package com.cangfu.reversehttp;
import java.net.InetSocketAddress;
import org.apache.http.HttpHost;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHttpRequest;
import org.junit.Test;
public class ReverseHttpTest {
@Test
public void testReverseHttp() throws Exception {
int sockServerPort = 60010;
int sockClientPort1 = 60011;
int sockClientPort2 = 60012;
IOReactorConfig ioconfig = new IOReactorConfig();
ioconfig.setIoThreadCount(1);
ioconfig.setSoKeepalive(true);
ioconfig.setSoReuseAddress(true);
ioconfig.setSelectInterval(1000);
// Start ReverseClient, at APNS Server side
ReverseNHttpClient reverseClient = new ReverseNHttpClient(ioconfig);
reverseClient.start();
reverseClient.accept(sockServerPort);
// Start ReverseServer, at Device side
ReverseNHttpServer reverseServer1 = new ReverseNHttpServer(ioconfig);
reverseServer1.start(".", false);
reverseServer1.connect(new InetSocketAddress("localhost", sockServerPort), new InetSocketAddress("localhost", sockClientPort1));
// Start another ReverseServer, at Device side
ReverseNHttpServer reverseServer2 = new ReverseNHttpServer(ioconfig);
reverseServer2.start(".", false);
reverseServer2.connect(new InetSocketAddress("localhost", sockServerPort), new InetSocketAddress("localhost", sockClientPort2));
// send a request from APNS server to phone 1
HttpHost target1 = new HttpHost("localhost", sockClientPort1, "http");
BasicHttpRequest request1 = new BasicHttpRequest("GET", "/Hello1.txt");
reverseClient.HttpExchange(target1, request1);
// send another request from APNS server to phone 2
HttpHost target2 = new HttpHost("localhost", sockClientPort2, "http");
BasicHttpRequest request2 = new BasicHttpRequest("GET", "/Hello2.txt");
reverseClient.HttpExchange(target2, request2);
Thread.sleep(2*1000);
reverseServer1.stop();
reverseServer2.stop();
reverseClient.stop();
}
}
启动一个推送服务器的HttpClient和两个手机端的HttpServer。推送服务器分别向两个手机发送GET请求。手机读取本地的hello.txt文件,放置于http response的消息体中,返回给服务器。下面是测试运行结果:
|
|