191145686 发表于 2018-11-22 07:38:22

巧用apache httpcore

1. 背景
  Android推送服务应用非常广泛,一般有轮询、SMS推送、IP推送几种实现方式。由于轮询的即时性不高、SMS推送需要不菲的费用,所以一般采取IP推送。由于google的IP推送平台C2DM国内被屏蔽,国内涌现很多优秀的推送平台如个推、极光推送。由于实现推送服务有一定技术难度,很多移动互联网应用直接租用这些平台服务,达到快速拓展业务的目标。
  
  但是在一些企业和行业应用场合,限制手机接入互联网,不能采用互联网推送平台,就必须实现自己的推送服务。国内流行的方案是采用开源的androidpn,基于XMPP协议。由于XMPP是一个用于IM即时消息的协议,用来做推送服务有很多冗余的东西,显得很不优雅。因此本文提出另一种简洁思路。
2. 采用HTTP的可能性
  从服务器向手机推送存在两个限制:1)移动设备IP经常改变,Server端无法通过配置解决;2)设备常在NAT后面(如无线路由器),无法由Server发起建立socket连接。因此必须由手机端发起向服务器发起socket连接,并保持这个连接,服务在这个连接上向手机推送消息。
  
  因此推送服务存在这样的场景:在socket建连时,手机是client端,推送服务器是server端。但是在消息推送时,推送服务器是发起端成了client,手机作为接收端变成了Server。一般的HTTP开源实现,如apache http core,socket server端绑定http server端,socket client端绑定HTTP client端,无法拆分。但是幸运的是,apache http core的NIO版本,却实现可socket层和http层的分离,可以在socket server上发起http client请求。推送服务器因为要保持成千上万的持久连接,NIO成了不二的选择,apache httpcore-nio的NIO模型是理想选择。
3. Demo描述
  在公司项目中,服务器端采用apache httpcore-nio,手机端则采用普通的http协议栈,以简化手机的开发难度。因为基于简单协议HTTP、及成熟开源代码,开发量非常小,千把行代码就实现了一个简洁够用的推送服务器,目前该推送服务器已经用于生产环境一年多,性能和稳定性都表现相当不错。
  
  在demo中,手机和服务器端统一采用apache httpcore-nio,基于HTTP通信。通信模型如下图所示:两个手机、一台服务器都运行于localhost,采用不同的端口区分。

Java工程如下图所示。


采用apache httpcore-nio-4.2.4版本。ReverseNHttpServer.java实现推送客户端功能。ReverseNHttpClient.java和ConnectionManager.java实现推送服务器功能,其中ConnectionManager用一个Queue保存已经建立的连接。ReverseHttpTest.java用于测试。
3.1. ReverseNHttpClient
  下面是源码。
  
