2 years ago
#59777
Sodrul Amin Shaon
How to create a Pub/Sub module in redis-cluster using Jedis Library 4.0.0
How can I create and test publish/subscribe module using updated Jedis library? Previously I was using jedis library version 3.7.0, from that version my code was something like.
Subscriber Process
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPubSub;
public class ChannelSubscribe {
public static void main(String[] args) {
JedisCluster jedis = null;
try {
/* Creating JedisCluster object for connecting with redis-cluster server */
jedis = new JedisCluster();
/* Creating JedisPubSub object for subscribing with channels */
JedisPubSub jedisPubSub = new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
System.out.println("Channel " + channel + " has sent a message : " + message );
if(channel.equals("C1")) {
/* Unsubscribe from channel C1 after first message is received. */
unsubscribe(channel);
}
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("Client is Subscribed to channel : "+ channel);
System.out.println("Client is Subscribed to "+ subscribedChannels + " no. of channels");
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println("Client is Unsubscribed from channel : "+ channel);
System.out.println("Client is Subscribed to "+ subscribedChannels + " no. of channels");
}
};
/* Subscribing to channel C1 and C2 */
jedis.subscribe(jedisPubSub, "C1", "C2");
} catch(Exception ex) {
System.out.println("Exception : " + ex.getMessage());
}
}
}
Publisher Process
import redis.clients.jedis.JedisCluster;
public class ChannelPublish {
public static void main(String[] args) {
JedisCluster jedis = null;
try {
/* Creating JedisCluster object for connecting with redis server */
jedis = new JedisCluster();
/* Publishing message to channel C1 */
jedis.publish("C1", "First message to channel C1");
/* Publishing message to channel C2 */
jedis.publish("C2", "First message to channel C2");
/* Publishing message to channel C1 */
jedis.publish("C1", "Second message to channel C1");
/* Publishing message to channel C2 */
jedis.publish("C2", "Second message to channel C2");
} catch(Exception ex) {
System.out.println("Exception : " + ex.getMessage());
}
}
}
Now I am trying to update jedis library to latest(4.0.1) one. But in this latest library I think the subscribe method has been shifted to JedisPubSub
class or I am not sure where.
But now I can't find any working example of pub/sub module for cluster. I have so far tried,
//pool configuration
GenericObjectPoolConfig<Connection> poolConfig = new GenericObjectPoolConfig<>();
poolConfig.setMinIdle(5);
poolConfig.setMaxIdle(100);
poolConfig.setMaxWait(Duration.ofMillis(5000));
//jedis configuration
JedisClientConfig jedisClientConfig = DefaultJedisClientConfig.builder()
.password(config.getRedisPassword())
.ssl(false)
.build();
//host and port
HostAndPort hostAndPort = new HostAndPort(config.getRedisHost(), config.getRedisPort());
ConnectionPool connectionPool = new ConnectionPool(hostAndPort, jedisClientConfig, poolConfig);
Connection connection = connectionPool.getResource();
//create a jedis pub sub instance
JedisPubSub pubSub = new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
super.onMessage(channel, message);
}
@Override
public void onPMessage(String pattern, String channel, String message) {
super.onPMessage(pattern, channel, message);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
super.onSubscribe(channel, subscribedChannels);
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
super.onUnsubscribe(channel, subscribedChannels);
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
super.onPUnsubscribe(pattern, subscribedChannels);
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
super.onPSubscribe(pattern, subscribedChannels);
}
@Override
public void onPong(String pattern) {
super.onPong(pattern);
}
};
pubSub.proceed(connection, "test");
But getting error called
redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out at redis.clients.jedis.util.RedisInputStream.ensureFill(RedisInputStream.java:204) at redis.clients.jedis.util.RedisInputStream.readByte(RedisInputStream.java:42) at redis.clients.jedis.Protocol.process(Protocol.java:126) at redis.clients.jedis.Protocol.read(Protocol.java:192) at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:316) at redis.clients.jedis.Connection.getUnflushedObjectMultiBulkReply(Connection.java:282) at redis.clients.jedis.JedisPubSub.process(JedisPubSub.java:131) at redis.clients.jedis.JedisPubSub.proceed(JedisPubSub.java:124) at switchbalancetest.Test.jedisUpdateTest(Test.java:78) at switchbalancetest.Test.main(Test.java:40) Caused by: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:171) at java.net.SocketInputStream.read(SocketInputStream.java:141) at java.net.SocketInputStream.read(SocketInputStream.java:127) at redis.clients.jedis.util.RedisInputStream.ensureFill(RedisInputStream.java:198) ... 9 more
And in redis server log I found error that says,
Bad message length or signature received from Cluster bus.
In current library, I thought subscribe method should be available under UnifiedJedis Class. But, I can't find it there. How can I successfully do this ?
java
redis
jedis
redis-cluster
0 Answers
Your Answer