1 year ago

#70839

test-img

MatthiasN

BeanInitializationException with NPE during initialization when branching with Kafka Streams

I am currently working on an application using Spring Cloud Stream framework and I'm having troubles upgrading kafka deprecated method KStream.branch() to KStream.split(). I used to seperate my KStream input in several KStreams using branch() with predicates and then apply several business logic to these streams but now I encounter a BeanInitializationException during the initialization of my tests such as :

15:31:48.425 [main] WARN  o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaStreamsFunctionProcessorInvoker': Invocation of init method failed; nested exception is org.springframework.beans.factory.BeanInitializationException: Cannot setup function invoker for this Kafka Streams function.; nested exception is java.lang.NullPointerException: Cannot invoke "org.apache.kafka.streams.kstream.KStream.toTable()" because the return value of "java.util.Map.get(Object)" is null
15:31:48.465 [main] ERROR o.s.b.SpringApplication - Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaStreamsFunctionProcessorInvoker': Invocation of init method failed; nested exception is org.springframework.beans.factory.BeanInitializationException: Cannot setup function invoker for this Kafka Streams function.; nested exception is java.lang.NullPointerException: Cannot invoke "org.apache.kafka.streams.kstream.KStream.toTable()" because the return value of "java.util.Map.get(Object)" is null
                at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:160)
                at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:440)
                at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1796)
                at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:620)
                at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542)
                at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335)
                at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
                at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333)
                at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208)
                at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:953)
                at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918)
                at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583)
                at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145)
                at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:730)
                at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412)
                at org.springframework.boot.SpringApplication.run(SpringApplication.java:302)
                at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:164)

Here is a simplified version of my code :

@Bean
public BiFunction<
        KStream<Integer, Integer>,
        KTable<String, String>,
        KStream<Integer, Integer>>
    process() {
        return (inputA, inputB) -> {
            
            Map<String, KStream<Integer, Integer>> mapInputA = inputA
                .split()
                .branch((key, value) -> value > 50,     // Some business predicates
                    Branched.as("branchOver50"))
                .defaultBranch(Branched.as("branchUnder50"));
            
            mapInputA
                .get("branchOver50")
                .toTable(                     // <- BeanCreationException (NPE : map.get() is null)
                    Named.as("KTable_Over_50"),
                    Materialized.with(Serdes.Integer(), Serdes.Integer())
                ).join(
                    inputB,
                    ...

From my understanding, I encounter a NullPointerException during initialization because the map is empty since there is no data yet, but since the deprecated method KStream.branch() didn't require any action to handle empty streams, I was wondering if this was an intended behavior of the new method KStream.split() or if I was doing something wrong.

The error happens during a JUnit test using an EmbeddedKafkaBroker to simulate the Kafka environment (I am using kafka-streams version 3.0.0).

apache-kafka

apache-kafka-streams

spring-cloud-stream

0 Answers

Your Answer

Accepted video resources