构建实时消息系统
Redis的Pub/Sub系统可以构建实时的消息系统,比如很多开发人员用Pub/Sub构建实时聊天系统。
java
import redis.clients.jedis.*;
import java.util.Date;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.RandomStringUtils;
class PrintListener extends JedisPubSub{
@Override
public void onMessage(String channel, String message) {
String time = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");
System.out.println("message receive:" + message + ",channel:" + channel +
"..." + time);
//此处我们可以取消订阅
if(message.equalsIgnoreCase("quit")){
this.unsubscribe(channel);
}
}
}
class PubClient {
private Jedis jedis;
public PubClient(String host,int port){
jedis = new Jedis(host,port);
}
public void pub(String channel,String message){
jedis.publish(channel, message);
}
public void close(String channel){
jedis.publish(channel, "quit");
jedis.del(channel);//实时消息系统
}
}
class SubClient {
private Jedis jedis;//
public SubClient(String host,int port){
jedis = new Jedis(host,port);
}
public void sub(JedisPubSub listener,String channel){
jedis.subscribe(listener, channel);
//此处将会阻塞,在client代码级别为JedisPubSub在处理消息时,将会“独占”链接
//并且采取了while循环的⽅方式,侦听订阅的消息
}
}
public class PubSubTest {
/**
* @param args
*/
static String host = "127.0.0.1";
static int port = 10011;
public static void main(String[] args) throws Exception{
PubClient pubClient = new PubClient(host,port);
final String channel = "pubsub-channel";
pubClient.pub(channel, "before1");
pubClient.pub(channel, "before2");
Thread.sleep(2000);
//消息订阅者⾮非常特殊,需要独占链接,因此我们需要为它创建新的链接;
//此外,jedis客户端的实现也保证了“链接独占”的特性,sub⽅方法将⼀一直阻塞,
//直到调⽤用listener.unsubscribe⽅方法
Thread subThread = new Thread(new Runnable() {
@Override
public void run() {
try{
SubClient subClient = new SubClient(host,port);
System.out.println("----------subscribe operation begin-------");
JedisPubSub listener = new PrintListener();
//在API级别,此处为轮询操作,直到unsubscribe调⽤用,才会返回
subClient.sub(listener, channel);
System.out.println("----------subscribe operation end-------")
;
}catch(Exception e){
e.printStackTrace();
}
}
});
subThread.start();
int i=0;
while(i < 10){
String message = RandomStringUtils.random(6, true, true);//apache-commons
pubClient.pub(channel, message);
i++;
Thread.sleep(1000);
}
//被动关闭指示,如果通道中,消息发布者确定通道需要关闭,那么就发送一个“quit”
//那么在listener.onMessage()中接收到“quit”时,其他订阅client将执行“unsubscribe”操作。
pubClient.close(channel);
//此外,你还可以这样取消订阅
//listener.unsubscribe(channel);
}
}
输出:
----------subscribe operation begin------- message receive:erRIEe,channel:pubsub-channel...2016-03-15 15:53:52 message receive:Ovcwiw,channel:pubsub-channel...2016-03-15 15:53:53 message receive:STPWfV,channel:pubsub-channel...2016-03-15 15:53:54 message receive:SR4iIk,channel:pubsub-channel...2016-03-15 15:53:55 message receive:GI3Ege,channel:pubsub-channel...2016-03-15 15:53:56 message receive:0V1JUt,channel:pubsub-channel...2016-03-15 15:53:57 message receive:3iU8BV,channel:pubsub-channel...2016-03-15 15:53:58 message receive:BqeI2x,channel:pubsub-channel...2016-03-15 15:53:59 message receive:D53cHF,channel:pubsub-channel...2016-03-15 15:54:00 message receive:quit,channel:pubsub-channel...2016-03-15 15:54:01 ----------subscribe operation end-------