package com.cangfu.reversehttp;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.http.ContentTooLongException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.entity.ContentBufferEntity;
import org.apache.http.nio.protocol.AbstractAsyncResponseConsumer;
import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
import org.apache.http.nio.protocol.HttpAsyncRequester;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.reactor.ListenerEndpoint;
import org.apache.http.nio.reactor.ListeningIOReactor;
import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.apache.http.nio.util.SimpleInputBuffer;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
public class ReverseNHttpClient {
ListeningIOReactor ioReactor;
    HttpParams params;
    Thread t;
    static ConnectionManager connMgr = new ConnectionManager();
    public static ConnectionManager getConnectionManager() {
    return connMgr;
    }
    public ReverseNHttpClient(IOReactorConfig ioconfig) throws IOReactorException {
this.params = new SyncBasicHttpParams();
      this.params
            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
            .setParameter(CoreProtocolPNames.USER_AGENT, "HttpTest/1.1")
            .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpTest/1.1");
// Create client-side I/O reactor
      ioReactor = new DefaultListeningIOReactor(ioconfig);
    }
public void start() {
      HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor() {
      @Override
      public void connected(
                  final NHttpClientConnection conn,
                  final Object attachment) throws IOException, HttpException {
      try {
ReverseNHttpClient.getConnectionManager().putConnection(conn);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
      System.out.println(conn + ": connection open in ReverseClient side");
      super.connected(conn, attachment);
      }
      };
      // Create client-side I/O event dispatch
      final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler, params);
      // Run the I/O reactor in a separate thread
      t = new Thread(new Runnable() {
            public void run() {
                try {
                  // Ready to go!
                  ioReactor.execute(ioEventDispatch);
                } catch (IOException e) {
                  System.err.println("I/O error: " + e.getMessage());
                }
                System.out.println("Shutdown");
            }
      });
      // Start the client thread
      t.start();
}
public void stop() {
if (t != null)
t.interrupt();
}
public void accept(int port) throws InterruptedException {
ListenerEndpoint request = ioReactor.listen(new InetSocketAddress(port));
request.waitFor();
}
public void HttpExchange(HttpHost target, BasicHttpRequest request) throws InterruptedException {
      HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
                // Use standard client-side protocol interceptors
                new RequestContent(),
                new RequestTargetHost(),
                new RequestConnControl(),
                new RequestUserAgent(),
                new RequestExpectContinue()});
      HttpAsyncRequester requester = new HttpAsyncRequester(
                httpproc, new DefaultConnectionReuseStrategy(), params);
      requester.execute(
                new BasicAsyncRequestProducer(target, request),
                new MyResponseConsumer(),
                ReverseNHttpClient.getConnectionManager().takeConnection()
      );
}
static class MyResponseConsumer extends AbstractAsyncResponseConsumer {
    private volatile HttpResponse response;
    private volatile SimpleInputBuffer buf;
    public MyResponseConsumer() {
      super();
    }
    @Override
    protected void onResponseReceived(final HttpResponse response) throws IOException {
      this.response = response;
    }
    @Override
    protected void onEntityEnclosed(
            final HttpEntity entity, final ContentType contentType) throws IOException {
      long len = entity.getContentLength();
      if (len > Integer.MAX_VALUE) {
            throw new ContentTooLongException("Entity content is too long: " + len);
      }
      if (len < 0) {
            len = 4096;
      }
      this.buf = new SimpleInputBuffer((int) len, new HeapByteBufferAllocator());
      this.response.setEntity(new ContentBufferEntity(entity, this.buf));
    }
    @Override
    protected void onContentReceived(
            final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
      if (this.buf == null) {
            throw new IllegalStateException("Content buffer is null");
      }
      this.buf.consumeContent(decoder);
    }
    @Override
    protected void releaseResources() {
      this.response = null;
      this.buf = null;
    }
    @Override
    protected HttpResponse buildResult(final HttpContext context) throws IOException {
      System.out.println();
      System.out.println(((BasicHttpResponse)response).toString());
      response.getEntity().writeTo(System.out);
      System.out.println();
      return this.response;
    }
}
}  其中,start()方法用于启动httpclient,accept()用于启动socket监听,httpExchange用于发送http请求,MyResponseconsumer是一个内部类,用于处理返回的响应。

  
3.2. ReverseNHttpServer
  下面是源码。
  
