2 years ago
#73968
Aditya Chandla
Stateful Aggregations with Apache Kafka Streams
I want to use Kafka Streams to perform stateful aggregations. The use case is that we receive a stream of events, which we want to store until a specific event arrives, at which point we perform a computation on all the collected events and output a stream of objects with the aggregated information.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.Stores;
import org.springframework.beans.factory.annotation.Autowired;

@Autowired
public void aggregations(StreamsBuilder streamBuilder) {
    KStream<String, CustomEvent> events = streamBuilder.stream(topicName);
    // I want to aggregate all custom events into a StateStore until I get an event of a particular type
    streamBuilder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("AggregationStore"),
            Serdes.String(),
            SerdeProvider.getJsonSerde(SessionEvents.class)
    ));
    // I want to perform computation on List<CustomEvent> and give out a KStream<String, AggregateInfo>
}
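(SerdeProvider.getJsonSerde is not shown in the question; a minimal sketch of such a helper, assuming Spring Kafka's JsonSerde is acceptable, might look like this.)

import org.apache.kafka.common.serialization.Serde;
import org.springframework.kafka.support.serializer.JsonSerde;

public final class SerdeProvider {
    private SerdeProvider() { }

    // Wraps Spring Kafka's JSON (de)serializer for an arbitrary target type.
    public static <T> Serde<T> getJsonSerde(Class<T> type) {
        return new JsonSerde<>(type);
    }
}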
What I've tried:
- Using windows is not a viable option because all the events might arrive within a few minutes or might be spread over a couple of days.
- I also tried defining a Processor with a state store, but KStream's process method only accepts a processor of type Processor<KIn, VIn, Void, Void> (https://kafka.apache.org/30/javadoc/org/apache/kafka/streams/kstream/KStream.html#process(org.apache.kafka.streams.processor.api.ProcessorSupplier,java.lang.String...)), whereas I want to output an AggregateInfo when the computation is complete; see the sketch after this list.
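One way to keep a typed output in Kafka Streams 3.0 is KStream#transform, which takes a TransformerSupplier whose transform method may return a KeyValue to forward downstream or null to emit nothing, and which can attach the registered state store by name. Below is a minimal sketch, not a definitive implementation: CustomEvent.isTerminal(), SessionEvents.add(...), and AggregateInfo.from(...) are hypothetical helpers standing in for the question's own types.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

KStream<String, AggregateInfo> aggregates = events.transform(
        () -> new Transformer<String, CustomEvent, KeyValue<String, AggregateInfo>>() {
            private KeyValueStore<String, SessionEvents> store;

            @Override
            public void init(ProcessorContext context) {
                // Look up the store registered via streamBuilder.addStateStore(...)
                store = context.getStateStore("AggregationStore");
            }

            @Override
            public KeyValue<String, AggregateInfo> transform(String key, CustomEvent event) {
                SessionEvents session = store.get(key);
                if (session == null) {
                    session = new SessionEvents();
                }
                if (event.isTerminal()) {                  // hypothetical check for the "specific event"
                    store.delete(key);                     // clear the accumulated state
                    return KeyValue.pair(key, AggregateInfo.from(session)); // emit the aggregate downstream
                }
                session.add(event);                        // hypothetical accumulator method
                store.put(key, session);
                return null;                               // returning null forwards nothing
            }

            @Override
            public void close() { }
        },
        "AggregationStore");                               // attach the state store by name

In Kafka Streams 3.3 and later, KStream#process gained an overload that accepts a ProcessorSupplier<KIn, VIn, KOut, VOut> and returns a typed KStream, so the same pattern can be written against the newer Processor API as well.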
Apache Beam has a State and Timers API with which I was able to handle this scenario. Does Kafka Streams offer something similar, and if not, how should this case be handled with the Kafka Streams API?
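The closest Kafka Streams analogue to Beam's timers is punctuation: ProcessorContext#schedule registers a Punctuator that fires periodically on wall-clock or stream time. A minimal sketch of the init method for the transformer above; the 24-hour interval and the expiry policy are assumptions, not part of the question.

import java.time.Duration;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;

@Override
public void init(ProcessorContext context) {
    store = context.getStateStore("AggregationStore");
    // Fires every 24 hours of wall-clock time, roughly analogous to a Beam timer.
    context.schedule(Duration.ofHours(24), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
        try (KeyValueIterator<String, SessionEvents> iterator = store.all()) {
            while (iterator.hasNext()) {
                KeyValue<String, SessionEvents> entry = iterator.next();
                // Flush or expire sessions that never received the terminal event
                // (hypothetical policy; context.forward(...) can also emit records from here).
            }
        }
    });
}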
apache-kafka
apache-kafka-streams
spring-kafka