Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share

cron - Kafka Consumer getting already processed/duplicate records | Java | Spring Kafka

I am new to Kafka and am writing a cron job in Spring Boot that validates some records in SQL against a Kafka topic. The job needs to run once a day, in the morning. For testing, I scheduled it to run every 15 minutes, and it works as expected. But as soon as I changed the cron expression to run the job every 2 hours, the consumer started receiving records from the topic that had already been read/processed, many of them duplicates. I am committing the offsets manually with commitAsync. For example, I sent 3 records to the topic, but the consumer received more than 5k records, mostly duplicates.
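Kafka tracks a consumer group's progress per partition as a committed offset, and that offset is the position of the next record to read (last processed offset + 1), not of the last record read. The toy model below is plain Java with no Kafka dependency: it replays one hypothetical three-record partition to show how the committed value alone decides what the next run receives. It illustrates that rule only; it is not a diagnosis of the behaviour described above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of ONE partition and ONE consumer group's committed offset.
// It does not use the Kafka client; it only illustrates the rule that a committed
// offset names the NEXT record to read (last processed offset + 1).
class OffsetSemanticsDemo {

    // Hypothetical partition log: the list index is the record's offset.
    static final List<String> LOG = List.of("cost-a", "cost-b", "cost-c");

    // Everything a run would receive if it starts reading at 'committed'.
    static List<String> readFrom(long committed) {
        List<String> out = new ArrayList<>();
        for (long off = committed; off < LOG.size(); off++) {
            out.add(LOG.get((int) off));
        }
        return out;
    }

    public static void main(String[] args) {
        // Run 1 starts at offset 0 and processes offsets 0, 1 and 2.
        List<String> run1 = readFrom(0);
        long lastProcessed = LOG.size() - 1;

        // Committing lastProcessed + 1 (= 3): run 2 receives nothing new.
        List<String> run2Correct = readFrom(lastProcessed + 1);
        // Committing lastProcessed (= 2): run 2 receives the last record again.
        List<String> run2OffByOne = readFrom(lastProcessed);
        // A stale committed offset (still 0): run 2 repeats all of run 1.
        List<String> run2Stale = readFrom(0);

        System.out.println(run1);         // [cost-a, cost-b, cost-c]
        System.out.println(run2Correct);  // []
        System.out.println(run2OffByOne); // [cost-c]
        System.out.println(run2Stale);    // [cost-a, cost-b, cost-c]
    }
}
```

In this model, only the stale or off-by-one commits produce re-delivery; the correct "+1" commit makes the second run see nothing until new records arrive.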

Here is the consumer code and its properties:

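```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public Map<String, Object> getKafkaConsumerProps() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Offsets are committed manually in consumeCosts(), not by the client.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // Only takes effect when enable.auto.commit is true.
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
    // Every cron run joins this same group and resumes from its committed offsets.
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "cronKafkaConsumer");
    return props;
}
```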
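Two of these settings decide where each run starts reading: group.id selects which committed offsets the consumer resumes from, and auto.offset.reset is consulted only when the group has no valid committed offset for a partition (none exists, or it is out of range). A small stdlib-only model of that decision follows; startOffset, logStart and logEnd are hypothetical names, not client API.

```java
import java.util.OptionalLong;

// Toy model (not the Kafka client API) of how a consumer picks its starting
// offset for one partition, following the documented meaning of auto.offset.reset:
// a valid committed offset always wins; the reset policy applies only when the
// group has no committed offset or the committed offset is out of range.
class StartOffsetSketch {

    // logStart/logEnd: hypothetical earliest and latest offsets of the partition.
    static long startOffset(OptionalLong committed, String autoOffsetReset,
                            long logStart, long logEnd) {
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c; // resume exactly where the group left off
            }
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStart; // replay everything still retained
            case "latest":
                return logEnd;   // skip to the end of the log
            default:
                // "none" (or, in this toy, any other value): fail instead of guessing.
                throw new IllegalStateException(
                        "no valid committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // A valid committed offset is honoured even with "latest".
        System.out.println(startOffset(OptionalLong.of(120), "latest", 0, 150));   // 120
        // No committed offset: "latest" starts at the end of the log.
        System.out.println(startOffset(OptionalLong.empty(), "latest", 0, 150));   // 150
        // No committed offset: "earliest" starts at the beginning.
        System.out.println(startOffset(OptionalLong.empty(), "earliest", 0, 150)); // 0
    }
}
```

In this model, "latest" cannot cause re-reads by itself: once the group has a valid committed offset, the reset policy is never consulted.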

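```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public List<CostKafkaModel> consumeCosts() {
    // A new consumer in group "cronKafkaConsumer" is created on every cron run.
    KafkaConsumer<String, CostKafkaModel> consumer = new KafkaConsumer<>(
            getKafkaConsumerProps(), new StringDeserializer(),
            new JsonDeserializer<>(CostKafkaModel.class));

    List<CostKafkaModel> kafkaModelList = new ArrayList<>();
    try {
        consumer.subscribe(Arrays.asList("deltaCosts"));
        // A single poll, returning at most max.poll.records records.
        // (poll(long) is deprecated in newer clients in favour of poll(Duration).)
        ConsumerRecords<String, CostKafkaModel> records = consumer
                .poll(1000);
        for (ConsumerRecord<String, CostKafkaModel> record : records) {
            kafkaModelList.add(record.value());
        }

    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // The no-argument commitSync() commits the offsets returned by the last
        // poll() for all assigned partitions, even if an exception was caught above.
        consumer.commitSync();
        consumer.close();
    }
    return kafkaModelList;
}
```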
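In the code above, the no-argument commitSync() in the finally block commits whatever the last poll() returned, regardless of how processing went. One alternative is to commit offsets derived from the records actually processed: for each partition, the highest processed offset + 1. The sketch keeps that bookkeeping dependency-free; Rec and offsetsToCommit are hypothetical names standing in for ConsumerRecord and your own helper. With kafka-clients, each map entry would become new TopicPartition("deltaCosts", partition) mapped to new OffsetAndMetadata(nextOffset), passed to consumer.commitSync(Map<TopicPartition, OffsetAndMetadata>). This is one possible approach, not a confirmed fix for the duplicates.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only sketch of computing explicit per-partition commit offsets.
// Rec is a hypothetical stand-in for ConsumerRecord (partition, offset, value).
class ExplicitCommitSketch {

    static final class Rec {
        final int partition;
        final long offset;
        final String value;

        Rec(int partition, long offset, String value) {
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    // For each partition, commit the highest processed offset + 1,
    // i.e. the offset of the next record the group should receive.
    static Map<Integer, Long> offsetsToCommit(List<Rec> processed) {
        Map<Integer, Long> next = new HashMap<>();
        for (Rec r : processed) {
            next.merge(r.partition, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Rec> processed = List.of(
                new Rec(0, 41, "a"),
                new Rec(0, 42, "b"),
                new Rec(1, 7, "c"));
        Map<Integer, Long> commit = offsetsToCommit(processed);
        System.out.println(commit.get(0)); // 43
        System.out.println(commit.get(1)); // 8
    }
}
```

Building the map only from records that were successfully handled ties each commit to the work the job actually completed.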

Any help would be appreciated.


1 Answer

Waiting for an expert to answer.
