Sometimes (seemingly at random) Kafka sends old messages. I only want the latest message for each key, so a newer message should overwrite older ones with the same key. Currently it looks like I receive multiple messages with the same key, and they don't get compacted.
I use this setting in the topic:
cleanup.policy=compact
I'm using Java/Kotlin and the Apache Kafka 1.1.1 client, with this consumer configuration:
    import java.util.Properties
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer

    Properties(8).apply {
        // Inner quotes must be escaped, otherwise the string literal ends early.
        val jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"
        val jaasCfg = String.format(jaasTemplate, Configuration.kafkaUsername, Configuration.kafkaPassword)
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
        put(ConsumerConfig.GROUP_ID_CONFIG, "ApiKafkaKotlinConsumer${Configuration.kafkaGroupId}")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        // SASL/SCRAM authentication over TLS
        put("security.protocol", "SASL_SSL")
        put("sasl.mechanism", "SCRAM-SHA-256")
        put("sasl.jaas.config", jaasCfg)
        // Fetch tuning
        put("max.poll.records", 100)
        put("receive.buffer.bytes", 1000000)
    }
Have I missed some settings?
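For reference, this is roughly the behaviour I am after, written as a consumer-side sketch. As far as I understand, compaction runs in the background and the active log segment is never compacted, so several records with the same key can still be delivered even with cleanup.policy=compact. The `Record` class and `latestByKey` function below are hypothetical stand-ins I made up for illustration (not Kafka API); in the real consumer the key, value, and offset would come from `ConsumerRecord`. It also assumes all records for a key arrive from the same partition, since offsets are only comparable within one partition.

```kotlin
// Hypothetical stand-in for ConsumerRecord<String, String>; not part of the Kafka API.
data class Record(val key: String, val value: String, val offset: Long)

// Keep only the highest-offset record per key (last write wins).
// Assumes every record for a given key comes from the same partition.
fun latestByKey(records: List<Record>): Map<String, Record> {
    val latest = LinkedHashMap<String, Record>()
    for (r in records) {
        val seen = latest[r.key]
        if (seen == null || r.offset > seen.offset) {
            latest[r.key] = r
        }
    }
    return latest
}

fun main() {
    // Simulated poll result containing an uncompacted duplicate for "user-1".
    val polled = listOf(
        Record("user-1", "old", 0L),
        Record("user-2", "only", 1L),
        Record("user-1", "new", 2L)
    )
    for ((key, rec) in latestByKey(polled)) {
        println("$key -> ${rec.value}")
    }
}
```

With the simulated batch above, only the "new" value survives for user-1, which is what I expected the compacted topic to hand me directly.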