package com.cangfu.reversehttp;
import java.io.File;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.URL;
import java.net.URLDecoder;
import java.security.KeyStore;
import java.util.Locale;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpStatus;
import org.apache.http.MethodNotSupportedException;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultHttpServerIODispatch;
import org.apache.http.impl.nio.DefaultNHttpServerConnection;
import org.apache.http.impl.nio.DefaultNHttpServerConnectionFactory;
import org.apache.http.impl.nio.SSLNHttpServerConnectionFactory;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.NHttpConnectionFactory;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.nio.entity.NFileEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;
import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
import org.apache.http.nio.protocol.HttpAsyncExchange;
import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
import org.apache.http.nio.protocol.HttpAsyncRequestHandlerRegistry;
import org.apache.http.nio.protocol.HttpAsyncService;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.nio.reactor.SessionRequestCallback;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
public class ReverseNHttpServer {
    HttpParams params;
    final ConnectingIOReactor ioReactor;
    Thread t;
    static ConnectionManager connMgr = new ConnectionManager();
    public static ConnectionManager getConnectionManager() {
    return connMgr;
    }
    public ReverseNHttpServer(IOReactorConfig ioconfig) throws IOReactorException {
    this.params = new SyncBasicHttpParams();
      this.params
            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
            .setParameter(CoreProtocolPNames.USER_AGENT, "HttpTest/1.1")
            .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpTest/1.1");
      ioReactor = new DefaultConnectingIOReactor(ioconfig);
    }
    public void start(String path, boolean ssl) throws Exception {
      // Create HTTP protocol processing chain
      HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {
                // Use standard server-side protocol interceptors
                new ResponseDate(),
                new ResponseServer(),
                new ResponseContent(),
                new ResponseConnControl()
      });
      // Create request handler registry
      HttpAsyncRequestHandlerRegistry registry = new HttpAsyncRequestHandlerRegistry();
      // Register the default handler for all URIs
      registry.register("*", new HttpFileHandler(new File(path)));
      // Create server-side HTTP protocol handler
      HttpAsyncService protocolHandler = new HttpAsyncService(
                httpproc, new DefaultConnectionReuseStrategy(), registry, params) {
            @Override
            public void connected(final NHttpServerConnection conn) {
                System.out.println(conn + ": connection open in ReverseServer side");
                try {
                ReverseNHttpServer.getConnectionManager().putConnection(conn);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
                super.connected(conn);
            }
            @Override
            public void closed(final NHttpServerConnection conn) {
                System.out.println(conn + ": connection closed");
                super.closed(conn);
            }
      };
      // Create HTTP connection factory
      NHttpConnectionFactory connFactory;
      if (ssl) {
            // Initialize SSL context
            ClassLoader cl = ReverseNHttpServer.class.getClassLoader();
            URL url = cl.getResource("my.keystore");
            if (url == null) {
                System.out.println("Keystore not found");
                System.exit(1);
            }
            KeyStore keystore= KeyStore.getInstance("jks");
            keystore.load(url.openStream(), "secret".toCharArray());
            KeyManagerFactory kmfactory = KeyManagerFactory.getInstance(
                  KeyManagerFactory.getDefaultAlgorithm());
            kmfactory.init(keystore, "secret".toCharArray());
            KeyManager[] keymanagers = kmfactory.getKeyManagers();
            SSLContext sslcontext = SSLContext.getInstance("TLS");
            sslcontext.init(keymanagers, null, null);
            connFactory = new SSLNHttpServerConnectionFactory(sslcontext, null, params);
      } else {
            connFactory = new DefaultNHttpServerConnectionFactory(params);
      }
      // Create server-side I/O event dispatch
      final IOEventDispatch ioEventDispatch = new DefaultHttpServerIODispatch(protocolHandler, connFactory);
      // Run the I/O reactor in a separate thread
      t = new Thread(new Runnable() {
            public void run() {
                try {
                  // Ready to go!
                  ioReactor.execute(ioEventDispatch);
                } catch (IOException e) {
                  System.err.println("I/O error: " + e.getMessage());
                }
                System.out.println("Shutdown");
            }
      });
      // Start the client thread
      t.start();
    }
    public void stop() {
    if (t != null) {
    t.interrupt();
    }
    }
    public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress) throws InterruptedException {
    SessionRequest sessionReq = ioReactor.connect(remoteAddress, localAddress, null,
new SessionRequestCallback() {
@Override
public void completed(SessionRequest request) {
System.out.println("connection established succeed in device side!!!");
}
@Override
public void failed(SessionRequest request) {
System.err.println("Device side connection failed!");
System.err.println(request.getException().getMessage());
}
@Override
public void timeout(SessionRequest request) {
System.err.println("Device side connection timeout!");
System.err.println(request.getException().getMessage());
}
@Override
public void cancelled(SessionRequest request) {
System.err.println("Device side connection cancelled!");
System.err.println(request.getException().getMessage());
}
}
    );
    sessionReq.waitFor();
    }
    static class HttpFileHandler implements HttpAsyncRequestHandler {
      private final File docRoot;
      public HttpFileHandler() {
            docRoot = new File(".");
      }
      public HttpFileHandler(final File docRoot) {
            super();
            this.docRoot = docRoot;
      }
      public HttpAsyncRequestConsumer proce***equest(
                final HttpRequest request,
                final HttpContext context) {
            // Buffer request content in memory for simplicity
            return new BasicAsyncRequestConsumer();
      }
      public void handle(
                final HttpRequest request,
                final HttpAsyncExchange httpexchange,
                final HttpContext context) throws HttpException, IOException {
            HttpResponse response = httpexchange.getResponse();
            handleInternal(request, response, context);
            httpexchange.submitResponse(new BasicAsyncResponseProducer(response));
      }
      private void handleInternal(
                final HttpRequest request,
                final HttpResponse response,
                final HttpContext context) throws HttpException, IOException {
            String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);
            if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
                throw new MethodNotSupportedException(method + " method not supported");
            }
            String target = request.getRequestLine().getUri();
            final File file = new File(this.docRoot, URLDecoder.decode(target, "UTF-8"));
            if (!file.exists()) {
                response.setStatusCode(HttpStatus.SC_NOT_FOUND);
                NStringEntity entity = new NStringEntity(
                        "File" + file.getPath() +
                        " not found",
                        ContentType.create("text/html", "UTF-8"));
                response.setEntity(entity);
                System.out.println("File " + file.getPath() + " not found");
            } else if (!file.canRead() || file.isDirectory()) {
                response.setStatusCode(HttpStatus.SC_FORBIDDEN);
                NStringEntity entity = new NStringEntity(
                        "Access denied",
                        ContentType.create("text/html", "UTF-8"));
                response.setEntity(entity);
                System.out.println("Cannot read file " + file.getPath());
            } else {
                NHttpConnection conn = (NHttpConnection) context.getAttribute(
                        ExecutionContext.HTTP_CONNECTION);
                response.setStatusCode(HttpStatus.SC_OK);
                NFileEntity body = new NFileEntity(file, ContentType.create("text/html"));
                response.setEntity(body);
                System.out.println(conn + ": serving file " + file.getPath());
            }
      }
    }
}  其中,start()方法用于启动httpserver,connect()用于发起socket连接。HttpFileHandler()是一个处理http请求的内部钩子类,它从本地读取一个文本文件,通过response发送到服务器。

  

  3.3. ConnectionManager

  源码。
