设为首页 收藏本站
查看: 666|回复: 0

[经验分享] (二)androidpn-server tomcat版源码解析之--push消息处理

[复制链接]

尚未签到

发表于 2017-2-10 08:57:04 | 显示全部楼层 |阅读模式
  在 (一)androidpn-server tomcat版源码解析之--项目启动这篇中,已经描述了整个推送服务器的启动过程,并且把握到了消息的入口即XmppIoHandler这个类,今天我将继续往下分析下面的核心代码,主要分为3大块,链接创建,消息的发送,链接关闭。
  先贴一段XmppIoHandler的部分代码

/**
* Invoked from an I/O processor thread when a new connection has been created.
*/
public void sessionCreated(IoSession session) throws Exception {
log.debug("sessionCreated()...");
session.getConfig().setBothIdleTime(IDLE_TIME);;
}
/**
* Invoked when a connection has been opened.
*/
public void sessionOpened(IoSession session) throws Exception {
log.debug("sessionOpened()...");
log.debug("remoteAddress=" + session.getRemoteAddress());
// Create a new XML parser
XMLLightweightParser parser = new XMLLightweightParser("UTF-8");
session.setAttribute(XML_PARSER, parser);
// Create a new connection
Connection connection = new Connection(session);
session.setAttribute(CONNECTION, connection);
session.setAttribute(STANZA_HANDLER, new StanzaHandler(serverName,
connection));
}
/**
* Invoked when a connection is closed.
*/
public void sessionClosed(IoSession session) throws Exception {
log.debug("sessionClosed()...");
Connection connection = (Connection) session.getAttribute(CONNECTION);
connection.close();
}
/**
* Invoked with the related IdleStatus when a connection becomes idle.
*/
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception {
log.debug("sessionIdle()...");
Connection connection = (Connection) session.getAttribute(CONNECTION);
if (log.isDebugEnabled()) {
log.debug("Closing connection that has been idle: " + connection);
}
connection.close();
}
/**
* Invoked when any exception is thrown.
*/
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
log.debug("exceptionCaught()...");
log.error(cause);
}
/**
* Invoked when a message is received.
*/
public void messageReceived(IoSession session, Object message)
throws Exception {
log.debug("messageReceived()...");
log.debug("RCVD: " + message);
// Get the stanza handler
StanzaHandler handler = (StanzaHandler) session
.getAttribute(STANZA_HANDLER);
// Get the XMPP packet parser
int hashCode = Thread.currentThread().hashCode();
XMPPPacketReader parser = parsers.get(hashCode);
if (parser == null) {
parser = new XMPPPacketReader();
parser.setXPPFactory(factory);
parsers.put(hashCode, parser);
}
// The stanza handler processes the message
try {
handler.process((String) message, parser);
} catch (Exception e) {
log.error(
"Closing connection due to error while processing message: "
+ message, e);
Connection connection = (Connection) session
.getAttribute(CONNECTION);
connection.close();
}
}
/**
* Invoked when a message written by IoSession.write(Object) is sent out.
*/
public void messageSent(IoSession session, Object message) throws Exception {
log.debug("messageSent()...");
}


  •  链接创建
  androidpn-server的消息处理步奏为: sessionCreated->sessionOpened->Router->handle
  对应的处理类为:XmppIoHandler- >StanzaHandler->PacketRouterIQAuthHandler.java->IQHandler.java
                                                                                                                            IQRegisterHandler.java
                                                                                                                            IQRosterHandler.java
  PresenceUpdateHandler.java
  androidpn是以xmpp为协议的,要看懂以下的处理步奏也就必须补充点xmpp的一个相关知识
  xmpp的三种消息类型:

message:message是一种基本推送消息方法,它不要求响应。

presence:presence用来表明用户的状态,如:online、away、dnd(请勿打扰)等

iq:一种请求/响应机制,从一个实体从发送请求,另外一个实体接受请求,并进行响应。
  XmppIoHandler.java->  sessionCreated:

public void sessionCreated(IoSession session) throws Exception {
log.debug("sessionCreated()...");
session.getConfig().setBothIdleTime(IDLE_TIME);;
}
  client的请求第一次到达时候为session设置了一个idletime
  XmppIoHandler.java-> sessionOpened

public void sessionOpened(IoSession session) throws Exception {
log.debug("sessionOpened()...");
log.debug("remoteAddress=" + session.getRemoteAddress());
// Create a new XML parser
XMLLightweightParser parser = new XMLLightweightParser("UTF-8");
session.setAttribute(XML_PARSER, parser);
// Create a new connection
Connection connection = new Connection(session);
session.setAttribute(CONNECTION, connection);
session.setAttribute(STANZA_HANDLER, new StanzaHandler(serverName,
connection));
}
  初始化了xml解析器,并且创建了StanzaHandler实例,StanzaHandler这个类时后续处理的关键
  StanzaHandler.java-> StanzaHandler

    /**
* Constructor.
*
* @param serverName the server name
* @param connection the connection
*/
public StanzaHandler(String serverName, Connection connection) {
this.serverName = serverName;
this.connection = connection;
this.router = new PacketRouter();//该构造中初始化了三个router实现,分别是 MessageRouter,PresenceRouter,IQRouter,分别处理xmpp的三种消息类型
notificationService = ServiceLocator.getNotificationService();
notificationManager = new NotificationManager();
}
  XmppIoHandler.java-> messageReceived

    public void messageReceived(IoSession session, Object message)
