1 year ago
#70839
MatthiasN
BeanInitializationException with NPE during initialization when branching with Kafka Streams
I am currently working on an application using Spring Cloud Stream framework and I'm having troubles upgrading kafka deprecated method KStream.branch() to KStream.split(). I used to seperate my KStream input in several KStreams using branch() with predicates and then apply several business logic to these streams but now I encounter a BeanInitializationException during the initialization of my tests such as :
15:31:48.425 [main] WARN o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaStreamsFunctionProcessorInvoker': Invocation of init method failed; nested exception is org.springframework.beans.factory.BeanInitializationException: Cannot setup function invoker for this Kafka Streams function.; nested exception is java.lang.NullPointerException: Cannot invoke "org.apache.kafka.streams.kstream.KStream.toTable()" because the return value of "java.util.Map.get(Object)" is null
15:31:48.465 [main] ERROR o.s.b.SpringApplication - Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaStreamsFunctionProcessorInvoker': Invocation of init method failed; nested exception is org.springframework.beans.factory.BeanInitializationException: Cannot setup function invoker for this Kafka Streams function.; nested exception is java.lang.NullPointerException: Cannot invoke "org.apache.kafka.streams.kstream.KStream.toTable()" because the return value of "java.util.Map.get(Object)" is null
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:160)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:440)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1796)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:620)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:953)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:730)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:302)
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:164)
Here is a simplified version of my code :
@Bean
public BiFunction<
KStream<Integer, Integer>,
KTable<String, String>,
KStream<Integer, Integer>>
process() {
return (inputA, inputB) -> {
Map<String, KStream<Integer, Integer>> mapInputA = inputA
.split()
.branch((key, value) -> value > 50, // Some business predicates
Branched.as("branchOver50"))
.defaultBranch(Branched.as("branchUnder50"));
mapInputA
.get("branchOver50")
.toTable( // <- BeanCreationException (NPE : map.get() is null)
Named.as("KTable_Over_50"),
Materialized.with(Serdes.Integer(), Serdes.Integer())
).join(
inputB,
...
From my understanding, I encounter a NullPointerException during initialization because the map is empty since there is no data yet, but since the deprecated method KStream.branch() didn't require any action to handle empty streams, I was wondering if this was an intended behavior of the new method KStream.split() or if I was doing something wrong.
The error happens during a JUnit test using an EmbeddedKafkaBroker to simulate the Kafka environment (I am using kafka-streams version 3.0.0).
apache-kafka
apache-kafka-streams
spring-cloud-stream
0 Answers
Your Answer