package com.cangfu.reversehttp;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ConnectionManager {
private final BlockingQueue connections;
public ConnectionManager() {
connections = new LinkedBlockingQueue();
}
public void putConnection(E conn) throws InterruptedException {
this.connections.put(conn);
}
public E takeConnection() throws InterruptedException {
return this.connections.take();
}
}  3.4. ReverseHttpTest

  源码如下。
package com.cangfu.reversehttp;
import java.net.InetSocketAddress;
import org.apache.http.HttpHost;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHttpRequest;
import org.junit.Test;
public class ReverseHttpTest {
@Test
public void testReverseHttp() throws Exception {
int sockServerPort = 60010;
int sockClientPort1 = 60011;
int sockClientPort2 = 60012;
IOReactorConfig ioconfig = new IOReactorConfig();
ioconfig.setIoThreadCount(1);
ioconfig.setSoKeepalive(true);
ioconfig.setSoReuseAddress(true);
ioconfig.setSelectInterval(1000);
// Start ReverseClient, at APNS Server side
ReverseNHttpClient reverseClient = new ReverseNHttpClient(ioconfig);
reverseClient.start();
reverseClient.accept(sockServerPort);
// Start ReverseServer, at Device side
ReverseNHttpServer reverseServer1 = new ReverseNHttpServer(ioconfig);
reverseServer1.start(".", false);
reverseServer1.connect(new InetSocketAddress("localhost", sockServerPort), new InetSocketAddress("localhost", sockClientPort1));
// Start another ReverseServer, at Device side
ReverseNHttpServer reverseServer2 = new ReverseNHttpServer(ioconfig);
reverseServer2.start(".", false);
reverseServer2.connect(new InetSocketAddress("localhost", sockServerPort), new InetSocketAddress("localhost", sockClientPort2));
// send a request from APNS server to phone 1
HttpHost target1 = new HttpHost("localhost", sockClientPort1, "http");
BasicHttpRequest request1 = new BasicHttpRequest("GET", "/Hello1.txt");
      reverseClient.HttpExchange(target1, request1);
      // send another request from APNS server to phone 2
      HttpHost target2 = new HttpHost("localhost", sockClientPort2, "http");
      BasicHttpRequest request2 = new BasicHttpRequest("GET", "/Hello2.txt");
      reverseClient.HttpExchange(target2, request2);
      Thread.sleep(2*1000);
      reverseServer1.stop();
      reverseServer2.stop();
      reverseClient.stop();
}
}  

  启动一个推送服务器的HttpClient和两个手机端的HttpServer。推送服务器分别向两个手机发送GET请求。手机读取本地的hello.txt文件,放置于http response的消息体中,返回给服务器。下面是测试运行结果:
  
  




页: [1]
查看完整版本: 巧用apache httpcore