2 years ago
#68356

Saher
Two separate Kafka Consumer Configs and Listeners
I wrote a Kafka Producer & Consumer some time ago, and now I need to add another consumer to the application, but I'm having issues with the configurations of it.
config class 1
@Configuration
@EnableKafka
public class consumerConfig1 {
@Bean
public Map<String, Object> firstConsumerConfigs(Environment env) {
Map<String, Object> kafkaProps = new HashMap<>();
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("my-app.kafka.bootstrap-servers", ""));
kafkaProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, env.getProperty("my-app.kafka.properties.schema.registry.url", ""));
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, env.getProperty("my-app.kafka.consumer.key-deserializer", ""));
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, env.getProperty("my-app.kafka.consumer.value-deserializer", ""));
kafkaProps.put("group.id", env.getProperty("my-app.kafka.consumer.group-id", ""));
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
kafkaProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
kafkaProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "600000");
kafkaProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, env.getProperty("my-app.kafka.properties.security.protocol", ""));
kafkaProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, env.getProperty("my-app.kafka.ssl.trust-store-location", "")
.replace("file://", "").replace("file:", ""));
kafkaProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, env.getProperty("my-app.kafka.ssl.trust-store-password", ""));
kafkaProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, env.getProperty("my-app.kafka.ssl.key-store-location", "")
.replace("file://", "").replace("file:", ""));
kafkaProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, env.getProperty("my-app.kafka.ssl.key-store-password", ""));
kafkaProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, env.getProperty("my-app.kafka.ssl.key-password", ""));
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("my-app.kafka.consumer.group-id", ""));
kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, env.getProperty("my-app.kafka.client-id", ""));
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("my-app.kafka.consumer.auto-offset-reset", ""));
// For the IDE to work
System.setProperty("javax.net.ssl.trustStore", env.getProperty("my-app.kafka.ssl.trust-store-location", "")
.replace("file://", "").replace("file:", ""));
System.setProperty("javax.net.ssl.trustStorePassword", env.getProperty("my-app.kafka.ssl.trust-store-password", ""));
System.setProperty("javax.net.ssl.keyStore", env.getProperty("my-app.kafka.ssl.key-store-location", "")
.replace("file://", "").replace("file:", ""));
System.setProperty("javax.net.ssl.keyStorePassword", env.getProperty("my-app.kafka.ssl.key-store-password", ""));
LogUtil.logInfo("", "","Kafka Consumer Config" , "APP CONFIGURATIONS", "", "Kafka Consumer 1 Ingest Consumer Configurations ===> " + kafkaProps.toString());
return kafkaProps;
}
@Bean
public firstConsumerFactory<GenericRecord, GenericRecord> ingestConsumerFactory(Environment env){
return new DefaultKafkaConsumerFactory<>(firstConsumerConfigs(env));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<GenericRecord, GenericRecord>> firstListenerContainerFactory(Environment env) {
return firstListenerContainerFactory(env);
}
@Bean(name = "firstListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> firstListenerContainerFactory(Environment env) {
ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(firstListenerContainerFactory(env));
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(1);
return factory;
}
@Bean
public FirstReceiver receiver() {
return new FirstReceiver();
}
}
Consumer Receiver Class
@Service
public class FirstReceiver {
@KafkaListener(topics = "${my-app.kafka.template.first-topic}", groupId = "${my-app.kafka.consumer.group-id}", containerFactory = "firstListenerContainerFactory")
public void receive(@Payload ConsumerRecord<GenericRecord, GenericRecord> record, Acknowledgment acknowledgment) {
acknowledgment.acknowledge();
//.... continue consumer logic
}
Up to this point, everything was working... configuration started to get missed-up when I tried to add a second configuration class to hold certs & configs for the second consumer
config class 2
@Configuration
@EnableKafka
public class consumerConfig2 {
@Bean
public Map<String, Object> secondConsumerConfigs(Environment env) {
Map<String, Object> kafkaProps = new HashMap<>();
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("my-new-app.kafka.bootstrap-servers", ""));
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, env.getProperty("my-new-app.kafka.consumer.key-deserializer", ""));
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, env.getProperty("my-new-app.kafka.consumer.value-deserializer", ""));
kafkaProps.put("group.id", env.getProperty("my-new-app.kafka.consumer.group-id", ""));
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
kafkaProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
kafkaProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "600000");
kafkaProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, env.getProperty("my-new-app.kafka.properties.security.protocol", ""));
kafkaProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, env.getProperty("my-new-app.kafka.ssl.trust-store-location", "")
.replace("file://", "").replace("file:", ""));
kafkaProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, env.getProperty("my-new-app.kafka.ssl.trust-store-password", ""));
kafkaProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, env.getProperty("my-new-app.kafka.ssl.key-store-location", "")
.replace("file://", "").replace("file:", ""));
kafkaProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, env.getProperty("my-new-app.kafka.ssl.key-store-password", ""));
kafkaProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, env.getProperty("my-new-app.kafka.ssl.key-password", ""));
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("my-new-app.kafka.consumer.group-id", ""));
kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, env.getProperty("my-new-app.kafka.client-id", ""));
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("my-new-app.kafka.consumer.auto-offset-reset", ""));
// For the IDE to work
System.setProperty("javax.net.ssl.trustStore", env.getProperty("my-new-app.kafka.ssl.trust-store-location", "")
.replace("file://", "").replace("file:", ""));
System.setProperty("javax.net.ssl.trustStorePassword", env.getProperty("my-new-app.kafka.ssl.trust-store-password", ""));
System.setProperty("javax.net.ssl.keyStore", env.getProperty("my-new-app.kafka.ssl.key-store-location", "")
.replace("file://", "").replace("file:", ""));
System.setProperty("javax.net.ssl.keyStorePassword", env.getProperty("my-new-app.kafka.ssl.key-store-password", ""));
LogUtil.logInfo("", "","Kafka Consumer Config" , "APP CONFIGURATIONS", "", "Kafka Consumer 2 Ingest Consumer Configurations ===> " + kafkaProps.toString());
return kafkaProps;
}
@Bean
public secondConsumerFactory<GenericRecord, GenericRecord> ingestConsumerFactory(Environment env){
return new DefaultKafkaConsumerFactory<>(secondConsumerConfigs(env));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> secondListenerContainerFactory(Environment env) {
return secondListenerContainerFactory(env);
}
@Bean(name = "secondListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> secondListenerContainerFactory(Environment env) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(secondListenerContainerFactory(env));
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(1);
return factory;
}
@Bean
public SecondReceiver receiver2() {
return new secondReceiver();
}
}
consumer class is very similar to the previous one
@Service
public class SecondReceiver {
@KafkaListener(topics = "${my-new-app.kafka.template.first-topic}", groupId = "${my--new-app.kafka.consumer.group-id}", containerFactory = "secondListenerContainerFactory")
public void receive(@Payload ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
acknowledgment.acknowledge();
//.... continue consumer logic
}
Adding the second configuration, mess up with the certs of the first configuration & now I'm unauthorized to access the first topic & also I'm still failing to connect to the second topic with an unauthorized to access group error
Also, how would I put both configuration for the IDE (System.setProperties()) to configure with.... I can't have both loaded, right?
I've been trying to get my configuration in order & failing for past couple of days. any help would be greatly appreciated.
java
spring-boot
apache-kafka
kafka-consumer-api
spring-kafka
0 Answers
Your Answer