nstrio.blogg.se

Kafka remove duplicate messages
Kafka remove duplicate messages













kafka remove duplicate messages
  1. #Kafka remove duplicate messages how to#
  2. #Kafka remove duplicate messages code#

#Kafka remove duplicate messages how to#

A variant/base of his proposed solution is also included in confluent’s examples and they also have a tutorial on the same: How to find distinct values in a stream of events. Var streams = new KafkaStreams(topology, topologyProps) īut these settings are global so apply to all DSL topologies, turning off any optimising that can be gained elsewhere in your streams application.Īfter searching for a better solution (and now it seems to be the only one I can find) I came across this post by Jaroslaw Kijanowski. TopologyProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1) TopologyProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1) To turn of the record caching and get the aggregator to behave how the implementation expected you can set the following properties on the Kafka Streams instance: Aggregate & Filter Workaround var topologyProps = new Properties() This makes sense if your doing a max, min or sum aggregation because what you care about is the most recent total at this moment in time, not the intermediate values (in most cases that is). Into this, which gets passed onto the filter:Īs it caches the output of the aggregation, combine results #2 & #3 into the last event received which is #3. If you receive the following events: “A”, then a long enough gap then “B” followed by “B” within the commit interval and within the buffer limit, then record caching would turn the output of the aggregator: The problem with this solution is the record caching. filter(Pair::getRight) // only allow true = changed values

kafka remove duplicate messages

Return Pair.of(aggregate.getLeft(), false) If (aggregate.getLeft() != null & aggregate.getLeft().equals(newValue))

#Kafka remove duplicate messages code#

The code roughly looks like: builder.stream("my-input-topic") Var filtered = stream.filter(pair –> cond)

  • The next step in the chain is to filter out unchanged values:.
  • If you determine it hasn’t changed return.
  • when the next value is received compare the previous response ( previousPair.first) with newValue,.
  • when the first value is received return something like new Pair(value, true).
  • aggregate with an initial value of an empty wrapper (used to flag the returned value as changed or not).
  • Effectively doing the same as log compacting client side – before being written to any topic. The streams DSL, performs optimisations on aggregations by caching the output before sending downstream and also caching it before writing to the state store and the changelog topic it’s persisted to. The “right way” is a bit subjective, so to quell the inevitable quibbles, let’s settle for the “deduping messages in Kafka Streams: not the wrong way”. The problem with this method, is because this wasn’t the intent of the Kafka Streams DSL’s groupBy & aggregate (which is performing aggregations), various features need to be turned off or worked around to prevent the ‘changed’ events being lost by the DSL’s optimisations. This seemed like a creative way of leveraging the DSL functionality. By wrong way I mean they used a groupByKey & aggregate to compare previous/current values and then filter out the unchanged values. Most examples I found out in the wild of how to deduplicate identical or unchanged messages, I’ve recently discovered, do it the wrong way.















    Kafka remove duplicate messages