设为首页 收藏本站
查看: 951|回复: 0

[经验分享] Zookeeper实现简单的分布式RPC框架

[复制链接]

尚未签到

发表于 2017-4-19 12:42:13 | 显示全部楼层 |阅读模式
  RPC(Remote Procedure Call) 在介绍分布是RPC前首先介绍一个下JAVA中简单的RPC实现
  服务器端,通过SocketServer,持续接收客户端的请求,并将客户端的请求分发到指定的处理器出去处理。

/**
*
* @author zhangwei_david
* @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $
*/
public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware {
/**服务端口号**/
private int                port       = 12000;
private ServerSocket       server;
//线程池
@Autowired
private Executor           executorService;
public Map<String, Object> handlerMap = new ConcurrentHashMap<>();
private void publishedService() throws Exception {
server = new ServerSocket(port);
// 一直服务
for (;;) {
try {
// 获取socket
final Socket socket = server.accept();
executorService.execute(new Runnable() {
@Override
public void run() {
try {
// 获取input
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream output = new ObjectOutputStream(socket
.getOutputStream());
try {
// 获取引用
String interfaceName = input.readUTF();
//获取 方法名
String methodName = input.readUTF();
//
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
Object[] arguments = (Object[]) input.readObject();
try {
Object service = handlerMap.get(interfaceName);
Method method = service.getClass().getMethod(methodName,
parameterTypes);
Object result = method.invoke(service, arguments);
output.writeObject(result);
} catch (Throwable t) {
output.writeObject(t);
} finally {
input.close();
}
} finally {
socket.close();
}
} catch (Exception e) {
}
}
});
} catch (Exception e) {
}
}
}
public void init() {
}
/**
* @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
*/
@Override
public void afterPropertiesSet() throws Exception {
//发布服务
publishedService();
}
/**
* @see org.springframework.context.Lifecycle#start()
*/
@Override
public void start() {
}
/**
* @see org.springframework.context.Lifecycle#stop()
*/
@Override
public void stop() {
if (server != null) {
try {
server.close();
} catch (IOException e) {
}
}
}
/**
* @see org.springframework.context.Lifecycle#isRunning()
*/
@Override
public boolean isRunning() {
return false;
}
/**
* @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class);
System.out.println(serviceBeanMap);
if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {
for (Object serviceBean : serviceBeanMap.values()) {
String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf()
.getName();
handlerMap.put(interfaceName, serviceBean);
}
}
}
/**
* Setter method for property <tt>executorService</tt>.
*
* @param executorService value to be assigned to property executorService
*/
public void setExecutorService(Executor executorService) {
this.executorService = executorService;
}
}


/**
*
* @author zhangwei_david
* @version $Id: SRPC.java, v 0.1 2015年8月8日 下午12:51:17 zhangwei_david Exp $
*/
@Documented
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface SRPC {
public Class<?> interf();
}

  至此就实现了服务的自动发现自动注册,当然这个仅针对单机环境下的自动注册。

/**
*
* @author zhangwei_david
* @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $
*/
public class Client {
/**
* 引用服务
*
* @param <T> 接口泛型
* @param interfaceClass 接口类型
* @param host 服务器主机名
* @param port 服务器端口
* @return 远程服务
* @throws Exception
*/
@SuppressWarnings("unchecked")
public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
throws Exception {
if (interfaceClass == null || !interfaceClass.isInterface()) {
throw new IllegalArgumentException("必须指定服务接口");
}
if (host == null || host.length() == 0) {
throw new IllegalArgumentException("必须指定服务器的地址和端口号");
}
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[] { interfaceClass }, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] arguments)
throws Throwable {
Socket socket = new Socket(host, port);
try {
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try {
output.writeUTF(interfaceClass.getName());
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(arguments);
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
Object result = input.readObject();
if (result instanceof Throwable) {
throw (Throwable) result;
}
return result;
} finally {
input.close();
}
} finally {
output.close();
}
} finally {
socket.close();
}
}
});
}
}
  上面在没有使用第三方依赖包实现了简单的RPC,优化增加 request和reponse,定义RPC协议。

