2 years ago

#68356

test-img

Saher

Two separate Kafka Consumer Configs and Listeners

I wrote a Kafka Producer & Consumer some time ago, and now I need to add another consumer to the application, but I'm having issues with the configurations of it.

config class 1

@Configuration
@EnableKafka
public class consumerConfig1 {
@Bean
    public Map<String, Object> firstConsumerConfigs(Environment env) {
        Map<String, Object> kafkaProps = new HashMap<>();

        kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("my-app.kafka.bootstrap-servers", ""));
        kafkaProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, env.getProperty("my-app.kafka.properties.schema.registry.url", ""));
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, env.getProperty("my-app.kafka.consumer.key-deserializer", ""));
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, env.getProperty("my-app.kafka.consumer.value-deserializer", ""));
        kafkaProps.put("group.id", env.getProperty("my-app.kafka.consumer.group-id", ""));
        kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
        kafkaProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        kafkaProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "600000");

        kafkaProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, env.getProperty("my-app.kafka.properties.security.protocol", ""));
        kafkaProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, env.getProperty("my-app.kafka.ssl.trust-store-location", "")
                    .replace("file://", "").replace("file:", ""));
        kafkaProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, env.getProperty("my-app.kafka.ssl.trust-store-password", ""));

        kafkaProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, env.getProperty("my-app.kafka.ssl.key-store-location", "")
                    .replace("file://", "").replace("file:", ""));
        kafkaProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, env.getProperty("my-app.kafka.ssl.key-store-password", ""));
        kafkaProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, env.getProperty("my-app.kafka.ssl.key-password", ""));
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("my-app.kafka.consumer.group-id", ""));
        kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, env.getProperty("my-app.kafka.client-id", ""));
        kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("my-app.kafka.consumer.auto-offset-reset", ""));

        // For the IDE to work
        System.setProperty("javax.net.ssl.trustStore", env.getProperty("my-app.kafka.ssl.trust-store-location", "")
                    .replace("file://", "").replace("file:", ""));

        System.setProperty("javax.net.ssl.trustStorePassword", env.getProperty("my-app.kafka.ssl.trust-store-password", ""));

        System.setProperty("javax.net.ssl.keyStore", env.getProperty("my-app.kafka.ssl.key-store-location", "")
                    .replace("file://", "").replace("file:", ""));

        System.setProperty("javax.net.ssl.keyStorePassword", env.getProperty("my-app.kafka.ssl.key-store-password", ""));

        LogUtil.logInfo("", "","Kafka Consumer Config" , "APP CONFIGURATIONS", "", "Kafka Consumer 1 Ingest Consumer Configurations ===> " + kafkaProps.toString());

        return kafkaProps;
    }

    @Bean
    public firstConsumerFactory<GenericRecord, GenericRecord> ingestConsumerFactory(Environment env){
        return new DefaultKafkaConsumerFactory<>(firstConsumerConfigs(env));
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<GenericRecord, GenericRecord>> firstListenerContainerFactory(Environment env) {
        return firstListenerContainerFactory(env);
    }

    @Bean(name = "firstListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> firstListenerContainerFactory(Environment env) {
        ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(firstListenerContainerFactory(env));
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConcurrency(1);

        return factory;
    }

    @Bean
    public FirstReceiver receiver() {
        return new FirstReceiver();
    }
}

Consumer Receiver Class

@Service
public class FirstReceiver {

