

#Kafka remove duplicate messages how to#
The problem with this solution (the aggregate & filter approach shown in the code section below) is the record caching. If you receive the following events: "A", then a long enough gap, then "B" followed by "B" within the commit interval and within the buffer limit, record caching changes the output of the aggregator before it gets passed onto the filter: because the output of the aggregation is cached, results #2 & #3 are combined into the last event received, which is #3. This makes sense if you're doing a max, min or sum aggregation, because what you care about is the most recent total at this moment in time, not the intermediate values (in most cases, that is).

#Aggregate & Filter Workaround#

To turn off the record caching and get the aggregator to behave the way the implementation expects, you can set the following properties on the Kafka Streams instance:

    var topologyProps = new Properties();
    topologyProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1);
    topologyProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);

    var streams = new KafkaStreams(topology, topologyProps);

But these settings are global, so they apply to all DSL topologies, turning off any optimising that can be gained elsewhere in your streams application. After searching for a better solution (and now it seems to be the only one I can find) I came across this post by Jaroslaw Kijanowski. A variant/base of his proposed solution is also included in Confluent's examples, and they also have a tutorial on the same: How to find distinct values in a stream of events.
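For a sense of what that family of solutions looks like, here is a rough, non-authoritative sketch of state-store-based deduplication; it is not a reproduction of Kijanowski's code or of the Confluent tutorial. Previously seen values are kept in a key-value store and only new or changed values are forwarded. The topic names, the store name, the String serdes and the use of the newer `KStream#process` Processor API (Kafka Streams 3.3+) are all assumptions made for the illustration.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class StateStoreDeduplication {

    // Hypothetical store name for the illustration.
    private static final String STORE_NAME = "last-seen-values";

    public static StreamsBuilder buildTopology() {
        var builder = new StreamsBuilder();

        // A persistent key-value store remembering the last value seen for each key.
        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(STORE_NAME),
            Serdes.String(), Serdes.String()));

        // The processor forwards a record only if its value differs from the last one stored.
        ProcessorSupplier<String, String, String, String> deduplicator =
            () -> new Processor<String, String, String, String>() {
                private ProcessorContext<String, String> context;
                private KeyValueStore<String, String> store;

                @Override
                public void init(ProcessorContext<String, String> context) {
                    this.context = context;
                    this.store = context.getStateStore(STORE_NAME);
                }

                @Override
                public void process(Record<String, String> record) {
                    var previous = store.get(record.key());
                    if (previous == null || !previous.equals(record.value())) {
                        store.put(record.key(), record.value());
                        context.forward(record); // only forward new or changed values
                    }
                }
            };

        builder.stream("my-input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .process(deduplicator, STORE_NAME)
            .to("my-deduplicated-topic", Produced.with(Serdes.String(), Serdes.String()));

        return builder;
    }
}
```

Because the decision is made per record against the store, this variant does not depend on record caching or commit-interval settings the way the aggregate & filter approach does.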

#Kafka remove duplicate messages code#

The code roughly looks like:

    // the input
    builder.stream("my-input-topic")

    // inside the aggregator: has this key's value actually changed?
    if (aggregate.getLeft() != null && aggregate.getLeft().equals(newValue)) {
        return Pair.of(aggregate.getLeft(), false);
    }

    // after the aggregation
    var filtered = stream.filter(Pair::getRight); // only allow true = changed values
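Putting those fragments together end to end, a minimal sketch of the whole aggregate & filter topology might look like the following. This is an illustration, not the post's original code: the topic names, application id, bootstrap servers, String serdes, the commons-lang3 `Pair`, and the small delimiter-based serde for the aggregate state are all assumptions; the two cache-related properties are the workaround settings described above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class DeduplicateWithAggregateAndFilter {

    public static void main(String[] args) {
        var builder = new StreamsBuilder();

        builder.stream("my-input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            // The aggregate holds the last seen value plus a flag saying whether this event changed it.
            .aggregate(
                () -> Pair.of((String) null, false),
                (key, newValue, aggregate) -> {
                    if (aggregate.getLeft() != null && aggregate.getLeft().equals(newValue)) {
                        return Pair.of(aggregate.getLeft(), false); // duplicate: unchanged
                    }
                    return Pair.of(newValue, true); // new or changed value
                },
                Materialized.with(Serdes.String(), pairSerde()))
            .toStream()
            .filter((key, pair) -> pair.getRight()) // only allow true = changed values
            .mapValues(Pair::getLeft)
            .to("my-deduplicated-topic", Produced.with(Serdes.String(), Serdes.String()));

        var topologyProps = new Properties();
        topologyProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "dedup-aggregate-filter");
        topologyProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // The workaround from the post: effectively disable record caching so every
        // intermediate aggregate reaches the filter (global, affects the whole application).
        topologyProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1);
        topologyProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);

        var streams = new KafkaStreams(builder.build(), topologyProps);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    // Minimal serde for the (lastValue, changed) pair, encoded as "changed|value".
    private static Serde<Pair<String, Boolean>> pairSerde() {
        return Serdes.serdeFrom(
            (topic, pair) -> pair == null ? null
                : (pair.getRight() + "|" + (pair.getLeft() == null ? "" : pair.getLeft()))
                    .getBytes(StandardCharsets.UTF_8),
            (topic, bytes) -> {
                if (bytes == null) {
                    return null;
                }
                var text = new String(bytes, StandardCharsets.UTF_8);
                var separator = text.indexOf('|');
                var value = text.substring(separator + 1);
                return Pair.of(value.isEmpty() ? null : value,
                    Boolean.parseBoolean(text.substring(0, separator)));
            });
    }
}
```

In this sketch, if record caching were left at its defaults, the ("B", true) and ("B", false) results from the "A", "B", "B" example would be collapsed into ("B", false) before reaching the filter, so the new "B" would never be emitted; the two cache-related properties at the bottom are what prevent that.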