/**
*
* @author zhangwei_david
* @version $Id: SrpcRequest.java, v 0.1 2015年8月8日 下午1:45:53 zhangwei_david Exp $
*/
public class SrpcRequest implements Serializable {
/**  */
private static final long serialVersionUID = 6132853628325824727L;
// 请求Id
private String            requestId;
// 远程调用接口名称
private String            interfaceName;
//远程调用方法名称
private String            methodName;
// 参数类型
private Class<?>[]        parameterTypes;
// 参数值
private Object[]          parameters;
/**
* Getter method for property <tt>requestId</tt>.
*
* @return property value of requestId
*/
public String getRequestId() {
return requestId;
}
/**
* Setter method for property <tt>requestId</tt>.
*
* @param requestId value to be assigned to property requestId
*/
public void setRequestId(String requestId) {
this.requestId = requestId;
}
/**
* Getter method for property <tt>interfaceName</tt>.
*
* @return property value of interfaceName
*/
public String getInterfaceName() {
return interfaceName;
}
/**
* Setter method for property <tt>interfaceName</tt>.
*
* @param interfaceName value to be assigned to property interfaceName
*/
public void setInterfaceName(String interfaceName) {
this.interfaceName = interfaceName;
}
/**
* Getter method for property <tt>methodName</tt>.
*
* @return property value of methodName
*/
public String getMethodName() {
return methodName;
}
/**
* Setter method for property <tt>methodName</tt>.
*
* @param methodName value to be assigned to property methodName
*/
public void setMethodName(String methodName) {
this.methodName = methodName;
}
/**
* Getter method for property <tt>parameterTypes</tt>.
*
* @return property value of parameterTypes
*/
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
/**
* Setter method for property <tt>parameterTypes</tt>.
*
* @param parameterTypes value to be assigned to property parameterTypes
*/
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
/**
* Getter method for property <tt>parameters</tt>.
*
* @return property value of parameters
*/
public Object[] getParameters() {
return parameters;
}
/**
* Setter method for property <tt>parameters</tt>.
*
* @param parameters value to be assigned to property parameters
*/
public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
}

/**
*
* @author zhangwei_david
* @version $Id: SrpcResponse.java, v 0.1 2015年8月8日 下午1:47:46 zhangwei_david Exp $
*/
public class SrpcResponse implements Serializable {
/**  */
private static final long serialVersionUID = -5934073769679010930L;
// 请求的Id
private String            requestId;
// 异常
private Throwable         error;
// 响应
private Object            result;
/**
* Getter method for property <tt>requestId</tt>.
*
* @return property value of requestId
*/
public String getRequestId() {
return requestId;
}
/**
* Setter method for property <tt>requestId</tt>.
*
* @param requestId value to be assigned to property requestId
*/
public void setRequestId(String requestId) {
this.requestId = requestId;
}
/**
* Getter method for property <tt>error</tt>.
*
* @return property value of error
*/
public Throwable getError() {
return error;
}
/**
* Setter method for property <tt>error</tt>.
*
* @param error value to be assigned to property error
*/
public void setError(Throwable error) {
this.error = error;
}
/**
* Getter method for property <tt>result</tt>.
*
* @return property value of result
*/
public Object getResult() {
return result;
}
/**
* Setter method for property <tt>result</tt>.
*
* @param result value to be assigned to property result
*/
public void setResult(Object result) {
this.result = result;
}
}


