Palamariuk Maksym
Kafka Streams Lost Produced Messages
I'm using Kafka Streams with Spring Boot, and after some aggregations I produce data to a new topic. When I consume records from this topic (with kafka-console-consumer) for the first time, all messages are present (nearly 52k records). But on subsequent runs the count is very different (32k, 18k, or even 10k records).
I use a dockerized Kafka cluster with 3 nodes and a replication factor of 3. How can I prevent this data loss?
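For reference, a plain-Java sketch that is roughly equivalent to the console-consumer check, simply counting everything readable from the beginning of the topic (the bootstrap address, topic name, and deserializers are illustrative placeholders, not my actual configuration):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class OutputTopicCounter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // placeholder broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "count-check-" + System.nanoTime()); // fresh group, so stored offsets never skip records
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");                // always start from the beginning
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        long total = 0;
        try (KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("output-topic")); // placeholder topic name
            int emptyPolls = 0;
            while (emptyPolls < 5) {                     // stop after a few consecutive empty polls
                ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(2));
                if (records.isEmpty()) {
                    emptyPolls++;
                } else {
                    emptyPolls = 0;
                    total += records.count();
                }
            }
        }
        System.out.println("Total records read: " + total);
    }
}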
Example of my stream code:
KStream<Long, JsonNode> st1 = KafkaUtils.readStream(builder, TOPIC_1, (k, v) -> v.get("id").asLong(), jsonSerde);
KStream<Long, JsonNode> st2 = KafkaUtils.readStream(builder, TOPIC_2, (k, v) -> v.get("id").asLong(), jsonSerde);
st2
    .join(st1, valueJoiner, JoinWindows.of(Duration.ofHours(1)), StreamJoined.with(Serdes.Long(), jsonSerde, jsonSerde))
    .transform(CustomTimestampTransformer<Long>::new) // my custom transformer for extracting timestamps from records
    .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), jsonSerde));
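KafkaUtils.readStream is a small helper of mine: essentially it reads a topic with the given value serde and re-keys every record with the supplied extractor. A simplified sketch of it (the String key serde for the source topics is an assumption here; the real helper may differ):

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;

public final class KafkaUtils {
    // Simplified sketch of the helper used above; the key serde of the source
    // topics is assumed to be String, the real implementation may differ.
    public static <K> KStream<K, JsonNode> readStream(
            StreamsBuilder builder,
            String topic,
            KeyValueMapper<String, JsonNode, K> keyExtractor,
            Serde<JsonNode> jsonSerde) {
        return builder
                .stream(topic, Consumed.with(Serdes.String(), jsonSerde)) // read the raw topic
                .selectKey(keyExtractor);                                 // re-key by the extracted id
    }
}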
apache-kafka
apache-kafka-streams