2 years ago

#73968

Aditya Chandla

Stateful Aggregations with Apache Kafka Streams

I want to use Kafka Streams to perform stateful aggregations. We receive a stream of events that we want to store until a specific event arrives; at that point we want to run a computation over all the collected events and emit a stream of objects containing the aggregated information.

@Autowired
public void aggregations(StreamsBuilder streamBuilder) {
  KStream<String, CustomEvent> events = streamBuilder.stream(topicName);
  // I want to aggregate all custom events into a StateStore until I get an event of a particular type

  streamBuilder.addStateStore(Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("AggregationStore"),
    Serdes.String(),
    SerdeProvider.getJsonSerde(SessionEvents.class)
  ));

  // I want to perform computation on List<CustomEvent> and give out a KStream<String, AggregateInfo>
}

What I've tried:

  1. Using windows is not a viable option, because all the events for a key might arrive within a few minutes or might take as long as a couple of days.
  2. I also tried defining a Processor with a state store, but KStream's process method only accepts a processor of type Processor<KIn, VIn, Void, Void>, whereas I want to output AggregateInfo once the computation is complete. https://kafka.apache.org/30/javadoc/org/apache/kafka/streams/kstream/KStream.html#process(org.apache.kafka.streams.processor.api.ProcessorSupplier,java.lang.String...)

Apache Beam has a state and timers API with which I was able to handle this scenario. Does Kafka Streams offer something similar? If not, how should we handle this case with the Kafka Streams API?
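To make the intended semantics concrete, here is a minimal, library-free sketch of the buffer-until-trigger logic. It is not Kafka Streams code: a HashMap stands in for the "AggregationStore" state store, and the CustomEvent and AggregateInfo classes, their fields, the "END" trigger type, and the onEvent method are all hypothetical placeholders chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class BufferUntilTrigger {

    // Hypothetical stand-in for the question's CustomEvent (fields are assumptions).
    static final class CustomEvent {
        final String type;
        final long amount;

        CustomEvent(String type, long amount) {
            this.type = type;
            this.amount = amount;
        }
    }

    // Hypothetical stand-in for the question's AggregateInfo (fields are assumptions).
    static final class AggregateInfo {
        final int eventCount;
        final long total;

        AggregateInfo(int eventCount, long total) {
            this.eventCount = eventCount;
            this.total = total;
        }
    }

    // Assumption: an event of this type closes the aggregation for its key.
    static final String TRIGGER_TYPE = "END";

    // In-memory stand-in for the persistent key-value store.
    private final Map<String, List<CustomEvent>> store = new HashMap<>();

    // Buffers the event under its key. Returns an aggregate only when the
    // trigger event arrives; the trigger event itself is counted as well.
    Optional<AggregateInfo> onEvent(String key, CustomEvent event) {
        List<CustomEvent> buffered = store.computeIfAbsent(key, k -> new ArrayList<>());
        buffered.add(event);
        if (!TRIGGER_TYPE.equals(event.type)) {
            return Optional.empty(); // keep collecting, however long it takes
        }
        store.remove(key); // clear the state so the key can start a new aggregation
        long total = 0;
        for (CustomEvent e : buffered) {
            total += e.amount;
        }
        return Optional.of(new AggregateInfo(buffered.size(), total));
    }

    public static void main(String[] args) {
        BufferUntilTrigger aggregator = new BufferUntilTrigger();
        aggregator.onEvent("session-1", new CustomEvent("DATA", 5));
        aggregator.onEvent("session-1", new CustomEvent("DATA", 7));
        aggregator.onEvent("session-1", new CustomEvent("END", 0))
                .ifPresent(info -> System.out.println(
                        info.eventCount + " events, total " + info.total));
    }
}
```

In a Kafka Streams topology this per-record logic would sit inside a callback backed by the state store, with the returned aggregate forwarded downstream instead of returned. KStream#transform (available in 3.0) and, from Kafka Streams 3.3 on, the typed KStream#process overload both allow forwarding output records, so either looks like a plausible place for it.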

apache-kafka

apache-kafka-streams

spring-kafka

0 Answers