/**
*
* @author zhangwei_david
* @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $
*/
public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware {
/**服务端口号**/
private int                port       = 12000;
private ServerSocket       server;
//线程池
@Autowired
private Executor           executorService;
public Map<String, Object> handlerMap = new ConcurrentHashMap<>();
private void publishedService() throws Exception {
server = new ServerSocket(port);
// 一直服务
for (;;) {
try {
// 获取socket
final Socket socket = server.accept();
executorService.execute(new Runnable() {
@Override
public void run() {
try {
// 获取input
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
// 获取RPC请求
SrpcRequest request = (SrpcRequest) input.readObject();
ObjectOutputStream output = new ObjectOutputStream(socket
.getOutputStream());
try {
SrpcResponse response = doHandle(request);
output.writeObject(response);
} finally {
input.close();
}
} finally {
socket.close();
}
} catch (Exception e) {
}
}
});
} catch (Exception e) {
}
}
}
private SrpcResponse doHandle(SrpcRequest request) {
SrpcResponse response = new SrpcResponse();
response.setRequestId(request.getRequestId());
try {
Object service = handlerMap.get(request.getInterfaceName());
Method method = service.getClass().getMethod(request.getMethodName(),
request.getParameterTypes());
response.setResult(method.invoke(service, request.getParameters()));
} catch (Exception e) {
response.setError(e);
}
return response;
}
public void init() {
}
/**
* @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
*/
@Override
public void afterPropertiesSet() throws Exception {
//发布
publishedService();
}
/**
* @see org.springframework.context.Lifecycle#start()
*/
@Override
public void start() {
}
/**
* @see org.springframework.context.Lifecycle#stop()
*/
@Override
public void stop() {
if (server != null) {
try {
server.close();
} catch (IOException e) {
}
}
}
/**
* @see org.springframework.context.Lifecycle#isRunning()
*/
@Override
public boolean isRunning() {
return false;
}
/**
* @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class);
System.out.println(serviceBeanMap);
if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {
for (Object serviceBean : serviceBeanMap.values()) {
String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf()
.getName();
handlerMap.put(interfaceName, serviceBean);
}
}
}
/**
* Setter method for property <tt>executorService</tt>.
*
* @param executorService value to be assigned to property executorService
*/
public void setExecutorService(Executor executorService) {
this.executorService = executorService;
}
}


/**
*
* @author zhangwei_david
* @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $
*/
public class Client {
/**
* 引用服务
*
* @param <T> 接口泛型
* @param interfaceClass 接口类型
* @param host 服务器主机名
* @param port 服务器端口
* @return 远程服务
* @throws Exception
*/
@SuppressWarnings("unchecked")
public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
throws Exception {
if (interfaceClass == null || !interfaceClass.isInterface()) {
throw new IllegalArgumentException("必须指定服务接口");
}
if (host == null || host.length() == 0) {
throw new IllegalArgumentException("必须指定服务器的地址和端口号");
}
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[] { interfaceClass }, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] arguments)
throws Throwable {
Socket socket = new Socket(host, port);
try {
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try {
SrpcRequest request = new SrpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setInterfaceName(interfaceClass.getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(arguments);
output.writeObject(request);
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
SrpcResponse response = (SrpcResponse) input.readObject();
if (response.getError() != null
&& response.getError() instanceof Throwable) {
throw response.getError();
}
return response.getResult();
} finally {
input.close();
}
} finally {
output.close();
}
} finally {
socket.close();
}
}
});
}
}

  后续继续优化序列化和NIO优化

