设为首页 收藏本站
查看: 332|回复: 0

[经验分享] Apache Mina: 自定义codec

[复制链接]

尚未签到

发表于 2017-1-4 06:24:09 | 显示全部楼层 |阅读模式
本例子根据mina自带的例子:sumup改写。
1. 基本原理:
1) 客户端向服务端发送AddMessage对象时,先根据AddMessageEncoder编码, 当服务端接收到AddMessage后,根据自定义的AddMessageDecode解码数据。
2) 服务端数据解码后,生成回复对象ResultMessage,并对该对象通过ResultMessageEncoder进行编码,并发送到客户端。 客户端接收ResultMessage后,根据ResultMessageDecoder解码,并将数据显示出来。
3) AddMessageEncoder,ResultMessageEncoder,AddMessageDecoder,ResultMessageDecoder自定义编码和解码的方法。
2. 代码:
1) 创建AddMessage和ResultMessage对象
AbstractMessage.java

public abstract class AbstractMessage implements Serializable {
private int sequence;
public int getSequence() {
return sequence;
}
public void setSequence(int sequence) {
this.sequence = sequence;
}
}

AddMessage.java

public class AddMessage extends AbstractMessage {
private static final long serialVersionUID = -735205238699949292L;
private int value;
public AddMessage(){
}
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
@Override
public String toString() {
return "AddMessage [value=" + value + ", getSequence()="
+ getSequence() + "]";
}
}

ResultMessage.java

public class ResultMessage extends AbstractMessage {
private static final long serialVersionUID = 7431899532938146290L;
private boolean ok;
private int value;
public ResultMessage(){
}
public boolean isOk() {
return ok;
}
public void setOk(boolean ok) {
this.ok = ok;
}
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
@Override
public String toString() {
return "ResultMessage [ok=" + ok + ", value=" + value
+ ", getSequence()=" + getSequence() + "]";
}
}


2) 创建AddMessageEncoder和ResultMessageEncoder
AddMessageEncoder.java

public class AddMessageEncoder<T extends AddMessage> implements MessageEncoder<T> {
@Override
public void encode(IoSession session, T message, ProtocolEncoderOutput out)
throws Exception {
IoBuffer buf = IoBuffer.allocate(16);
buf.setAutoExpand(true); // Enable auto-expand for easier encoding
// Encode a header
buf.putInt(message.getSequence());
buf.putInt(message.getValue());
buf.flip();
out.write(buf);
}
}

ResultMessageEncoder.java

public class ResultMessageEncoder<T extends ResultMessage> implements MessageEncoder<T> {
@Override
public void encode(IoSession session, T message, ProtocolEncoderOutput out)
throws Exception {
IoBuffer buf = IoBuffer.allocate(16);
buf.setAutoExpand(true); // Enable auto-expand for easier encoding

if(message.isOk()){
buf.putShort((short) Constants.RESULT_OK);
buf.putInt(message.getSequence());
buf.putInt(message.getValue());
}else{
buf.putShort((short)Constants.RESULT_ERROR);
buf.putInt(message.getSequence());
buf.putInt(0);
}
buf.flip();
out.write(buf);
}
}


3) 创建AddMessageDecoder和ResultMessageDecoder
AddMessageDecoder.java

public class AddMessageDecoder implements MessageDecoder {

@Override
public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
if (in.remaining() < 1) {
return MessageDecoderResult.NEED_DATA;
}
// Return NOT_OK if not matches.
return MessageDecoderResult.OK;
}
@Override
public MessageDecoderResult decode(IoSession session, IoBuffer in,
ProtocolDecoderOutput out) throws Exception {
if (in.remaining() < 1) {
return MessageDecoderResult.NEED_DATA;
}
int sequence = in.getInt();
int value = in.getInt();
AddMessage m = new AddMessage();
m.setSequence(sequence);
m.setValue(value);
out.write(m);
return MessageDecoderResult.OK;

}
@Override
public void finishDecode(IoSession session, ProtocolDecoderOutput out)
throws Exception {
// TODO Auto-generated method stub
}
}

ResultMessageDecoder.java

public class ResultMessageDecoder implements MessageDecoder {
@Override
public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
if (in.remaining() < 1) {
return MessageDecoderResult.NEED_DATA;
}
int code = in.getShort();
if(code==Constants.RESULT_OK){
return MessageDecoderResult.OK;
}
// Return NOT_OK if not matches.
return MessageDecoderResult.NOT_OK;
}
@Override
public MessageDecoderResult decode(IoSession session, IoBuffer in,
ProtocolDecoderOutput out) throws Exception {
if (in.remaining() < 1) {
return MessageDecoderResult.NEED_DATA;
}
int code = in.getShort();
int sequence = in.getInt();
int value = in.getInt();
ResultMessage m = new ResultMessage();
if(code==Constants.RESULT_OK){
m.setOk(true);
m.setSequence(sequence);
m.setValue(value);
}else{
m.setOk(false);
}
out.write(m);
return MessageDecoderResult.OK;

}
@Override
public void finishDecode(IoSession session, ProtocolDecoderOutput out)
throws Exception {
// TODO Auto-generated method stub
}
}


