2 years ago

#59777


Sodrul Amin Shaon

How to create a Pub/Sub module in redis-cluster using Jedis Library 4.0.0

How can I create and test a publish/subscribe module using the updated Jedis library? Previously I was using Jedis version 3.7.0, and with that version my code looked something like this.

Subscriber Process

import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPubSub;
 
public class ChannelSubscribe {
 
  public static void main(String[] args) {
         
    JedisCluster jedis = null;
 
    try {
             
    /* Creating JedisCluster object for connecting with redis-cluster server */
    jedis = new JedisCluster();
         
    /* Creating JedisPubSub object for subscribing with channels */
    JedisPubSub jedisPubSub = new JedisPubSub() {
                 
        @Override
        public void onMessage(String channel, String message) {
            System.out.println("Channel " + channel + " has sent a message : " + message );
            if(channel.equals("C1")) {
                /* Unsubscribe from channel C1 after first message is received. */ 
                unsubscribe(channel);
            }
        }
                 
        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            System.out.println("Client is Subscribed to channel : "+ channel);
            System.out.println("Client is Subscribed to "+ subscribedChannels + " no. of channels");
        }
             
        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
            System.out.println("Client is Unsubscribed from channel : "+ channel);
            System.out.println("Client is Subscribed to "+ subscribedChannels + " no. of channels");
        }
             
    };
             
    /* Subscribing to channel C1 and C2 */
    jedis.subscribe(jedisPubSub, "C1", "C2");
             
    } catch(Exception ex) {         
             
        System.out.println("Exception : " + ex.getMessage());   
             
    }      
  }
}

Publisher Process

import redis.clients.jedis.JedisCluster;
 
public class ChannelPublish {
 
  public static void main(String[] args) {
         
    JedisCluster jedis = null;
         
    try {           
        /* Creating JedisCluster object for connecting with redis server */
        jedis = new JedisCluster();
             
        /* Publishing message to channel C1 */
        jedis.publish("C1", "First message to channel C1");
             
        /* Publishing message to channel C2 */
        jedis.publish("C2", "First message to channel C2");
         
        /* Publishing message to channel C1 */
        jedis.publish("C1", "Second message to channel C1");
             
        /* Publishing message to channel C2 */
        jedis.publish("C2", "Second message to channel C2");
         
    } catch(Exception ex) {         
             
        System.out.println("Exception : " + ex.getMessage());
    } 
  }
}

Now I am trying to update the Jedis library to the latest version (4.0.1). In this version, I think the subscribe method has been moved to the JedisPubSub class, but I am not sure where it is.

I can't find any working example of a pub/sub module for a cluster. So far I have tried:

//pool configuration
        GenericObjectPoolConfig<Connection> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMinIdle(5);
        poolConfig.setMaxIdle(100);
        poolConfig.setMaxWait(Duration.ofMillis(5000));

        //jedis configuration
        JedisClientConfig jedisClientConfig = DefaultJedisClientConfig.builder()
                .password(config.getRedisPassword())
                .ssl(false)
                .build();

        //host and port
        HostAndPort hostAndPort = new HostAndPort(config.getRedisHost(), config.getRedisPort());

        ConnectionPool connectionPool = new ConnectionPool(hostAndPort, jedisClientConfig, poolConfig);
        Connection connection = connectionPool.getResource();

        //create a jedis pub sub instance
        JedisPubSub pubSub = new JedisPubSub() {
            @Override
            public void onMessage(String channel, String message) {
                super.onMessage(channel, message);
            }

            @Override
            public void onPMessage(String pattern, String channel, String message) {
                super.onPMessage(pattern, channel, message);
            }

            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                super.onSubscribe(channel, subscribedChannels);
            }

            @Override
            public void onUnsubscribe(String channel, int subscribedChannels) {
                super.onUnsubscribe(channel, subscribedChannels);
            }

            @Override
            public void onPUnsubscribe(String pattern, int subscribedChannels) {
                super.onPUnsubscribe(pattern, subscribedChannels);
            }

            @Override
            public void onPSubscribe(String pattern, int subscribedChannels) {
                super.onPSubscribe(pattern, subscribedChannels);
            }

            @Override
            public void onPong(String pattern) {
                super.onPong(pattern);
            }
        };
        
        pubSub.proceed(connection, "test");

But I get the following error:

redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
    at redis.clients.jedis.util.RedisInputStream.ensureFill(RedisInputStream.java:204)
    at redis.clients.jedis.util.RedisInputStream.readByte(RedisInputStream.java:42)
    at redis.clients.jedis.Protocol.process(Protocol.java:126)
    at redis.clients.jedis.Protocol.read(Protocol.java:192)
    at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:316)
    at redis.clients.jedis.Connection.getUnflushedObjectMultiBulkReply(Connection.java:282)
    at redis.clients.jedis.JedisPubSub.process(JedisPubSub.java:131)
    at redis.clients.jedis.JedisPubSub.proceed(JedisPubSub.java:124)
    at switchbalancetest.Test.jedisUpdateTest(Test.java:78)
    at switchbalancetest.Test.main(Test.java:40)
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.net.SocketInputStream.read(SocketInputStream.java:127)
    at redis.clients.jedis.util.RedisInputStream.ensureFill(RedisInputStream.java:198)
    ... 9 more
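
The root cause in this trace is a plain `java.net.SocketTimeoutException`: a blocking read on a socket with a finite read timeout received no bytes in time. A subscriber spends most of its life in exactly such a read, waiting for the next message, so a finite timeout on the subscribing connection is one plausible trigger. As far as I can tell, `Jedis.subscribe` lifts the timeout before handing the connection to `JedisPubSub`, whereas calling `proceed` directly on a pooled connection likely does not; treat that as an assumption to verify against the Jedis source. The sketch below uses only the JDK (no Jedis) to reproduce the mechanism against a local server that accepts the connection but never writes. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical demo class; it does not use or model Jedis internals.
final class ReadTimeoutDemo {

    /**
     * Opens a loopback connection to a server that never sends data and
     * performs one blocking read with the given SO_TIMEOUT (milliseconds).
     * Returns true if the read ended with SocketTimeoutException.
     */
    static boolean readTimesOut(int soTimeoutMillis) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket silentServer = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, silentServer.getLocalPort());
             Socket accepted = silentServer.accept()) {
            // A finite timeout makes read() give up when no bytes arrive in time.
            client.setSoTimeout(soTimeoutMillis);
            try {
                client.getInputStream().read();
                return false; // data or end-of-stream arrived before the timeout
            } catch (SocketTimeoutException e) {
                return true;  // same exception type as the root cause in the trace
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read timed out: " + readTimesOut(200));
    }
}
```

A timeout of `0` means "wait indefinitely" for `Socket.setSoTimeout`, which is the behaviour a long-lived subscriber generally needs.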

And in the Redis server log I found this error:

Bad message length or signature received from Cluster bus.
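
Redis logs this message when bytes arriving on a node's cluster bus port do not look like a cluster bus packet, which can happen when an ordinary client is pointed at the bus port instead of the client port. By default the bus port is the client port plus 10000, although it can be configured differently. A minimal sketch, assuming that default offset, for checking whether a configured port collides with a node's bus port. The class, method names, and port values are hypothetical and not taken from the question or from Jedis.

```java
import java.util.Arrays;

// Hypothetical helper for illustration; not part of Jedis or Redis.
final class ClusterPorts {

    // Default offset between a node's client port and its cluster bus port (assumed).
    static final int DEFAULT_BUS_PORT_OFFSET = 10000;

    /** Bus port a node would use by default for the given client port. */
    static int defaultBusPortFor(int clientPort) {
        return clientPort + DEFAULT_BUS_PORT_OFFSET;
    }

    /** True if configuredPort equals the default bus port of any listed client port. */
    static boolean isDefaultBusPortOf(int configuredPort, int... clientPorts) {
        return Arrays.stream(clientPorts)
                .anyMatch(clientPort -> defaultBusPortFor(clientPort) == configuredPort);
    }

    public static void main(String[] args) {
        int[] clientPorts = {7000, 7001, 7002}; // example values only
        int configuredPort = 17000;             // example value only
        if (isDefaultBusPortOf(configuredPort, clientPorts)) {
            System.out.println(configuredPort + " looks like a cluster bus port; try "
                    + (configuredPort - DEFAULT_BUS_PORT_OFFSET) + " instead");
        } else {
            System.out.println(configuredPort + " does not match a default bus port");
        }
    }
}
```

If the port passed to `HostAndPort` turns out to be a bus port, connecting to the corresponding client port is the first thing worth trying.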

In the current library, I thought the subscribe method would be available in the UnifiedJedis class, but I can't find it there. How can I do this successfully?

java

redis

jedis

redis-cluster

0 Answers