/**
*
* @author zhangwei_david
* @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $
*/
public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware {
/**服务端口号**/
private int                 port       = 12000;
private Selector            selector;
private ServerSocketChannel serverSocketChannel;
public Map<String, Object>  handlerMap = new ConcurrentHashMap<>();
private void publishedService() throws Exception {
// 一直服务
for (;;) {
try {
//超时1s
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
SrpcRequest request = SerializationUtil.deserializer(bytes, SrpcRequest.class);
SrpcResponse response = doHandle(request);
doWriteResponse(sc, response);
} else if (readBytes < 0) {
key.cancel();
sc.close();
}
}
}
}
private void doWriteResponse(SocketChannel channel, SrpcResponse response) throws IOException {
if (response == null) {
return;
}
byte[] bytes = SerializationUtil.serializer(response);
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
/**
*
* @throws IOException
* @throws ClosedChannelException
*/
private void init() throws IOException, ClosedChannelException {
// 打开socketChannel
serverSocketChannel = ServerSocketChannel.open();
//设置非阻塞模式
serverSocketChannel.configureBlocking(false);
// 绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(port));
// 创建selector
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
private SrpcResponse doHandle(SrpcRequest request) {
SrpcResponse response = new SrpcResponse();
if (StringUtils.isBlank(request.getRequestId())) {
response.setError(new IllegalArgumentException("request id must be not null"));
}
response.setRequestId(request.getRequestId());
try {
Object service = handlerMap.get(request.getInterfaceName());
Method method = service.getClass().getMethod(request.getMethodName(),
request.getParameterTypes());
response.setResult(method.invoke(service, request.getParameters()));
} catch (Exception e) {
response.setError(e);
}
return response;
}
/**
* @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
*/
@Override
public void afterPropertiesSet() throws Exception {
init();
//发布
publishedService();
}
/**
* @see org.springframework.context.Lifecycle#start()
*/
@Override
public void start() {
}
/**
* @see org.springframework.context.Lifecycle#stop()
*/
@Override
public void stop() {
}
/**
* @see org.springframework.context.Lifecycle#isRunning()
*/
@Override
public boolean isRunning() {
return false;
}
/**
* @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class);
System.out.println(serviceBeanMap);
if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {
for (Object serviceBean : serviceBeanMap.values()) {
String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf()
.getName();
handlerMap.put(interfaceName, serviceBean);
}
}
}
}


/**
*
* @author zhangwei_david
* @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $
*/
public class Client {
/**
* 引用服务
*
* @param <T> 接口泛型
* @param interfaceClass 接口类型
* @param host 服务器主机名
* @param port 服务器端口
* @return 远程服务
* @throws Exception
*/
@SuppressWarnings("unchecked")
public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
throws Exception {
if (interfaceClass == null || !interfaceClass.isInterface()) {
throw new IllegalArgumentException("必须指定服务接口");
}
if (host == null || host.length() == 0) {
throw new IllegalArgumentException("必须指定服务器的地址和端口号");
}
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[] { interfaceClass }, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] arguments)
throws Throwable {
//创建请求
SrpcRequest request = new SrpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setInterfaceName(interfaceClass.getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(arguments);
SrpcResponse response = sendReqeust(request, host, port);
if (response == null
|| !StringUtils.equals(request.getRequestId(), response.getRequestId())) {
return null;
}
if (response.getError() != null) {
throw response.getError();
}
return response.getResult();
}
});
}
public static SrpcResponse sendReqeust(SrpcRequest request, String host, int port)
throws IOException {
SocketChannel socketChannel = connect(host, port);
byte[] requestBytes = SerializationUtil.serializer(request);
ByteBuffer writeBuffer = ByteBuffer.allocate(requestBytes.length);
writeBuffer.put(requestBytes);
writeBuffer.flip();
socketChannel.write(writeBuffer);
return readResoponse(socketChannel);
}
/**
*
* @return
* @throws IOException
*/
private static SrpcResponse readResoponse(SocketChannel socketChannel) throws IOException {
try {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
while (socketChannel.read(readBuffer) != -1) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
return SerializationUtil.deserializer(bytes, SrpcResponse.class);
}
return null;
} finally {
socketChannel.close();
}
}
private static SocketChannel connect(String host, int port) throws IOException {//连接到CSDN
InetSocketAddress socketAddress = new InetSocketAddress(host, port);
return SocketChannel.open(socketAddress);
}
}

  在分布式系统中,为了提供系统的可用性和稳定性一般都会将服务部署在多台服务器上,为了实现自动注册自动发现远程服务,通过ZK,和ProtocolBuffe 以及Netty实现一个简单的分布式RPC框架。
  首先简单介绍一下Zookeeper和ProtocalBuffer
  Zookeeper 是由Apache Handoop的子项目发展而来。是知名的互联网公司Yahoo创建的。Zookeeper为分布式应用提供了高效且可靠的分布式协调服务。
  ProtocolBuffer是用于结构化数据串行化的灵活、高效、自动的方法,有如XML,不过它更小、更快、也更简单。你可以定义自己的数据结构,然后使用代码生成器生成的代码来读写这个数据结构。你甚至可以在无需重新部署程序的情况下更新数据结构。
  RPC 就是Remote Procedure Call Protocol 远程过程调用协议。
  JAVA对象要能够在网络上传输都必须序列化,使用高效的序列化框架ProtocolBuffer实现序列化。