    @KafkaListener(topics = "${my-app.kafka.template.first-topic}", groupId = "${my-app.kafka.consumer.group-id}", containerFactory = "firstListenerContainerFactory")
    public void receive(@Payload ConsumerRecord<GenericRecord, GenericRecord> record, Acknowledgment acknowledgment) {
    acknowledgment.acknowledge();
    //.... continue consumer logic
}

Up to this point, everything was working... configuration started to get missed-up when I tried to add a second configuration class to hold certs & configs for the second consumer

config class 2

@Configuration
@EnableKafka
public class consumerConfig2 {
@Bean
    public Map<String, Object> secondConsumerConfigs(Environment env) {
        Map<String, Object> kafkaProps = new HashMap<>();

        kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("my-new-app.kafka.bootstrap-servers", ""));
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, env.getProperty("my-new-app.kafka.consumer.key-deserializer", ""));
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, env.getProperty("my-new-app.kafka.consumer.value-deserializer", ""));
        kafkaProps.put("group.id", env.getProperty("my-new-app.kafka.consumer.group-id", ""));
        kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
        kafkaProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        kafkaProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "600000");

        kafkaProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, env.getProperty("my-new-app.kafka.properties.security.protocol", ""));
        kafkaProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, env.getProperty("my-new-app.kafka.ssl.trust-store-location", "")
                    .replace("file://", "").replace("file:", ""));
        kafkaProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, env.getProperty("my-new-app.kafka.ssl.trust-store-password", ""));

        kafkaProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, env.getProperty("my-new-app.kafka.ssl.key-store-location", "")
                    .replace("file://", "").replace("file:", ""));
        kafkaProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, env.getProperty("my-new-app.kafka.ssl.key-store-password", ""));
        kafkaProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, env.getProperty("my-new-app.kafka.ssl.key-password", ""));
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("my-new-app.kafka.consumer.group-id", ""));
        kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, env.getProperty("my-new-app.kafka.client-id", ""));
        kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("my-new-app.kafka.consumer.auto-offset-reset", ""));

        // For the IDE to work
        System.setProperty("javax.net.ssl.trustStore", env.getProperty("my-new-app.kafka.ssl.trust-store-location", "")
                    .replace("file://", "").replace("file:", ""));

        System.setProperty("javax.net.ssl.trustStorePassword", env.getProperty("my-new-app.kafka.ssl.trust-store-password", ""));

        System.setProperty("javax.net.ssl.keyStore", env.getProperty("my-new-app.kafka.ssl.key-store-location", "")
                    .replace("file://", "").replace("file:", ""));

        System.setProperty("javax.net.ssl.keyStorePassword", env.getProperty("my-new-app.kafka.ssl.key-store-password", ""));

        LogUtil.logInfo("", "","Kafka Consumer Config" , "APP CONFIGURATIONS", "", "Kafka Consumer 2 Ingest Consumer Configurations ===> " + kafkaProps.toString());

        return kafkaProps;
    }

    @Bean
    public secondConsumerFactory<GenericRecord, GenericRecord> ingestConsumerFactory(Environment env){
        return new DefaultKafkaConsumerFactory<>(secondConsumerConfigs(env));
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> secondListenerContainerFactory(Environment env) {
        return secondListenerContainerFactory(env);
    }

    @Bean(name = "secondListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> secondListenerContainerFactory(Environment env) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(secondListenerContainerFactory(env));
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConcurrency(1);

        return factory;
    }

    @Bean
    public SecondReceiver receiver2() {
        return new secondReceiver();
    }
}

consumer class is very similar to the previous one

@Service
public class SecondReceiver {

    @KafkaListener(topics = "${my-new-app.kafka.template.first-topic}", groupId = "${my--new-app.kafka.consumer.group-id}", containerFactory = "secondListenerContainerFactory")
    public void receive(@Payload ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
    acknowledgment.acknowledge();
    //.... continue consumer logic
}

Adding the second configuration, mess up with the certs of the first configuration & now I'm unauthorized to access the first topic & also I'm still failing to connect to the second topic with an unauthorized to access group error

Also, how would I put both configuration for the IDE (System.setProperties()) to configure with.... I can't have both loaded, right?

I've been trying to get my configuration in order & failing for past couple of days. any help would be greatly appreciated.

java

spring-boot

apache-kafka

kafka-consumer-api

spring-kafka

0 Answers

Your Answer

Accepted video resources