|
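1. Background
Push services are widely used on Android and are generally implemented in one of three ways: polling, SMS push, or IP push. Polling delivers messages with poor timeliness and SMS push is costly, so IP push is the usual choice. Because Google's IP push platform, C2DM, is blocked in China, many capable domestic push platforms have emerged, such as Getui (个推) and JPush (极光推送). Since building a push service takes some technical effort, many mobile internet applications simply rent these platforms to grow their business quickly.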
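However, in some enterprise and industry settings, phones are not allowed to access the internet, so internet push platforms cannot be used and you must implement your own push service. The popular approach in China is the open-source androidpn, which is based on XMPP. Because XMPP is an instant-messaging protocol, using it for push carries a lot of redundancy and feels inelegant. This article therefore proposes a simpler approach.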
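2. Feasibility of using HTTP
Pushing from a server to a phone faces two constraints: 1) the mobile device's IP address changes frequently, which the server cannot solve through configuration; 2) the device is often behind NAT (for example, a wireless router), so the server cannot initiate a socket connection to it. The phone must therefore initiate the socket connection to the server and keep it open, and the server pushes messages to the phone over that connection.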
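So the push service works like this: when the socket connection is established, the phone is the client and the push server is the server. When a message is pushed, however, the push server initiates the exchange and becomes the client, while the phone, as the receiver, becomes the server. In typical open-source HTTP implementations, such as apache http core, the socket server side is bound to the HTTP server side and the socket client side to the HTTP client side, and the two cannot be separated. Fortunately, the NIO version of apache http core does separate the socket layer from the HTTP layer, so an HTTP client request can be issued over a connection accepted by a socket server. Since a push server has to hold thousands of persistent connections, NIO is the natural choice, and the NIO model of apache httpcore-nio fits well.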
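The role reversal described above can be illustrated without any HTTP library. The following is a minimal sketch using plain blocking sockets, not the httpcore-nio approach this article uses; the class name `ReverseRoleSketch` and its methods are hypothetical and exist only for illustration. The "device" dials out as a TCP client and then answers an HTTP request, while the "push server" accepts the connection as a TCP server and then sends the request.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical demo class: TCP roles and HTTP roles are reversed on one connection. */
class ReverseRoleSketch {

    /** Device side: dials out (TCP client), then answers one HTTP request (HTTP server). */
    static void runDevice(int port) throws IOException {
        try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), port)) {
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
            String requestLine = in.readLine();
            String line;
            while ((line = in.readLine()) != null && !line.isEmpty()) {
                // skip the request headers up to the blank line
            }
            byte[] body = ("got " + requestLine).getBytes(StandardCharsets.US_ASCII);
            OutputStream out = socket.getOutputStream();
            out.write(("HTTP/1.1 200 OK\r\nContent-Length: " + body.length
                    + "\r\nConnection: close\r\n\r\n").getBytes(StandardCharsets.US_ASCII));
            out.write(body);
            out.flush();
        }
    }

    /** Push-server side: accepts (TCP server), then sends the request (HTTP client). */
    static String pushOnce(ServerSocket listener, String path) throws IOException {
        try (Socket socket = listener.accept()) {
            OutputStream out = socket.getOutputStream();
            out.write(("GET " + path + " HTTP/1.1\r\nHost: device\r\n\r\n")
                    .getBytes(StandardCharsets.US_ASCII));
            out.flush();
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
            String statusLine = in.readLine();
            while (in.readLine() != null) {
                // drain headers and body until the device closes the connection
            }
            return statusLine;
        }
    }

    /** Runs one device thread against a listener on an ephemeral port. */
    static String demo(String path) throws Exception {
        try (ServerSocket listener = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            final int port = listener.getLocalPort();
            Thread device = new Thread(() -> {
                try {
                    runDevice(port);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            device.start();
            String statusLine = pushOnce(listener, path);
            device.join();
            return statusLine;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo("/Hello1.txt"));
    }
}
```

This blocking version needs one thread per connection, which is exactly what does not scale to thousands of persistent connections; that is the motivation for using an NIO reactor instead.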
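3. Demo description
In our company project, the server uses apache httpcore-nio, while the phone uses an ordinary HTTP protocol stack to keep phone-side development simple. Because it builds on plain HTTP and mature open-source code, the development effort was small: about a thousand lines of code produced a simple, adequate push server. That server has been running in production for over a year, with good performance and stability.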
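In the demo, both the phone and the server use apache httpcore-nio and communicate over HTTP. The communication model is shown in the figure below: two phones and one server all run on localhost and are distinguished by port.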
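The Java project is shown in the figure below. It uses apache httpcore-nio version 4.2.4. ReverseNHttpServer.java implements the push client (the phone side). ReverseNHttpClient.java and ConnectionManager.java implement the push server; ConnectionManager keeps the established connections in a Queue. ReverseHttpTest.java is the test.
3.1. ReverseNHttpClient
The source code follows.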
| package com.cangfu.reversehttp;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.http.ContentTooLongException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.entity.ContentBufferEntity;
import org.apache.http.nio.protocol.AbstractAsyncResponseConsumer;
import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
import org.apache.http.nio.protocol.HttpAsyncRequester;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.reactor.ListenerEndpoint;
import org.apache.http.nio.reactor.ListeningIOReactor;
import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.apache.http.nio.util.SimpleInputBuffer;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
public class ReverseNHttpClient {

    ListeningIOReactor ioReactor;
    HttpParams params;
    Thread t;

    // Connections accepted from devices; HttpExchange() takes them in FIFO order
    static ConnectionManager<NHttpClientConnection> connMgr =
            new ConnectionManager<NHttpClientConnection>();

    public static ConnectionManager<NHttpClientConnection> getConnectionManager() {
        return connMgr;
    }

    public ReverseNHttpClient(IOReactorConfig ioconfig) throws IOReactorException {
        this.params = new SyncBasicHttpParams();
        this.params
            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
            .setParameter(CoreProtocolPNames.USER_AGENT, "HttpTest/1.1")
            .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpTest/1.1");
        // Create client-side I/O reactor (a listening reactor, so it can accept sockets)
        ioReactor = new DefaultListeningIOReactor(ioconfig);
    }

    public void start() {
        HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor() {
            @Override
            public void connected(
                    final NHttpClientConnection conn,
                    final Object attachment) throws IOException, HttpException {
                try {
                    // Keep the accepted connection so requests can be sent on it later
                    ReverseNHttpClient.getConnectionManager().putConnection(conn);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing it
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                System.out.println(conn + ": connection open in ReverseClient side");
                super.connected(conn, attachment);
            }
        };
        // Create client-side I/O event dispatch
        final IOEventDispatch ioEventDispatch =
                new DefaultHttpClientIODispatch(protocolHandler, params);
        // Run the I/O reactor in a separate thread
        t = new Thread(new Runnable() {
            public void run() {
                try {
                    // Ready to go!
                    ioReactor.execute(ioEventDispatch);
                } catch (IOException e) {
                    System.err.println("I/O error: " + e.getMessage());
                }
                System.out.println("Shutdown");
            }
        });
        // Start the client thread
        t.start();
    }

    public void stop() {
        if (t != null) {
            t.interrupt();
        }
    }

    // Start listening on the given port and wait until the endpoint is ready
    public void accept(int port) throws InterruptedException {
        ListenerEndpoint request = ioReactor.listen(new InetSocketAddress(port));
        request.waitFor();
    }

    // Send an HTTP request over the next queued connection (blocks until one is available)
    public void HttpExchange(HttpHost target, BasicHttpRequest request) throws InterruptedException {
        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
                // Use standard client-side protocol interceptors
                new RequestContent(),
                new RequestTargetHost(),
                new RequestConnControl(),
                new RequestUserAgent(),
                new RequestExpectContinue()});
        HttpAsyncRequester requester = new HttpAsyncRequester(
                httpproc, new DefaultConnectionReuseStrategy(), params);
        requester.execute(
                new BasicAsyncRequestProducer(target, request),
                new MyResponseConsumer(),
                ReverseNHttpClient.getConnectionManager().takeConnection()
        );
    }

    static class MyResponseConsumer extends AbstractAsyncResponseConsumer<HttpResponse> {

        private volatile HttpResponse response;
        private volatile SimpleInputBuffer buf;

        public MyResponseConsumer() {
            super();
        }

        @Override
        protected void onResponseReceived(final HttpResponse response) throws IOException {
            this.response = response;
        }

        @Override
        protected void onEntityEnclosed(
                final HttpEntity entity, final ContentType contentType) throws IOException {
            long len = entity.getContentLength();
            if (len > Integer.MAX_VALUE) {
                throw new ContentTooLongException("Entity content is too long: " + len);
            }
            if (len < 0) {
                // Unknown length: start with a 4 KB buffer
                len = 4096;
            }
            this.buf = new SimpleInputBuffer((int) len, new HeapByteBufferAllocator());
            this.response.setEntity(new ContentBufferEntity(entity, this.buf));
        }

        @Override
        protected void onContentReceived(
                final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
            if (this.buf == null) {
                throw new IllegalStateException("Content buffer is null");
            }
            this.buf.consumeContent(decoder);
        }

        @Override
        protected void releaseResources() {
            this.response = null;
            this.buf = null;
        }

        @Override
        protected HttpResponse buildResult(final HttpContext context) throws IOException {
            // Print the status line and the body of the response returned by the device
            System.out.println();
            System.out.println(((BasicHttpResponse) response).toString());
            response.getEntity().writeTo(System.out);
            System.out.println();
            return this.response;
        }
    }
}
|
Here, start() launches the HTTP client, accept() starts listening on the socket, HttpExchange() sends an HTTP request, and MyResponseConsumer is a nested class that handles the returned response.
3.2. ReverseNHttpServer
The source code follows.
| package com.cangfu.reversehttp;
import java.io.File;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.URL;
import java.net.URLDecoder;
import java.security.KeyStore;
import java.util.Locale;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpStatus;
import org.apache.http.MethodNotSupportedException;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultHttpServerIODispatch;
import org.apache.http.impl.nio.DefaultNHttpServerConnection;
import org.apache.http.impl.nio.DefaultNHttpServerConnectionFactory;
import org.apache.http.impl.nio.SSLNHttpServerConnectionFactory;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.NHttpConnectionFactory;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.nio.entity.NFileEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;
import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
import org.apache.http.nio.protocol.HttpAsyncExchange;
import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
import org.apache.http.nio.protocol.HttpAsyncRequestHandlerRegistry;
import org.apache.http.nio.protocol.HttpAsyncService;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.nio.reactor.SessionRequestCallback;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
public class ReverseNHttpServer {

    HttpParams params;
    final ConnectingIOReactor ioReactor;
    Thread t;

    static ConnectionManager<NHttpServerConnection> connMgr =
            new ConnectionManager<NHttpServerConnection>();

    public static ConnectionManager<NHttpServerConnection> getConnectionManager() {
        return connMgr;
    }

    public ReverseNHttpServer(IOReactorConfig ioconfig) throws IOReactorException {
        this.params = new SyncBasicHttpParams();
        this.params
            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
            .setParameter(CoreProtocolPNames.USER_AGENT, "HttpTest/1.1")
            .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpTest/1.1");
        // A connecting reactor: the HTTP server side dials out to the push server
        ioReactor = new DefaultConnectingIOReactor(ioconfig);
    }

    public void start(String path, boolean ssl) throws Exception {
        // Create HTTP protocol processing chain
        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {
                // Use standard server-side protocol interceptors
                new ResponseDate(),
                new ResponseServer(),
                new ResponseContent(),
                new ResponseConnControl()
        });
        // Create request handler registry
        HttpAsyncRequestHandlerRegistry registry = new HttpAsyncRequestHandlerRegistry();
        // Register the default handler for all URIs
        registry.register("*", new HttpFileHandler(new File(path)));
        // Create server-side HTTP protocol handler
        HttpAsyncService protocolHandler = new HttpAsyncService(
                httpproc, new DefaultConnectionReuseStrategy(), registry, params) {
            @Override
            public void connected(final NHttpServerConnection conn) {
                System.out.println(conn + ": connection open in ReverseServer side");
                try {
                    ReverseNHttpServer.getConnectionManager().putConnection(conn);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing it
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                super.connected(conn);
            }

            @Override
            public void closed(final NHttpServerConnection conn) {
                System.out.println(conn + ": connection closed");
                super.closed(conn);
            }
        };
        // Create HTTP connection factory
        NHttpConnectionFactory<DefaultNHttpServerConnection> connFactory;
        if (ssl) {
            // Initialize SSL context
            ClassLoader cl = ReverseNHttpServer.class.getClassLoader();
            URL url = cl.getResource("my.keystore");
            if (url == null) {
                System.out.println("Keystore not found");
                System.exit(1);
            }
            KeyStore keystore = KeyStore.getInstance("jks");
            keystore.load(url.openStream(), "secret".toCharArray());
            KeyManagerFactory kmfactory = KeyManagerFactory.getInstance(
                    KeyManagerFactory.getDefaultAlgorithm());
            kmfactory.init(keystore, "secret".toCharArray());
            KeyManager[] keymanagers = kmfactory.getKeyManagers();
            SSLContext sslcontext = SSLContext.getInstance("TLS");
            sslcontext.init(keymanagers, null, null);
            connFactory = new SSLNHttpServerConnectionFactory(sslcontext, null, params);
        } else {
            connFactory = new DefaultNHttpServerConnectionFactory(params);
        }
        // Create server-side I/O event dispatch
        final IOEventDispatch ioEventDispatch =
                new DefaultHttpServerIODispatch(protocolHandler, connFactory);
        // Run the I/O reactor in a separate thread
        t = new Thread(new Runnable() {
            public void run() {
                try {
                    // Ready to go!
                    ioReactor.execute(ioEventDispatch);
                } catch (IOException e) {
                    System.err.println("I/O error: " + e.getMessage());
                }
                System.out.println("Shutdown");
            }
        });
        // Start the server thread
        t.start();
    }

    public void stop() {
        if (t != null) {
            t.interrupt();
        }
    }

    // Dial out to the push server from the given local address and wait for the result
    public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress)
            throws InterruptedException {
        SessionRequest sessionReq = ioReactor.connect(remoteAddress, localAddress, null,
                new SessionRequestCallback() {
                    @Override
                    public void completed(SessionRequest request) {
                        System.out.println("Device side connection established");
                    }

                    @Override
                    public void failed(SessionRequest request) {
                        System.err.println("Device side connection failed!");
                        System.err.println(request.getException().getMessage());
                    }

                    @Override
                    public void timeout(SessionRequest request) {
                        System.err.println("Device side connection timeout!");
                        // The exception may be absent here, so guard against null
                        if (request.getException() != null) {
                            System.err.println(request.getException().getMessage());
                        }
                    }

                    @Override
                    public void cancelled(SessionRequest request) {
                        System.err.println("Device side connection cancelled!");
                        if (request.getException() != null) {
                            System.err.println(request.getException().getMessage());
                        }
                    }
                }
        );
        sessionReq.waitFor();
    }

    static class HttpFileHandler implements HttpAsyncRequestHandler<HttpRequest> {

        private final File docRoot;

        public HttpFileHandler() {
            docRoot = new File(".");
        }

        public HttpFileHandler(final File docRoot) {
            super();
            this.docRoot = docRoot;
        }

        public HttpAsyncRequestConsumer<HttpRequest> processRequest(
                final HttpRequest request,
                final HttpContext context) {
            // Buffer request content in memory for simplicity
            return new BasicAsyncRequestConsumer();
        }

        public void handle(
                final HttpRequest request,
                final HttpAsyncExchange httpexchange,
                final HttpContext context) throws HttpException, IOException {
            HttpResponse response = httpexchange.getResponse();
            handleInternal(request, response, context);
            httpexchange.submitResponse(new BasicAsyncResponseProducer(response));
        }

        private void handleInternal(
                final HttpRequest request,
                final HttpResponse response,
                final HttpContext context) throws HttpException, IOException {
            String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);
            if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
                throw new MethodNotSupportedException(method + " method not supported");
            }
            // Resolve the request URI against the document root
            String target = request.getRequestLine().getUri();
            final File file = new File(this.docRoot, URLDecoder.decode(target, "UTF-8"));
            if (!file.exists()) {
                response.setStatusCode(HttpStatus.SC_NOT_FOUND);
                NStringEntity entity = new NStringEntity(
                        "File " + file.getPath() + " not found",
                        ContentType.create("text/html", "UTF-8"));
                response.setEntity(entity);
                System.out.println("File " + file.getPath() + " not found");
            } else if (!file.canRead() || file.isDirectory()) {
                response.setStatusCode(HttpStatus.SC_FORBIDDEN);
                NStringEntity entity = new NStringEntity(
                        "Access denied",
                        ContentType.create("text/html", "UTF-8"));
                response.setEntity(entity);
                System.out.println("Cannot read file " + file.getPath());
            } else {
                NHttpConnection conn = (NHttpConnection) context.getAttribute(
                        ExecutionContext.HTTP_CONNECTION);
                response.setStatusCode(HttpStatus.SC_OK);
                NFileEntity body = new NFileEntity(file, ContentType.create("text/html"));
                response.setEntity(body);
                System.out.println(conn + ": serving file " + file.getPath());
            }
        }
    }
}
|
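Here, start() launches the HTTP server and connect() initiates the socket connection. HttpFileHandler is a nested handler class for HTTP requests: it reads a text file from local disk and sends it to the server in the response.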
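The handler's decision about which status to return can be isolated from the HTTP machinery. The sketch below restates that rule with plain `java.io.File`; `FileStatusSketch` and `statusFor` are hypothetical names used only for illustration, and URL decoding of the target is left out for brevity.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

/** Hypothetical helper, not part of the demo: mirrors the status rules of handleInternal(). */
class FileStatusSketch {

    /** Picks the status code for a request target resolved against docRoot. */
    static int statusFor(File docRoot, String target) {
        File file = new File(docRoot, target);
        if (!file.exists()) {
            return 404; // HttpStatus.SC_NOT_FOUND
        }
        if (!file.canRead() || file.isDirectory()) {
            return 403; // HttpStatus.SC_FORBIDDEN
        }
        return 200;     // HttpStatus.SC_OK: the file becomes the response body
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway document root with one file and one subdirectory
        File docRoot = Files.createTempDirectory("docroot").toFile();
        Files.write(new File(docRoot, "Hello1.txt").toPath(),
                "hello".getBytes(StandardCharsets.UTF_8));
        new File(docRoot, "sub").mkdir();

        System.out.println(statusFor(docRoot, "/Hello1.txt"));
        System.out.println(statusFor(docRoot, "/missing.txt"));
        System.out.println(statusFor(docRoot, "/sub"));
    }
}
```

One consequence for the demo: the requested files must exist under the document root passed to start(), otherwise the phone side answers with a 404 response rather than the file content.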
3.3. ConnectionManager
The source code follows.
| package com.cangfu.reversehttp;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ConnectionManager<E> {

    // Established connections, handed out in FIFO order
    private final BlockingQueue<E> connections;

    public ConnectionManager() {
        connections = new LinkedBlockingQueue<E>();
    }

    public void putConnection(E conn) throws InterruptedException {
        this.connections.put(conn);
    }

    // Blocks until a connection is available
    public E takeConnection() throws InterruptedException {
        return this.connections.take();
    }
}
|
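The queue semantics matter for how the demo pairs requests with phones: connections come out in the order they were put in, and a take blocks until a connection exists. The sketch below shows this in isolation. It assumes a generic `ConnectionManager<E>` of the same shape as the class above, nested inside a hypothetical `ConnectionQueueSketch` class, with plain strings standing in for real connections.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical demo class; strings stand in for NHttpClientConnection objects. */
class ConnectionQueueSketch {

    /** Same shape as ConnectionManager, with the type parameter written out. */
    static class ConnectionManager<E> {
        private final BlockingQueue<E> connections = new LinkedBlockingQueue<E>();

        void putConnection(E conn) throws InterruptedException {
            connections.put(conn);
        }

        E takeConnection() throws InterruptedException {
            return connections.take();
        }
    }

    /** One thread queues two "connections"; the caller takes them back in FIFO order. */
    static String[] demo() throws InterruptedException {
        final ConnectionManager<String> mgr = new ConnectionManager<String>();
        Thread reactor = new Thread(() -> {
            try {
                mgr.putConnection("phone-1");
                mgr.putConnection("phone-2");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reactor.start();
        String first = mgr.takeConnection();  // blocks until something is queued
        String second = mgr.takeConnection();
        reactor.join();
        return new String[] { first, second };
    }

    public static void main(String[] args) throws InterruptedException {
        String[] order = demo();
        System.out.println(order[0] + ", " + order[1]);
    }
}
```

Because the queue is not keyed by device, a request goes to whichever phone connected first, regardless of the target host named in the request. A production server would presumably need some per-device lookup instead, which this demo does not cover.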
3.4. ReverseHttpTest
The source code follows.
| package com.cangfu.reversehttp;
import java.net.InetSocketAddress;
import org.apache.http.HttpHost;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHttpRequest;
import org.junit.Test;
public class ReverseHttpTest {

    @Test
    public void testReverseHttp() throws Exception {
        int sockServerPort = 60010;
        int sockClientPort1 = 60011;
        int sockClientPort2 = 60012;

        IOReactorConfig ioconfig = new IOReactorConfig();
        ioconfig.setIoThreadCount(1);
        ioconfig.setSoKeepalive(true);
        ioconfig.setSoReuseAddress(true);
        ioconfig.setSelectInterval(1000);

        // Start ReverseClient, at APNS Server side
        ReverseNHttpClient reverseClient = new ReverseNHttpClient(ioconfig);
        reverseClient.start();
        reverseClient.accept(sockServerPort);

        // Start ReverseServer, at Device side
        ReverseNHttpServer reverseServer1 = new ReverseNHttpServer(ioconfig);
        reverseServer1.start(".", false);
        reverseServer1.connect(new InetSocketAddress("localhost", sockServerPort),
                new InetSocketAddress("localhost", sockClientPort1));

        // Start another ReverseServer, at Device side
        ReverseNHttpServer reverseServer2 = new ReverseNHttpServer(ioconfig);
        reverseServer2.start(".", false);
        reverseServer2.connect(new InetSocketAddress("localhost", sockServerPort),
                new InetSocketAddress("localhost", sockClientPort2));

        // send a request from APNS server to phone 1
        HttpHost target1 = new HttpHost("localhost", sockClientPort1, "http");
        BasicHttpRequest request1 = new BasicHttpRequest("GET", "/Hello1.txt");
        reverseClient.HttpExchange(target1, request1);

        // send another request from APNS server to phone 2
        HttpHost target2 = new HttpHost("localhost", sockClientPort2, "http");
        BasicHttpRequest request2 = new BasicHttpRequest("GET", "/Hello2.txt");
        reverseClient.HttpExchange(target2, request2);

        Thread.sleep(2 * 1000);

        reverseServer1.stop();
        reverseServer2.stop();
        reverseClient.stop();
    }
}
|
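The test starts one HttpClient for the push server and two HttpServers on the phone side. The push server sends a GET request to each phone. Each phone reads the requested local file (Hello1.txt or Hello2.txt), places it in the body of the HTTP response, and returns it to the server. The test output is shown below: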
|
|