/**
* 序列化工具
* @author zhangwei_david
* @version $Id: SerializationUtil.java, v 0.1 2014年12月31日 下午5:41:35 zhangwei_david Exp $
*/
public class SerializationUtil {
private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();
private static Objenesis                objenesis    = new ObjenesisStd(true);
private static <T> Schema<T> getSchema(Class<T> clazz) {
@SuppressWarnings("unchecked")
Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
if (schema == null) {
schema = RuntimeSchema.getSchema(clazz);
if (schema != null) {
cachedSchema.put(clazz, schema);
}
}
return schema;
}
/**
* 序列化
*
* @param obj
* @return
*/
public static <T> byte[] serializer(T obj) {
@SuppressWarnings("unchecked")
Class<T> clazz = (Class<T>) obj.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema<T> schema = getSchema(clazz);
return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
/**
* 反序列化
*
* @param data
* @param clazz
* @return
*/
public static <T> T deserializer(byte[] data, Class<T> clazz) {
try {
T obj = objenesis.newInstance(clazz);
Schema<T> schema = getSchema(clazz);
ProtostuffIOUtil.mergeFrom(data, obj, schema);
return obj;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}

  远程调用的请求对象

/**
*Rpc 请求的主体
* @author zhangwei_david
* @version $Id: SrRequest.java, v 0.1 2014年12月31日 下午6:06:25 zhangwei_david Exp $
*/
public class RpcRequest {
// 请求Id
private String     requestId;
// 远程调用类名称
private String     className;
//远程调用方法名称
private String     methodName;
// 参数类型
private Class<?>[] parameterTypes;
// 参数值
private Object[]   parameters;
/**
* Getter method for property <tt>requestId</tt>.
*
* @return property value of requestId
*/
public String getRequestId() {
return requestId;
}
/**
* Setter method for property <tt>requestId</tt>.
*
* @param requestId value to be assigned to property requestId
*/
public void setRequestId(String requestId) {
this.requestId = requestId;
}
/**
* Getter method for property <tt>className</tt>.
*
* @return property value of className
*/
public String getClassName() {
return className;
}
/**
* Setter method for property <tt>className</tt>.
*
* @param className value to be assigned to property className
*/
public void setClassName(String className) {
this.className = className;
}
/**
* Getter method for property <tt>methodName</tt>.
*
* @return property value of methodName
*/
public String getMethodName() {
return methodName;
}
/**
* Setter method for property <tt>methodName</tt>.
*
* @param methodName value to be assigned to property methodName
*/
public void setMethodName(String methodName) {
this.methodName = methodName;
}
/**
* Getter method for property <tt>parameterTypes</tt>.
*
* @return property value of parameterTypes
*/
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
/**
* Setter method for property <tt>parameterTypes</tt>.
*
* @param parameterTypes value to be assigned to property parameterTypes
*/
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
/**
* Getter method for property <tt>parameters</tt>.
*
* @return property value of parameters
*/
public Object[] getParameters() {
return parameters;
}
/**
* Setter method for property <tt>parameters</tt>.
*
* @param parameters value to be assigned to property parameters
*/
public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
/**
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "RpcRequest [requestId=" + requestId + ", className=" + className + ", methodName="
+ methodName + ", parameterTypes=" + Arrays.toString(parameterTypes)
+ ", parameters=" + Arrays.toString(parameters) + "]";
}
}

  远程调用的响应对象

/**
*Rpc 响应的主体
* @author zhangwei_david
* @version $Id: SrResponse.java, v 0.1 2014年12月31日 下午6:07:27 zhangwei_david Exp $
*/
public class RpcResponse {
// 请求的Id
private String    requestId;
// 异常
private Throwable error;
// 响应
private Object    result;
/**
* Getter method for property <tt>requestId</tt>.
*
* @return property value of requestId
*/
public String getRequestId() {
return requestId;
}
/**
* Setter method for property <tt>requestId</tt>.
*
* @param requestId value to be assigned to property requestId
*/
public void setRequestId(String requestId) {
this.requestId = requestId;
}
/**
* Getter method for property <tt>error</tt>.
*
* @return property value of error
*/
public Throwable getError() {
return error;
}
/**
* Setter method for property <tt>error</tt>.
*
* @param error value to be assigned to property error
*/
public void setError(Throwable error) {
this.error = error;
}
/**
* Getter method for property <tt>result</tt>.
*
* @return property value of result
*/
public Object getResult() {
return result;
}
/**
* Setter method for property <tt>result</tt>.
*
* @param result value to be assigned to property result
*/
public void setResult(Object result) {
this.result = result;
}
/**
*如果有异常则表示失败
* @return
*/
public boolean isError() {
return error != null;
}
/**
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "RpcResponse [requestId=" + requestId + ", error=" + error + ", result=" + result
+ "]";
}
}

  RPC编码与解码

/**
*RPC 解码
* @author zhangwei_david
* @version $Id: RpcDecoder.java, v 0.1 2014年12月31日 下午8:53:16 zhangwei_david Exp $
*/
public class RpcDecoder extends ByteToMessageDecoder {
private Class<?> genericClass;
public RpcDecoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
if (dataLength < 0) {
ctx.close();
}
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
}
byte[] data = new byte[dataLength];
in.readBytes(data);
Object obj = SerializationUtil.deserializer(data, genericClass);
out.add(obj);
}
}

/**
*
* @author zhangwei_david
* @version $Id: RpcEncoder.java, v 0.1 2014年12月31日 下午8:55:25 zhangwei_david Exp $
*/
@SuppressWarnings("rawtypes")
public class RpcEncoder extends MessageToByteEncoder {
private Class<?> genericClass;
public RpcEncoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
if (genericClass.isInstance(in)) {
byte[] data = SerializationUtil.serializer(in);
out.writeInt(data.length);
out.writeBytes(data);
}
}
}

  RPC的请求处理器

/**
*RPC请求处理器
* @author zhangwei_david
* @version $Id: RpcHandler.java, v 0.1 2014年12月31日 下午9:04:52 zhangwei_david Exp $
*/
public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {
private static final Logger       logger = LogManager.getLogger(RpcHandler.class);
private final Map<String, Object> handlerMap;
public RpcHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}
@Override
public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
RpcResponse response = new RpcResponse();
// 将请求的Id写入Response
response.setRequestId(request.getRequestId());
try {
LogUtils.info(logger, "处理请求:{0}", request);
Object result = handle(request);
response.setResult(result);
} catch (Throwable t) {
response.setError(t);
}
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
/**
* 请求的处理主体
*
* @param request
* @return
* @throws Throwable
*/
private Object handle(RpcRequest request) throws Throwable {
String className = request.getClassName();
Object serviceBean = handlerMap.get(className);
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
return serviceFastMethod.invoke(serviceBean, parameters);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
  为了方便实现服务的注册,定义一个注解

/**
* 简单的RPC协议的方法的注解
* @author zhangwei_david
* @version $Id: STRService.java, v 0.1 2014年12月31日 下午4:33:14 zhangwei_david Exp $
*/
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
String value() default "";
Class<?> inf();
}

  将远程服务注册到ZK

/**
* 简单RPC服务注册
* <ul>
* 注册方法是register(),该方法的主要功能如下:
* <li> 对目标服务器创建一个ZooKeeper实例</li>
* <li> 如果可以成功创建ZooKeeper实例,则创建一个节点</li>
* </ul>
* @author zhangwei_david
* @version $Id: ServiceRegistry.java, v 0.1 2014年12月31日 下午6:08:47 zhangwei_david Exp $
*/
public class ServiceRegistry {
// 日期记录器
private static final Logger logger       = LogManager.getLogger(ServiceRegistry.class);
// 使用计数器实现同步
private CountDownLatch      latch        = new CountDownLatch(1);
private int                 timeout      = Constant.DEFAULT_ZK_SESSION_TIMEOUT;
private String              registerPath = Constant.DEFAULT_ZK_REGISTRY_PATH;
private String              registerAddress;
public void register(String data) {
LogUtils.debug(logger, "注册服务{0}", data);
if (data != null) {
ZooKeeper zk = connectServer();
if (zk != null) {
// 创建节点
createNode(zk, data);
}
}
}
/**
*
*创建zooKeeper
* @return
*/
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
LogUtils.info(logger, "创建zk,参数是:address:{0},timeout:{1}", registerAddress, timeout);
// 创建一个zooKeeper实例,第一个参数是目标服务器地址和端口,第二个参数是session 超时时间,第三个参数是节点发生变化时的回调方法
zk = new ZooKeeper(registerAddress, timeout, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
// 计数器减一
latch.countDown();
}
}
});
// 阻塞到计数器为0,直到节点的变化回调方法执行完成
latch.await();
} catch (Exception e) {
LogUtils.error(logger, "connectServer exception", e);
}
// 返回ZooKeeper实例
return zk;
}
/**
*
*
* @param zk ZooKeeper的实例
* @param data 注册数据
*/
private void createNode(ZooKeeper zk, String data) {
try {
byte[] bytes = data.getBytes();
/**
* 创建一个节点,第一个参数是该节点的路径,第二个参数是该节点的初始化数据,第三个参数是该节点的ACL,第四个参数指定节点的创建策略
*/
String createResult = zk.create(registerPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
LogUtils.info(logger, "创建的结果是:{0}", createResult);
} catch (Exception e) {
LogUtils.error(logger, "createNode exception", e);
}
}
/**
* Getter method for property <tt>timeout</tt>.
*
* @return property value of timeout
*/
public int getTimeout() {
return timeout;
}
/**
* Setter method for property <tt>timeout</tt>.
*
* @param timeout value to be assigned to property timeout
*/
public void setTimeout(int timeout) {
this.timeout = timeout;
}
/**
* Getter method for property <tt>registerPath</tt>.
*
* @return property value of registerPath
*/
public String getRegisterPath() {
return registerPath;
}
/**
* Setter method for property <tt>registerPath</tt>.
*
* @param registerPath value to be assigned to property registerPath
*/
public void setRegisterPath(String registerPath) {
this.registerPath = registerPath;
}
/**
* Getter method for property <tt>registerAddress</tt>.
*
* @return property value of registerAddress
*/
public String getRegisterAddress() {
return registerAddress;
}
/**
* Setter method for property <tt>registerAddress</tt>.
*
* @param registerAddress value to be assigned to property registerAddress
*/
public void setRegisterAddress(String registerAddress) {
this.registerAddress = registerAddress;
}
}

  至此在服务启动时就可以方便地注册到ZK
  RPC调用客户端

