namedhao 发表于 2016-12-19 07:45:46

5.Redis消息订阅/发布

  Redis可以很容的实现消息订阅/发布功能

一.JedisPubSub

  需要实现一个JedisPubSub,相当于Redis消息的Listener

package com.gqshao.redis.channels;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPubSub;
public class MyJedisPubSub extends JedisPubSub {
protected static Logger logger = LoggerFactory.getLogger(MyJedisPubSub.class);
// 取得订阅的消息后的处理
public void onMessage(String channel, String message) {
logger.info("取得订阅的消息后的处理 : " + channel + "=" + message);
}
// 初始化订阅时候的处理
public void onSubscribe(String channel, int subscribedChannels) {
logger.info("初始化订阅时候的处理 : " + channel + "=" + subscribedChannels);
}
// 取消订阅时候的处理
public void onUnsubscribe(String channel, int subscribedChannels) {
logger.info("取消订阅时候的处理 : " + channel + "=" + subscribedChannels);
}
// 初始化按表达式的方式订阅时候的处理
public void onPSubscribe(String pattern, int subscribedChannels) {
logger.info("初始化按表达式的方式订阅时候的处理 : " + pattern + "=" + subscribedChannels);
}
// 取消按表达式的方式订阅时候的处理
public void onPUnsubscribe(String pattern, int subscribedChannels) {
logger.info(" 取消按表达式的方式订阅时候的处理 : " + pattern + "=" + subscribedChannels);
}
// 取得按表达式的方式订阅的消息后的处理
public void onPMessage(String pattern, String channel, String message) {
logger.info("取得按表达式的方式订阅的消息后的处理 :" + pattern + "=" + channel + "=" + message);
}
}


二.消息订阅/发布
  1.消息的订阅需要一个Redis连接始终保持连接,Jedis中停止订阅的unsubscribe是在JedisPubSub中
  2.程序中因为需要Jedis始终保持连接,又有可能需要停止订阅,所以用到了ExecutorService

package com.gqshao.redis.channels;

import com.gqshao.redis.JedisTest;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 发布/订阅
*/
public class MessageTest extends JedisTest {
/**
* SUBSCRIBE 订阅一个匹配的通道
* PSUBSCRIBE 订阅匹配的通道
* PUBLISH 将value推送到channelone通道中
* UNSUBSCRIBE 取消订阅消息
* PUNSUBSCRIBE 取消匹配的消息订阅
* web环境中可以编写一个JedisPubSub 继承 @see redis.clients.jedis.JedisPubSub来实现监听
* Jedis中通过使用 JedisPubSub.UNSUBSCRIBE/PUNSUBSCRIBE 来取消订阅
*/
@Test
public void testSubscribe() {
final MyJedisPubSub listener = new MyJedisPubSub();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
logger.info("subscribe channelA.test channelB.send_message");
jedis.subscribe(listener, "channelA.test", "channelB.send_message");
}
});
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(thread);
// 测试发送
Jedis pubJedis = pool.getResource();
logger.info("publish channelA.test OK : " + pubJedis.publish("channelA.test", "OK"));
logger.info("publish channelB.send_message \"Hello World!\" : " + pubJedis.publish("channelB.send_message", "Hello World!"));
listener.unsubscribe("channelA.test", "channelB.send_message");
try {
executor.shutdownNow();
logger.info("executor.shutdownNow");
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
logger.warn("Pool did not terminated");
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
logger.info("完成subscribe测试");
}

/**
* SUBSCRIBE channelone 订阅一个通道
* PSUBSCRIBE channel* 订阅一批通道
* PUBLISH channelone value 将value推送到channelone通道中
* web环境中可以编写一个Listener 继承 @see redis.clients.jedis.JedisPubSub来实现监听
*/
@Test
public void testPsubscribe() {
final MyJedisPubSub listener = new MyJedisPubSub();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
logger.info("psubscribe channel*");
jedis.psubscribe(listener, "channel*");
}
});
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(thread);
// 测试发送
Jedis pubJedis = pool.getResource();
logger.info("publish channelA.test OK: " + pubJedis.publish("channelA.test", "OK"));
logger.info("publish channelB.send_message \"Hello World!\"" + pubJedis.publish("channelB.send_message", "Hello World!"));
pool.returnResource(pubJedis);
listener.punsubscribe();
try {
executor.shutdownNow();
logger.info("executor.shutdownNow");
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
logger.warn("Pool did not terminated");
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
logger.info("完成psubscribe测试");
logger.info("publish channelA.test OK: " + pubJedis.publish("channelA.test", "OK"));
}
}
页: [1]
查看完整版本: 5.Redis消息订阅/发布