throws Exception {
log.debug("messageReceived()...");
log.debug("RCVD: " + message);
// Get the stanza handler
StanzaHandler handler = (StanzaHandler) session
.getAttribute(STANZA_HANDLER);
// Get the XMPP packet parser
int hashCode = Thread.currentThread().hashCode();
XMPPPacketReader parser = parsers.get(hashCode);
if (parser == null) {
parser = new XMPPPacketReader();
parser.setXPPFactory(factory);
parsers.put(hashCode, parser);
}
// The stanza handler processes the message
try {
handler.process((String) message, parser);//###这个方法中,程序会根据xmpp的xml头来判断消息类型,并且传递到对应的Router处理类
} catch (Exception e) {
log.error(
"Closing connection due to error while processing message: "
+ message, e);
Connection connection = (Connection) session
.getAttribute(CONNECTION);
connection.close();
}
}
  因为当前做的需求是消息推送,而MessageRouter实用在IM这种对话场合,所以MessageRouter并没有实现处理PresenceRouter和IQRouter都有对应的功能实现。
  IQRoute.java -> IQRouter

public IQRouter() {
sessionManager = SessionManager.getInstance();
iqHandlers.add(new IQAuthHandler());
iqHandlers.add(new IQRegisterHandler());
iqHandlers.add(new IQRosterHandler());
notificationService = ServiceLocator.getNotificationService();
}
  构造中又实现了3和handler,用户建立好链接后又会对用户进行IQAuthHandler(鉴权),IQRegisterHandler(解析用户携带参数,往数据库中插入用户信息,将链接保存在sessionManager中),IQRosterHandler(没进行什么操作)



  • 消息的发送
  以广播消息为例
  SessionManager.java

/**
* Returns a list that contains all authenticated client sessions.
*
* @return a list that contains all client sessions
*/
public Collection<ClientSession> getSessions() {
return clientSessions.values();
}
  获得链接池中所有链接
  Session.java

    /**
* Delivers the packet to the associated connection.
*
* @param packet the packet to deliver
*/
public void deliver(Packet packet) {
if (connection != null && !connection.isClosed()) {
connection.deliver(packet);
}
}
  Connection.java

/**
* Delivers the packet to this connection (without checking the recipient).
*
* @param packet the packet to deliver
*/
public void deliver(Packet packet) {
log.debug("SENT: " + packet.toXML());
if (!isClosed()) {
IoBuffer buffer = IoBuffer.allocate(4096);
buffer.setAutoExpand(true);
boolean errorDelivering = false;
try {
XMLWriter xmlSerializer = new XMLWriter(new IoBufferWriter(
buffer, (CharsetEncoder) encoder.get()),
new OutputFormat());
xmlSerializer.write(packet.getElement());
xmlSerializer.flush();
buffer.flip();
ioSession.write(buffer);//###通过mina的isSession对象传递
} catch (Exception e) {
log.debug("Connection: Error delivering packet" + "\n"
+ this.toString(), e);
errorDelivering = true;
}
if (errorDelivering) {
close();
} else {
session.incrementServerPacketCount();
}
}
}
  封装层次为isSession->Connection->Session



  • 链接关闭
  XmppIoHandler.java

/**
* Invoked when a connection is closed.
*/
public void sessionClosed(IoSession session) throws Exception {
log.debug("sessionClosed()...");
Connection connection = (Connection) session.getAttribute(CONNECTION);
connection.close();
}

  Connection.java

/**
* Closes the session including associated socket connection,
* notifing all listeners that the channel is shutting down.
*/
public void close() {
boolean closedSuccessfully = false;
synchronized (this) {
if (!isClosed()) {
try {
deliverRawText("</stream:stream>", false);
} catch (Exception e) {
// Ignore
}
if (session != null) {
session.setStatus(Session.STATUS_CLOSED);
}
ioSession.close(false);
closed = true;
closedSuccessfully = true;
}
}
if (closedSuccessfully) {
notifyCloseListeners();
}
}
  此处使用了观察者模式,首先传递了Xmpp的结束xml“</stream:stream>”,再关闭了mina最低层的ioSession,再通知观察者,也就是androidpn封装的ClientSession在内存中删除该会话。

 原创文章,转载请声名出处  http://spjich.iteye.com/blog/2226149

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-339950-1-1.html 上篇帖子: 如何在一台服务器上运行同一个端口的多个Tomcat服务,实现两个一台机器,两个IP地址共用80端口的方法 下篇帖子: web应用下的静态页面放在tomcat的webapps下和本应用一起加载
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表