/**
*RPC客户端
* @author zhangwei_david
* @version $Id: RpcClient.java, v 0.1 2014年12月31日 下午9:18:34 zhangwei_david Exp $
*/
public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {
private String       host;
private int          port;
private RpcResponse  response;
private final Object obj = new Object();
public RpcClient(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
this.response = response;
synchronized (obj) {
obj.notifyAll(); // 收到响应,唤醒线程
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
public RpcResponse send(RpcRequest request) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new RpcEncoder(RpcRequest.class)) // 将 RPC 请求进行编码(为了发送请求)
.addLast(new RpcDecoder(RpcResponse.class)) // 将 RPC 响应进行解码(为了处理响应)
.addLast(RpcClient.this); // 使用 RpcClient 发送 RPC 请求
}
}).option(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().writeAndFlush(request).sync();
synchronized (obj) {
obj.wait(); // 未收到响应,使线程等待
}
if (response != null) {
future.channel().closeFuture().sync();
}
return response;
} finally {
group.shutdownGracefully();
}
}
}

  RPC服务发现:

/**
*Rpc 服务发现
* @author zhangwei_david
* @version $Id: ServiceDiscovery.java, v 0.1 2014年12月31日 下午9:10:23 zhangwei_david Exp $
*/
public class ServiceDiscovery {
// 日志
private static final Logger   logger   = LogManager.getLogger(ServiceDiscovery.class);
private CountDownLatch        latch    = new CountDownLatch(1);
private volatile List<String> dataList = new ArrayList<String>();
private String                registryAddress;
public void init() {
LogUtils.debug(logger, "Rpc 服务发现初始化...");
ZooKeeper zk = connectServer();
if (zk != null) {
watchNode(zk);
}
}
public String discover() {
String data = null;
int size = dataList.size();
if (size > 0) {
if (size == 1) {
data = dataList.get(0);
} else {
data = dataList.get(ThreadLocalRandom.current().nextInt(size));
}
}
return data;
}
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(registryAddress, Constant.DEFAULT_ZK_SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
latch.await();
} catch (Exception e) {
}
LogUtils.debug(logger, "zk 是{0}", zk);
return zk;
}
private void watchNode(final ZooKeeper zk) {
try {
List<String> nodeList = zk.getChildren(Constant.ROOT, new Watcher() {
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
watchNode(zk);
}
}
});
LogUtils.debug(logger, "zk 节点有  {0}", nodeList);
List<String> dataList = new ArrayList<String>();
for (String node : nodeList) {
byte[] bytes = zk.getData(Constant.ROOT + node, false, null);
dataList.add(new String(bytes));
}
this.dataList = dataList;
if (dataList.isEmpty()) {
throw new RuntimeException("尚未注册任何服务");
}
} catch (Exception e) {
LogUtils.error(logger, "发现节点异常", e);
}
}
/**
* Setter method for property <tt>registryAddress</tt>.
*
* @param registryAddress value to be assigned to property registryAddress
*/
public void setRegistryAddress(String registryAddress) {
this.registryAddress = registryAddress;
}
}
  测试:

