kestrel利用dubbo和memcached协议实现 队列服务
1.解压安装,kestrel-2.3.4.tar.gz编写启动脚本,并启动。
start_kestrel.sh:
#设置工作目录
WOKR_DIR=.
#设置lib库目录
EXTEN_LIB_HOME=$WOKR_DIR/lib
#设置执行类库目录
CLASS_DIR=kestrel_2.9.1-2.3.4.jar
#设置path路径分隔符
PATH_SPLIT=:
#设置java路径
JAVA_HOME=/usr/java/jdk1.6.0_38
JAVA=$JAVA_HOME/bin/java
#设置classpath
CLASSPATH=$CLASSPATH$PATH_SPLIT$CLASS_DIR
for i in $EXTEN_LIB_HOME/*.*; do
CLASSPATH="$CLASSPATH":"$i"
done
export CLASSPATH
#设置内存
#JAVA_OPTS="$JAVA_OPTS -server -Xmx2048m -agentpath:/home/soft/yjp-8.0.30/bin/linux-x86-32/libyjpagent.so"
JAVA_OPTS="$JAVA_OPTS -server -Xmx2048m"
echo
echo JDK is $JAVA
echo
echo CLASSPATH is $CLASSPATH
echo
echo MAINCLASS is $BEAN
echo
#执行
$JAVA -jar kestrel_2.9.1-2.3.4.jar &
2.安装memcached-1.4.24.tar.gz
启动memcached :
memcached -p 12677 -U 0 -d -r -u root -m 2040 -c 1024 -t 4
3.安装zookeeper-3.4.3(这里做了伪集群,安装步骤请查找相关资料)启动zookeeper
4.安装dubbo Web管理界面(官网下载dubbo-admin-tomcat.tar.gz)解压后,修改ROOT/WEB-INF/dubbo.properties.,并启动tomcat
dubbo.registry.address=zookeeper://X.X.X.X:2181(zookeeper服务地址)
dubbo.admin.root.password=root
dubbo.admin.guest.password=guest
5.编写队列接口类:
public abstract interface Queue extends java.util.Queue
{
public abstract boolean add(String paramString, Object paramObject);
public abstract Object element(String paramString);
public abstract boolean offer(String paramString, Object paramObject);
public abstract Object peek(String paramString);
public abstract Object poll(String paramString);
public abstract Object remove(String paramString);
public abstract void setServers(String[] paramArrayOfString);
public abstract int size(String paramString);
public abstract void init();
public abstract void destroy();
}
6.编写实现类
import java.util.Collection;
import java.util.Iterator;
import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.MonitorConfig;
import com.alibaba.dubbo.config.ReferenceConfig;
import com.alibaba.dubbo.config.RegistryConfig;
public class QueueImplimplements Queue
{
public QueueImpl() {
super();
}
private String[] servers = new String;
private ReferenceConfig<Queue> reference = null;
public void setServers(String[] servers)
{
if (servers == null)
this.servers = new String;
else
this.servers = servers;
}
public void destroy()
{
if (this.reference != null) {
this.reference.destroy();
this.reference = null;
}
}
public void init()
{
ApplicationConfig application = new ApplicationConfig();
application.setName("queue");
RegistryConfig registry = new RegistryConfig();
String zookeeperAddress = "zookeeper://";
for (int i = 0; i < this.servers.length; i++) {
zookeeperAddress = zookeeperAddress + this.servers;
if ((this.servers.length > 1) && (i == 0)) {
zookeeperAddress = zookeeperAddress + "?backup=";
}
if ((i < this.servers.length - 1) && (i > 0)) {
zookeeperAddress = zookeeperAddress + ",";
}
}
MonitorConfig monitor = new MonitorConfig();
monitor.setProtocol("registry");//监控中心协议,如果为protocol="registry",表示从注册中心发现监控中心地址,否则直连监控中心
registry.setAddress(zookeeperAddress);//注册中心服务器地址
this.reference = new ReferenceConfig(); //用于创建一个远程服务代理,可以像使用本地bean一样使用demoService
this.reference.setApplication(application);
this.reference.setRegistry(registry);//设置注册中心
this.reference.setMonitor(monitor);
this.reference.setInterface(Queue.class);//
}
public boolean add(String queueName, Object e)
{
Queue queue = (Queue)this.reference.get();
return queue.add(queueName, e);
}
public Object element(String queueName)
{
Queue queue = (Queue)this.reference.get();
return queue.element(queueName);
}
public boolean offer(String queueName, Object e)
{
Queue queue = (Queue)this.reference.get();
return queue.offer(queueName, e);
}
public Object peek(String queueName)
{
Queue queue = (Queue)this.reference.get();
return queue.peek(queueName);
}
public Object poll(String queueName)
{
Queue queue = (Queue)this.reference.get();
return queue.poll(queueName);
}
public Object remove(String queueName)
{
Queue queue = (Queue)this.reference.get();
return queue.remove(queueName);
}
public boolean add(Object e)
{
return false;
}
public Object element()
{
return null;
}
public boolean offer(Object e)
{
return false;
}
public Object peek()
{
return null;
}
public Object poll()
{
return null;
}
public Object remove()
{
return null;
}
public boolean addAll(Collection c)
{
return false;
}
public void clear()
{
}
public boolean contains(Object o)
{
return false;
}
public boolean containsAll(Collection c)
{
return false;
}
public boolean isEmpty()
{
return false;
}
public Iterator iterator()
{
return null;
}
public boolean remove(Object o)
{
return false;
}
public boolean removeAll(Collection c)
{
return false;
}
public boolean retainAll(Collection c)
{
return false;
}
public int size()
{
return 0;
}
public Object[] toArray()
{
return null;
}
public Object[] toArray(Object[] a)
{
return null;
}
public int size(String queueName)
{
Queue queue = (Queue)this.reference.get();
return queue.size(queueName);
}
}
7.配置dubbo服务
1)
服务名:x.x.x.Queue
2)
服务地址:memcached://X.X.X.X:22133/x.x.x.Queue?application=zqueue&dynamic=false&interface=x.x.x.Queue&loadbalance=consistenthash
3)保存并启用
8:测试
public class QueueTestMian {
public static void main(String[] args) {
QueueImpl queue = new QueueImpl();
queue.setServers(new String[] { "x.x.x.x:2181",
"x.x.x.x:2182", "x.x.x.x:2183"});
queue.init();
for (int i = 0; i <10; i++) {
queue.offer("test_queue1", Integer.valueOf(i));
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(queue.poll("test_queue"));
System.out.println(queue.poll("test_queue"));
System.out.println(queue.poll("test_queue"));
System.out.println(queue.poll("test_queue"));
System.out.println(queue.poll("test_queue"));
}
}
依赖jar包如下:
版权声明:本文为博主原创文章,未经博主允许不得转载。
页:
[1]