4) 创建CodecFactory,注册AddMessageEncoder, ResultMessageEncoder, AddMessageDecoder, ResultMessageDecoder.

public class DemoProtocolCodecFactory extends DemuxingProtocolCodecFactory {
public DemoProtocolCodecFactory(boolean isServer){
if(isServer){
super.addMessageEncoder(ResultMessage.class, ResultMessageEncoder.class);
super.addMessageDecoder(AddMessageDecoder.class);
}else{
super.addMessageEncoder(AddMessage.class, AddMessageEncoder.class);
super.addMessageDecoder(ResultMessageDecoder.class);
}
}
}


5) 创建server端服务和业务处理ServerIoHandler
Server.java

public class Server {
public void init() throws IOException{
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new DemoProtocolCodecFactory(Constants.IS_SERVER)));
acceptor.setHandler(new ServerIoHandler());
acceptor.getSessionConfig().setReadBufferSize(2048);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
acceptor.setDefaultLocalAddress(new InetSocketAddress(Constants.PORT));
acceptor.bind();// 启动监听  
}

public static void main(String[] args) throws IOException {
Server server = new Server();
server.init();
}
}

ServerIoHandler.java

public class ServerIoHandler extends IoHandlerAdapter {
private static final String SUM_KEY = "sum";
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
System.out.println(cause.getMessage());
cause.printStackTrace();
session.close(true);
}
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
AddMessage add = (AddMessage) message;
int sum = ((Integer)session.getAttribute(SUM_KEY)).intValue();
int value = add.getValue();
long total = (long) sum + value;
if(total>Integer.MAX_VALUE || total<Integer.MIN_VALUE){
ResultMessage result = new ResultMessage();
result.setSequence(add.getSequence());
result.setOk(false);
session.write(result);
}else{
sum = (int) total;
session.setAttribute(SUM_KEY, sum);
ResultMessage result = new ResultMessage();
result.setSequence(add.getSequence());
result.setOk(true);
result.setValue(sum);
session.write(result);
}
//System.out.println("total=" + total);
}
@Override
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception {
session.close(true);
}
@Override
public void sessionOpened(IoSession session) throws Exception {
session.setAttribute(SUM_KEY, new Integer(0));
}
}


6) 创建Client端连接和业务处理ClientIoHandler
Client.java

public class Client {
public void init() throws InterruptedException{
NioSocketConnector connector = new NioSocketConnector();
// Configure the service.
connector.setConnectTimeoutMillis(Constants.CONNECT_TIMEOUT);
connector.getFilterChain().addLast("codec",new ProtocolCodecFilter(new DemoProtocolCodecFactory(!Constants.IS_SERVER)));
connector.getFilterChain().addLast("logger", new LoggingFilter());
connector.setHandler(new ClientIoHandler());
IoSession session;
for (;;) {
try {
ConnectFuture future = connector.connect(new InetSocketAddress(Constants.HOSTNAME, Constants.PORT));
future.awaitUninterruptibly();
session = future.getSession();
break;
} catch (RuntimeIoException e) {
System.err.println("Failed to connect.");
e.printStackTrace();
Thread.sleep(5000);
}
}
// wait until the summation is done
session.getCloseFuture().awaitUninterruptibly();
connector.dispose();
}
/**
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
Client client = new Client();
client.init();
}
}


ClientIoHandler.java

public class ClientIoHandler extends IoHandlerAdapter {
private List<Integer> values = new ArrayList<Integer>();
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
session.close(true);
}
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
ResultMessage rm = (ResultMessage) message;
if (rm.isOk()) {
if (rm.getSequence() == values.size() - 1) {
System.out.println("the sum is " + rm.getValue());
session.close(true);
//finished = true;
}
} else {
System.out.println("Server error, disconnecting...");
session.close(true);
//finished = true;
}
}
@Override
public void sessionOpened(IoSession session) throws Exception {
init();
for (int i = 0; i < values.size(); i++) {
int _value = ((Integer) values.get(i)).intValue();
AddMessage m = new AddMessage();
m.setSequence(i);
m.setValue(_value);
session.write(m);
}
}

private void init(){
for(int i=0;i<3;i++){
int _value = i*100 + 1;
values.add(new Integer(_value));
}
}
}

7. Constants.java

public class Constants {
public final static int PORT = 9123;
public static final String HOSTNAME = "localhost";
public static final long CONNECT_TIMEOUT = 30*1000L; // 30 seconds
public final static boolean IS_SERVER = true;
public static final int RESULT_OK = 0;
public static final int RESULT_ERROR = 1;
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-323378-1-1.html 上篇帖子: apache VelocityEngine使用记录 下篇帖子: 安装Apache Ant —— 系统需求
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表