/**
*
* @author zhangwei_david
* @version $Id: HelloService.java, v 0.1 2014年12月31日 下午9:27:28 zhangwei_david Exp $
*/
public interface HelloService {
String hello();
}


/**
*
* @author zhangwei_david
* @version $Id: HelloServiceImpl.java, v 0.1 2014年12月31日 下午9:28:02 zhangwei_david Exp $
*/
@RpcService(value = "helloService", inf = HelloService.class)
public class HelloServiceImpl implements HelloService {
public String hello() {
return "Hello! ";
}
}

  服务端配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/jee
http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/task  
http://www.springframework.org/schema/task/spring-task-3.1.xsd  
">
<context:component-scan base-package="com.david.common.test"/>

<!-- 配置服务注册组件 -->
<bean id="serviceRegistry" class="com.david.common.rpc.registry.ServiceRegistry">
<property name="registerAddress" value="127.0.0.1:2181"/>
</bean>
<!-- 配置 RPC 服务器 -->
<bean id="rpcServer" class="com.david.common.rpc.server.RpcServer">
<property name="serverAddress" value="127.0.0.1:8000"/>
<property name="serviceRegistry" ref="serviceRegistry"/>
</bean>

</beans>

  客户端配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/jee
http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/task  
http://www.springframework.org/schema/task/spring-task-3.1.xsd  
">
<context:component-scan base-package="com.david.common.*"/>

