


Palamariuk Maksym

Kafka Streams Loses Produced Messages

I'm using Kafka Streams with Spring Boot, and after some aggregations I produce data to a new topic. The first time I consume records from this topic (with kafka-console-consumer), all messages are present (nearly 52k records). But on subsequent runs the number is very different (32k, 18k, or even 10k records).

I use a dockerized Kafka cluster with 3 nodes and a replication factor of 3. How can I prevent this data loss?
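
For reference, the count check can be reproduced programmatically roughly like this (a minimal sketch; the actual check was done with kafka-console-consumer, and the broker address, group id, topic name, and value deserializer below are placeholders, not the exact setup):

// Sketch: count all records currently readable from the output topic.
// Broker address and topic name are placeholders.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;

public class OutputTopicCounter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "count-check-" + System.nanoTime()); // fresh group, so all offsets are read
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");                // same idea as --from-beginning
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        long count = 0;
        try (KafkaConsumer<Long, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("output-topic")); // placeholder for OUTPUT_TOPIC
            while (true) {
                ConsumerRecords<Long, byte[]> records = consumer.poll(Duration.ofSeconds(5));
                if (records.isEmpty()) {
                    break; // assume the topic is exhausted after one idle poll
                }
                count += records.count();
            }
        }
        System.out.println("Record count: " + count);
    }
}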

Example of my stream code:

KStream<Long, JsonNode> st1 = KafkaUtils.readStream(builder, TOPIC_1, (k, v) -> v.get("id").asLong(), jsonSerde); 
KStream<Long, JsonNode> st2 = KafkaUtils.readStream(builder, TOPIC_2, (k, v) -> v.get("id").asLong(), jsonSerde); 

st2
.join(st1, valueJoiner, JoinWindows.of(Duration.ofHours(1)), StreamJoined.with(Serdes.Long(), jsonSerde, jsonSerde))
.transform(CustomTimestampTransformer<Long>::new) // My custom transformer for extracting timestamps from the record 
.to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), jsonSerde));
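
For context, a simplified sketch of what a timestamp-extracting transformer like this can look like (the real CustomTimestampTransformer is not shown here; the "timestamp" field name and the forwarding logic are assumptions):

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;

// Sketch only: forwards every record downstream with a timestamp taken from the payload.
public class CustomTimestampTransformer<K> implements Transformer<K, JsonNode, KeyValue<K, JsonNode>> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<K, JsonNode> transform(K key, JsonNode value) {
        long ts = value.get("timestamp").asLong();                // assumed epoch-millis field in the JSON payload
        context.forward(key, value, To.all().withTimestamp(ts));  // re-stamp the forwarded record
        return null;                                              // already forwarded explicitly, emit nothing here
    }

    @Override
    public void close() {
    }
}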

apache-kafka

apache-kafka-streams
