Memcache MemcacheQ Java 实现生产者消费者
上一节我们将来MemcacheQ安装与使用,这一节我们结合Memcached的java客户端xmemcached实现生产者和消费者消息队列模型。1.定义接收消息的回调接口
package cn.slimsmart.memcache.demo.test.mq;
//接收消息回调
public interface MemcacheQCallback {
//接收消息
void receice(Object message);
}
2.实现接收消息的客户端
package cn.slimsmart.memcache.demo.test.mq;
import java.util.concurrent.TimeoutException;
import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.exception.MemcachedException;
//接收消息客户端
public class MemcacheQClient implements Runnable {
private MemcachedClient memcachedClient;
private MemcacheQCallback memcacheQCallback;
private String queueName;
public MemcacheQClient() {
}
public MemcacheQClient(MemcachedClient memcachedClient, String queueName, MemcacheQCallback memcacheQCallback) {
this.memcachedClient = memcachedClient;
this.memcacheQCallback = memcacheQCallback;
this.queueName = queueName;
}
@Override
public void run() {
while (true) {
try {
Object message = memcachedClient.get(queueName);
//如果队列中已经没有数据了,休息一下再试
if(message == null){
Thread.sleep(10);
}else{
memcacheQCallback.receice(message);
}
} catch (TimeoutException e) {
} catch (InterruptedException e) {
} catch (MemcachedException e) {
}
}
}
public MemcachedClient getMemcachedClient() {
return memcachedClient;
}
public void setMemcachedClient(MemcachedClient memcachedClient) {
this.memcachedClient = memcachedClient;
}
public MemcacheQCallback getMemcacheQCallback() {
return memcacheQCallback;
}
public void setMemcacheQCallback(MemcacheQCallback memcacheQCallback) {
this.memcacheQCallback = memcacheQCallback;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
}
3.实现生产者,发送消息
package cn.slimsmart.memcache.demo.test.mq;
import net.rubyeye.xmemcached.XMemcachedClient;
//发送消息
public class Producer {
public static void main(String[] args) throws Exception {
XMemcachedClient client = new XMemcachedClient("192.168.36.189",22201);
for( int i = 0 ; i < 10 ; i++){
client.set("queue_test", 0, "hello world - "+i);
System.out.println("send message : "+"hello world - "+i);
}
}
}4.消费者
package cn.slimsmart.memcache.demo.test.mq;
import net.rubyeye.xmemcached.XMemcachedClient;
//消费
public class Consumer {
public static void main(String[] args) throws Exception {
//XMemcachedClient是线程安全的,可以被多线程使用
XMemcachedClient client = new XMemcachedClient("192.168.36.189",22201);
new Thread(new MemcacheQClient(client, "queue_test", new MemcacheQCallback() {
@Override
public void receice(Object message) {
System.out.println("接收到消息:"+message);
}
})).start();
}
}
运行看下效果吧。。。
页:
[1]