<bean id="serviceDiscovery" class="com.david.common.rpc.discovery.ServiceDiscovery" init-method="init">
<property name="registryAddress" value="127.0.0.1:2181"/>
</bean>
<!-- 配置 RPC 代理 -->
<bean id="rpcProxy" class="com.david.common.rpc.proxy.RpcProxyFactory">
<property name="serviceDiscovery" ref="serviceDiscovery"/>
</bean>
</beans>

  服务端:

/**
*
* @author zhangwei_david
* @version $Id: Server.java, v 0.1 2014年12月31日 下午9:56:37 zhangwei_david Exp $
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring.xml")
public class Server {
@Test
public void helloTest() throws InterruptedException {
System.out.println("启动");
TimeUnit.HOURS.sleep(1);
}
}

  客户端:

/**
*
* @author zhangwei_david
* @version $Id: MyTest.java, v 0.1 2014年12月31日 下午9:25:49 zhangwei_david Exp $
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:client.xml")
public class HelloServiceTest {
@Autowired
private RpcProxyFactory rpcProxy;
@Test
public void helloTest() {
HelloService helloService = rpcProxy.create(HelloService.class);
String result = helloService.hello();
Assert.assertEquals("Hello! ", result);
}
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-366427-1-1.html 上篇帖子: 架构师之dubbo-------dubbo+spring+zookeeper整合之最小系统 下篇帖子: Zookeeper 3.3.3 Transaction Logs&Snapshot 序列化
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表