Kafka Tutorial: Creating Advanced Kafka Consumers in Java - 2025 Edition

January 9, 2025

                                                                           

🚀 What’s New in This 2025 Update

Major Updates and Changes

  • Broker-Coordinated Rebalancing - No more client-side leader election
  • Incremental Rebalancing - Minimal disruption during scaling
  • End-to-End Exactly-Once - Production-ready transaction support
  • Advanced Threading Models - Async and reactive patterns
  • Smart Offset Management - External storage and recovery options
  • AI Integration Ready - Custom deserializers for ML pipelines

Consumer Evolution Since 2017

  • ✅ Smoother Rebalancing - Incremental protocol reduces downtime
  • ✅ Better Semantics - Mature exactly-once delivery
  • ✅ Enhanced Resilience - Built-in retry and DLQ patterns
  • ✅ Cloud-Native - Auto-scaling and managed services

Welcome to the comprehensive guide on advanced Kafka Consumer patterns for 2025! This tutorial covers everything from delivery semantics to production-ready threading models.

Prerequisites

Before you start:

Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps setting up Kafka clusters in AWS.

Understanding Delivery Semantics

flowchart TB
  subgraph AtLeastOnce["At-Least-Once"]
    A1[Process Message]
    A2[Commit Offset]
    A1 --> A2
    A2 -->|Failure After Process| A3[Duplicate on Retry]
  end
  
  subgraph AtMostOnce["At-Most-Once"]
    B1[Commit Offset]
    B2[Process Message]
    B1 --> B2
    B2 -->|Failure After Commit| B3[Message Lost]
  end
  
  subgraph ExactlyOnce["Exactly-Once"]
    C1[Begin Transaction]
    C2[Process Message]
    C3[Produce Results]
    C4[Commit Transaction]
    C1 --> C2
    C2 --> C3
    C3 --> C4
    C4 -->|Atomic| C5[No Duplicates/Loss]
  end
  
  style AtLeastOnce fill:#fff3e0,stroke:#ff6f00,stroke-width:2px
  style AtMostOnce fill:#ffebee,stroke:#e53935,stroke-width:2px
  style ExactlyOnce fill:#e8f5e9,stroke:#43a047,stroke-width:2px

At-Least-Once Delivery Consumer

package com.cloudurable.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;

public class AtLeastOnceConsumer {
    private static final Logger logger = LoggerFactory.getLogger(AtLeastOnceConsumer.class);
    
    private final Consumer<String, StockPrice> consumer;
    private final StockPriceProcessor processor;
    
    public AtLeastOnceConsumer() {
        this.consumer = createConsumer();
        this.processor = new StockPriceProcessor();
    }
    
    private Consumer<String, StockPrice> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "at-least-once-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StockPriceDeserializer.class);
        
        // At-least-once configuration
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        
        return new KafkaConsumer<>(props);
    }
    
    public void consume() {
        consumer.subscribe(Arrays.asList("stock-prices"));
        
        try {
            while (true) {
                ConsumerRecords<String, StockPrice> records = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, StockPrice> record : records) {
                    try {
                        // Process the record - this might fail
                        processor.process(record.value());
                        
                        // Commit offset only after successful processing
                        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)
                        );
                        
                        consumer.commitSync(offsets);
                        
                    } catch (Exception e) {
                        logger.error("Error processing record, will retry on next poll", e);
                        // Don't commit offset - message will be redelivered
                        break; // Exit the loop to re-poll
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}

At-Most-Once Delivery Consumer

package com.cloudurable.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;

public class AtMostOnceConsumer {
    private static final Logger logger = LoggerFactory.getLogger(AtMostOnceConsumer.class);
    
    private final Consumer<String, StockPrice> consumer;
    private final StockPriceProcessor processor;
    
    public AtMostOnceConsumer() {
        this.consumer = createConsumer();
        this.processor = new StockPriceProcessor();
    }
    
    private Consumer<String, StockPrice> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "at-most-once-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StockPriceDeserializer.class);
        
        // At-most-once configuration - auto-commit before processing
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        
        return new KafkaConsumer<>(props);
    }
    
    public void consume() {
        consumer.subscribe(Arrays.asList("stock-prices"));
        
        try {
            while (true) {
                ConsumerRecords<String, StockPrice> records = consumer.poll(Duration.ofMillis(1000));
                
                // Offsets are committed automatically before processing
                
                for (ConsumerRecord<String, StockPrice> record : records) {
                    try {
                        // Process record - if this fails, message is lost
                        processor.process(record.value());
                    } catch (Exception e) {
                        logger.error("Failed to process record, message lost", e);
                        // Continue with next record
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}

Exactly-Once Delivery Consumer

package com.cloudurable.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;

public class ExactlyOnceConsumer {
    private static final Logger logger = LoggerFactory.getLogger(ExactlyOnceConsumer.class);
    
    private final String consumerGroupId = "exactly-once-consumer";
    private final Consumer<String, StockPrice> consumer;
    private final Producer<String, ProcessedStockData> producer;
    
    public ExactlyOnceConsumer() {
        this.consumer = createConsumer();
        this.producer = createTransactionalProducer();
    }
    
    private Consumer<String, StockPrice> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StockPriceDeserializer.class);
        
        // Exactly-once configuration
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        return new KafkaConsumer<>(props);
    }
    
    private Producer<String, ProcessedStockData> createTransactionalProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProcessedDataSerializer.class);
        
        // Transactional configuration
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "stock-processor-tx");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        
        Producer<String, ProcessedStockData> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        
        return producer;
    }
    
    public void consume() {
        consumer.subscribe(Arrays.asList("stock-prices"));
        
        try {
            while (true) {
                ConsumerRecords<String, StockPrice> records = consumer.poll(Duration.ofMillis(1000));
                
                if (!records.isEmpty()) {
                    try {
                        // Begin transaction
                        producer.beginTransaction();
                        
                        for (ConsumerRecord<String, StockPrice> record : records) {
                            // Process and produce in same transaction
                            ProcessedStockData processed = processStockPrice(record.value());
                            
                            producer.send(new ProducerRecord<>(
                                "processed-stocks",
                                record.key(),
                                processed
                            ));
                        }
                        
                        // Send consumer offsets to transaction
                        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                        for (TopicPartition partition : records.partitions()) {
                            List<ConsumerRecord<String, StockPrice>> partitionRecords = 
                                records.records(partition);
                            long lastOffset = partitionRecords
                                .get(partitionRecords.size() - 1).offset();
                            offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                        }
                        
                        // Commit offsets and transaction atomically
                        producer.sendOffsetsToTransaction(offsets, consumerGroupId);
                        producer.commitTransaction();
                        
                    } catch (KafkaException e) {
                        logger.error("Transaction failed, aborting", e);
                        producer.abortTransaction();
                        // Reset consumer to last committed offset
                        consumer.seekToCommitted(consumer.assignment());
                    }
                }
            }
        } finally {
            consumer.close();
            producer.close();
        }
    }
    
    private ProcessedStockData processStockPrice(StockPrice price) {
        // Business logic here
        return new ProcessedStockData(price);
    }
}

Advanced ConsumerRebalanceListener

package com.cloudurable.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class StatefulRebalanceListener implements ConsumerRebalanceListener {
    private static final Logger logger = LoggerFactory.getLogger(StatefulRebalanceListener.class);
    
    private final Consumer<String, StockPrice> consumer;
    private final Map<TopicPartition, PartitionState> partitionStates = new ConcurrentHashMap<>();
    private final StateStore stateStore;
    
    public StatefulRebalanceListener(Consumer<String, StockPrice> consumer, StateStore stateStore) {
        this.consumer = consumer;
        this.stateStore = stateStore;
    }
    
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        logger.info("Partitions revoked: {}", partitions);
        
        // Save state before losing partitions
        for (TopicPartition partition : partitions) {
            PartitionState state = partitionStates.get(partition);
            if (state != null) {
                try {
                    // Flush any pending work
                    state.flush();
                    
                    // Save state to external store
                    stateStore.savePartitionState(partition, state);
                    
                    // Clean up local state
                    partitionStates.remove(partition);
                    
                    logger.info("Saved state for partition {}", partition);
                } catch (Exception e) {
                    logger.error("Failed to save state for partition {}", partition, e);
                }
            }
        }
        
        // Commit current offsets
        try {
            consumer.commitSync();
        } catch (Exception e) {
            logger.error("Failed to commit offsets during rebalance", e);
        }
    }
    
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        logger.info("Partitions assigned: {}", partitions);
        
        for (TopicPartition partition : partitions) {
            try {
                // Restore state from external store
                PartitionState state = stateStore.loadPartitionState(partition);
                
                if (state != null) {
                    partitionStates.put(partition, state);
                    
                    // Seek to last processed offset
                    consumer.seek(partition, state.getLastProcessedOffset() + 1);
                    
                    logger.info("Restored state for partition {}, seeking to offset {}", 
                               partition, state.getLastProcessedOffset() + 1);
                } else {
                    // No saved state, start from committed offset
                    partitionStates.put(partition, new PartitionState(partition));
                }
                
            } catch (Exception e) {
                logger.error("Failed to restore state for partition {}", partition, e);
            }
        }
    }
    
    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        logger.warn("Partitions lost: {}", partitions);
        
        // Clean up without saving (partitions were lost unexpectedly)
        for (TopicPartition partition : partitions) {
            partitionStates.remove(partition);
        }
    }
}

class PartitionState {
    private final TopicPartition partition;
    private long lastProcessedOffset = -1;
    private final Map<String, Double> stockAggregates = new HashMap<>();
    
    public PartitionState(TopicPartition partition) {
        this.partition = partition;
    }
    
    public void updateState(ConsumerRecord<String, StockPrice> record) {
        lastProcessedOffset = record.offset();
        
        // Aggregate logic
        stockAggregates.merge(record.value().getSymbol(), 
                            record.value().getPrice(), 
                            (old, new_) -> (old + new_) / 2);
    }
    
    public void flush() {
        // Flush any pending operations
    }
    
    public long getLastProcessedOffset() {
        return lastProcessedOffset;
    }
}

Manual Partition Assignment

package com.cloudurable.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;

public class PriorityQueueConsumer {
    private static final Logger logger = LoggerFactory.getLogger(PriorityQueueConsumer.class);
    
    private final Consumer<String, StockPrice> consumer;
    private final Set<TopicPartition> highPriorityPartitions;
    private final Set<TopicPartition> lowPriorityPartitions;
    
    public PriorityQueueConsumer() {
        this.consumer = createConsumer();
        
        // Manually assign partitions based on priority
        List<PartitionInfo> partitions = consumer.partitionsFor("stock-prices");
        
        // Assign even partitions as high priority, odd as low
        highPriorityPartitions = partitions.stream()
            .filter(p -> p.partition() % 2 == 0)
            .map(p -> new TopicPartition(p.topic(), p.partition()))
            .collect(Collectors.toSet());
            
        lowPriorityPartitions = partitions.stream()
            .filter(p -> p.partition() % 2 != 0)
            .map(p -> new TopicPartition(p.topic(), p.partition()))
            .collect(Collectors.toSet());
        
        // Manually assign all partitions
        Set<TopicPartition> allPartitions = new HashSet<>();
        allPartitions.addAll(highPriorityPartitions);
        allPartitions.addAll(lowPriorityPartitions);
        consumer.assign(allPartitions);
        
        logger.info("Assigned {} high priority and {} low priority partitions",
                   highPriorityPartitions.size(), lowPriorityPartitions.size());
    }
    
    private Consumer<String, StockPrice> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "priority-consumer-" + UUID.randomUUID());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StockPriceDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        
        return new KafkaConsumer<>(props);
    }
    
    public void consume() {
        try {
            while (true) {
                // Process high priority partitions more frequently
                for (int i = 0; i < 3; i++) {
                    processPartitions(highPriorityPartitions, "HIGH", Duration.ofMillis(100));
                }
                
                // Process low priority partitions less frequently
                processPartitions(lowPriorityPartitions, "LOW", Duration.ofMillis(50));
            }
        } finally {
            consumer.close();
        }
    }
    
    private void processPartitions(Set<TopicPartition> partitions, String priority, Duration timeout) {
        // Pause all partitions
        consumer.pause(consumer.assignment());
        
        // Resume only the partitions we want to process
        consumer.resume(partitions);
        
        ConsumerRecords<String, StockPrice> records = consumer.poll(timeout);
        
        for (ConsumerRecord<String, StockPrice> record : records) {
            logger.info("[{}] Processing: {} - ${}", 
                       priority, 
                       record.value().getSymbol(), 
                       record.value().getPrice());
            
            // Process record based on priority
            if ("HIGH".equals(priority)) {
                processHighPriorityRecord(record);
            } else {
                processLowPriorityRecord(record);
            }
        }
        
        // Commit offsets for processed partitions
        if (!records.isEmpty()) {
            consumer.commitSync();
        }
    }
    
    private void processHighPriorityRecord(ConsumerRecord<String, StockPrice> record) {
        // Fast processing for high priority
    }
    
    private void processLowPriorityRecord(ConsumerRecord<String, StockPrice> record) {
        // Regular processing for low priority
    }
}

Advanced Custom Deserializers

package com.cloudurable.kafka.consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Map;
import java.util.zip.GZIPInputStream;

public class SecureStockPriceDeserializer implements Deserializer<StockPrice> {
    private static final Logger logger = LoggerFactory.getLogger(SecureStockPriceDeserializer.class);
    
    private ObjectMapper objectMapper;
    private CircuitBreaker circuitBreaker;
    private SecretKeySpec secretKey;
    private boolean isCompressionEnabled;
    private boolean isEncryptionEnabled;
    
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        this.objectMapper = new ObjectMapper();
        
        // Configure resilience
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50)
            .waitDurationInOpenState(Duration.ofSeconds(30))
            .slidingWindowSize(10)
            .build();
        this.circuitBreaker = CircuitBreaker.of("deserializer", config);
        
        // Configure security
        String encryptionKey = (String) configs.get("encryption.key");
        if (encryptionKey != null) {
            this.secretKey = new SecretKeySpec(encryptionKey.getBytes(), "AES");
            this.isEncryptionEnabled = true;
        }
        
        this.isCompressionEnabled = Boolean.parseBoolean(
            (String) configs.getOrDefault("compression.enabled", "false")
        );
    }
    
    @Override
    public StockPrice deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        
        return circuitBreaker.executeSupplier(() -> {
            try {
                byte[] processedData = data;
                
                // Decrypt if enabled
                if (isEncryptionEnabled) {
                    processedData = decrypt(processedData);
                }
                
                // Decompress if enabled
                if (isCompressionEnabled) {
                    processedData = decompress(processedData);
                }
                
                // Check magic bytes for format detection
                if (processedData.length > 4) {
                    ByteBuffer buffer = ByteBuffer.wrap(processedData, 0, 4);
                    int magic = buffer.getInt();
                    
                    if (magic == 0x12345678) {
                        // Custom binary format
                        return deserializeBinary(processedData);
                    }
                }
                
                // Default to JSON
                return objectMapper.readValue(processedData, StockPrice.class);
                
            } catch (Exception e) {
                logger.error("Failed to deserialize from topic {}", topic, e);
                throw new SerializationException("Deserialization failed", e);
            }
        });
    }
    
    private byte[] decrypt(byte[] encrypted) throws Exception {
        Cipher cipher = Cipher.getInstance("AES");
        cipher.init(Cipher.DECRYPT_MODE, secretKey);
        return cipher.doFinal(encrypted);
    }
    
    private byte[] decompress(byte[] compressed) throws Exception {
        try (GZIPInputStream gis = new GZIPInputStream(
                new java.io.ByteArrayInputStream(compressed))) {
            return gis.readAllBytes();
        }
    }
    
    private StockPrice deserializeBinary(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        buffer.getInt(); // Skip magic bytes
        
        String symbol = readString(buffer);
        double price = buffer.getDouble();
        long volume = buffer.getLong();
        long timestamp = buffer.getLong();
        
        return new StockPrice(symbol, price, volume, timestamp);
    }
    
    private String readString(ByteBuffer buffer) {
        int length = buffer.getInt();
        byte[] bytes = new byte[length];
        buffer.get(bytes);
        return new String(bytes);
    }
    
    @Override
    public void close() {
        // Cleanup resources
    }
}

Consumer Threading Models

Thread-Per-Consumer Model

package com.cloudurable.kafka.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class ThreadPerConsumerModel {
    private static final Logger logger = LoggerFactory.getLogger(ThreadPerConsumerModel.class);
    
    private final int numConsumers;
    private final ExecutorService executor;
    private final List<ConsumerWorker> workers;
    
    public ThreadPerConsumerModel(int numConsumers) {
        this.numConsumers = numConsumers;
        this.executor = Executors.newFixedThreadPool(numConsumers);
        
        // Create one consumer per thread
        this.workers = IntStream.range(0, numConsumers)
            .mapToObj(i -> new ConsumerWorker(i))
            .toList();
    }
    
    public void start() {
        workers.forEach(executor::submit);
        logger.info("Started {} consumer threads", numConsumers);
    }
    
    public void shutdown() {
        workers.forEach(ConsumerWorker::stop);
        executor.shutdown();
        
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
    
    private static class ConsumerWorker implements Runnable {
        private final int id;
        private final Consumer<String, StockPrice> consumer;
        private volatile boolean running = true;
        
        public ConsumerWorker(int id) {
            this.id = id;
            this.consumer = createConsumer("consumer-thread-" + id);
        }
        
        @Override
        public void run() {
            try {
                consumer.subscribe(Arrays.asList("stock-prices"));
                
                while (running) {
                    ConsumerRecords<String, StockPrice> records = 
                        consumer.poll(Duration.ofMillis(1000));
                    
                    for (var record : records) {
                        logger.info("Thread {} processing: {} - ${}", 
                                   id, record.value().getSymbol(), record.value().getPrice());
                        
                        // Process record
                        processRecord(record);
                    }
                    
                    consumer.commitAsync();
                }
            } catch (Exception e) {
                logger.error("Error in consumer thread {}", id, e);
            } finally {
                consumer.close();
                logger.info("Consumer thread {} stopped", id);
            }
        }
        
        public void stop() {
            running = false;
            consumer.wakeup();
        }
        
        private void processRecord(var record) {
            // Business logic here
        }
        
        private static Consumer<String, StockPrice> createConsumer(String clientId) {
            // Create consumer with unique client ID
            Properties props = new Properties();
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
            // ... other properties
            return new KafkaConsumer<>(props);
        }
    }
}

Multi-Threaded Consumer Model

package com.cloudurable.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class MultiThreadedConsumerModel {
    private static final Logger logger = LoggerFactory.getLogger(MultiThreadedConsumerModel.class);
    
    private final Consumer<String, StockPrice> consumer;
    private final ExecutorService processingPool;
    private final BlockingQueue<ConsumerRecord<String, StockPrice>> recordQueue;
    private final Map<TopicPartition, Long> pendingOffsets;
    private final AtomicBoolean running = new AtomicBoolean(true);
    
    public MultiThreadedConsumerModel(int processingThreads) {
        this.consumer = createConsumer();
        this.processingPool = Executors.newFixedThreadPool(processingThreads);
        this.recordQueue = new LinkedBlockingQueue<>(10000);
        this.pendingOffsets = new ConcurrentHashMap<>();
        
        // Start processing threads
        for (int i = 0; i < processingThreads; i++) {
            processingPool.submit(new RecordProcessor(i));
        }
    }
    
    private Consumer<String, StockPrice> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "multi-threaded-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StockPriceDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        
        return new KafkaConsumer<>(props);
    }
    
    public void start() {
        consumer.subscribe(Arrays.asList("stock-prices"));
        
        // Consumer thread - only polls and queues records
        Thread consumerThread = new Thread(() -> {
            try {
                while (running.get()) {
                    ConsumerRecords<String, StockPrice> records = 
                        consumer.poll(Duration.ofMillis(1000));
                    
                    for (ConsumerRecord<String, StockPrice> record : records) {
                        // Queue record for processing
                        while (!recordQueue.offer(record, 100, TimeUnit.MILLISECONDS)) {
                            if (!running.get()) break;
                        }
                        
                        // Track pending offset
                        pendingOffsets.merge(
                            new TopicPartition(record.topic(), record.partition()),
                            record.offset(),
                            Math::max
                        );
                    }
                    
                    // Commit completed offsets
                    commitCompletedOffsets();
                }
            } catch (Exception e) {
                logger.error("Error in consumer thread", e);
            } finally {
                consumer.close();
            }
        });
        
        consumerThread.start();
    }
    
    private void commitCompletedOffsets() {
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        
        // This is simplified - real implementation needs to track completion
        pendingOffsets.forEach((partition, offset) -> {
            toCommit.put(partition, new OffsetAndMetadata(offset + 1));
        });
        
        if (!toCommit.isEmpty()) {
            consumer.commitAsync(toCommit, (offsets, exception) -> {
                if (exception != null) {
                    logger.error("Failed to commit offsets", exception);
                }
            });
        }
    }
    
    private class RecordProcessor implements Runnable {
        private final int id;
        
        public RecordProcessor(int id) {
            this.id = id;
        }
        
        @Override
        public void run() {
            while (running.get()) {
                try {
                    ConsumerRecord<String, StockPrice> record = 
                        recordQueue.poll(100, TimeUnit.MILLISECONDS);
                    
                    if (record != null) {
                        logger.info("Processor {} handling: {} - ${}", 
                                   id, record.value().getSymbol(), record.value().getPrice());
                        
                        // Process record
                        processRecord(record);
                        
                        // Mark as completed (simplified)
                        // Real implementation needs proper offset tracking
                    }
                } catch (Exception e) {
                    logger.error("Error in processor {}", id, e);
                }
            }
        }
        
        private void processRecord(ConsumerRecord<String, StockPrice> record) {
            // Business logic here
        }
    }
    
    public void shutdown() {
        running.set(false);
        consumer.wakeup();
        processingPool.shutdown();
        
        try {
            if (!processingPool.awaitTermination(30, TimeUnit.SECONDS)) {
                processingPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            processingPool.shutdownNow();
        }
    }
}

Reactive Consumer Model

package com.cloudurable.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ReactiveConsumerModel {
    private static final Logger logger = LoggerFactory.getLogger(ReactiveConsumerModel.class);
    
    private final KafkaReceiver<String, StockPrice> receiver;
    
    public ReactiveConsumerModel() {
        this.receiver = createReceiver();
    }
    
    private KafkaReceiver<String, StockPrice> createReceiver() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StockPriceDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        ReceiverOptions<String, StockPrice> receiverOptions = 
            ReceiverOptions.<String, StockPrice>create(props)
                .subscription(Collections.singleton("stock-prices"))
                .commitBatchSize(10)
                .commitInterval(Duration.ofSeconds(1));
        
        return KafkaReceiver.create(receiverOptions);
    }
    
    public void consume() {
        receiver.receive()
            .groupBy(record -> record.partition())
            .flatMap(partitionFlux -> partitionFlux
                .publishOn(Schedulers.parallel())
                .flatMap(this::processRecord)
                .sample(Duration.ofSeconds(5))
                .doOnNext(record -> record.receiverOffset().commit())
            )
            .subscribe();
    }
    
    private Mono<ReceiverRecord<String, StockPrice>> processRecord(
            ReceiverRecord<String, StockPrice> record) {
        
        return Mono.fromCallable(() -> {
            StockPrice stockPrice = record.value();
            logger.info("Processing: {} - ${}", stockPrice.getSymbol(), stockPrice.getPrice());
            
            // Simulate async processing
            if (stockPrice.getPrice() > 1000) {
                return processHighValueStock(record);
            } else {
                return processNormalStock(record);
            }
        })
        .subscribeOn(Schedulers.boundedElastic())
        .doOnError(error -> logger.error("Error processing record", error))
        .retry(3)
        .onErrorReturn(record);
    }
    
    private ReceiverRecord<String, StockPrice> processHighValueStock(
            ReceiverRecord<String, StockPrice> record) {
        // Special processing for high-value stocks
        return record;
    }
    
    private ReceiverRecord<String, StockPrice> processNormalStock(
            ReceiverRecord<String, StockPrice> record) {
        // Normal processing
        return record;
    }
}

Advanced Offset Management

package com.cloudurable.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class ExternalOffsetManager {
    private static final Logger logger = LoggerFactory.getLogger(ExternalOffsetManager.class);
    
    private final Consumer<String, StockPrice> consumer;
    private final OffsetStore offsetStore;
    private final Map<TopicPartition, Long> processingOffsets;
    
    public ExternalOffsetManager(OffsetStore offsetStore) {
        this.consumer = createConsumer();
        this.offsetStore = offsetStore;
        this.processingOffsets = new ConcurrentHashMap<>();
    }
    
    private Consumer<String, StockPrice> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "external-offset-consumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StockPriceDeserializer.class);
        
        return new KafkaConsumer<>(props);
    }
    
    public void consume() {
        // Subscribe with custom rebalance listener
        consumer.subscribe(Arrays.asList("stock-prices"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Save current processing state
                saveOffsetsToExternalStore();
            }
            
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Restore from external store
                for (TopicPartition partition : partitions) {
                    Long offset = offsetStore.getOffset(partition);
                    if (offset != null) {
                        consumer.seek(partition, offset);
                        logger.info("Restored offset {} for partition {}", offset, partition);
                    }
                }
            }
        });
        
        try {
            while (true) {
                ConsumerRecords<String, StockPrice> records = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, StockPrice> record : records) {
                    try {
                        // Process record
                        processRecord(record);
                        
                        // Track successful processing
                        processingOffsets.put(
                            new TopicPartition(record.topic(), record.partition()),
                            record.offset() + 1
                        );
                        
                    } catch (Exception e) {
                        logger.error("Failed to process record", e);
                        // Implement retry logic or DLQ here
                    }
                }
                
                // Periodically save to external store
                if (!records.isEmpty() && System.currentTimeMillis() % 5000 < 1000) {
                    saveOffsetsToExternalStore();
                }
            }
        } finally {
            saveOffsetsToExternalStore();
            consumer.close();
        }
    }
    
    private void saveOffsetsToExternalStore() {
        processingOffsets.forEach((partition, offset) -> {
            try {
                offsetStore.saveOffset(partition, offset);
                logger.debug("Saved offset {} for partition {}", offset, partition);
            } catch (Exception e) {
                logger.error("Failed to save offset for partition {}", partition, e);
            }
        });
    }
    
    private void processRecord(ConsumerRecord<String, StockPrice> record) {
        // Business logic
    }
}

interface OffsetStore {
    void saveOffset(TopicPartition partition, long offset);
    Long getOffset(TopicPartition partition);
}

// Example implementation using Redis
class RedisOffsetStore implements OffsetStore {
    private final RedisClient redisClient;
    
    public RedisOffsetStore(RedisClient redisClient) {
        this.redisClient = redisClient;
    }
    
    @Override
    public void saveOffset(TopicPartition partition, long offset) {
        String key = String.format("kafka:offset:%s:%d", partition.topic(), partition.partition());
        redisClient.set(key, String.valueOf(offset));
    }
    
    @Override
    public Long getOffset(TopicPartition partition) {
        String key = String.format("kafka:offset:%s:%d", partition.topic(), partition.partition());
        String value = redisClient.get(key);
        return value != null ? Long.parseLong(value) : null;
    }
}

Best Practices Summary

flowchart TB
  subgraph BestPractices["Consumer Best Practices"]
    subgraph Reliability["Reliability"]
      R1[Use Manual Commits]
      R2[Implement Idempotence]
      R3[Handle Rebalances]
      R4[Monitor Consumer Lag]
    end
    
    subgraph Performance["Performance"]
      P1[Tune Poll Settings]
      P2[Use Appropriate Threading]
      P3[Batch Processing]
      P4[Optimize Deserialization]
    end
    
    subgraph Resilience["Resilience"]
      RS1[Implement DLQ]
      RS2[Circuit Breakers]
      RS3[Retry Logic]
      RS4[Health Checks]
    end
  end
  
  style Reliability fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
  style Performance fill:#e8f5e9,stroke:#43a047,stroke-width:2px
  style Resilience fill:#fff3e0,stroke:#ff6f00,stroke-width:2px

Review Questions

When should you use exactly-once semantics?

Use exactly-once when data accuracy is critical and duplicates would cause issues, such as financial transactions or inventory management.

What’s the benefit of ConsumerRebalanceListener?

It allows you to save state before losing partitions and restore state when gaining partitions, ensuring smooth rebalancing.

When to use manual partition assignment?

Use manual assignment for specialized use cases like priority queues, deterministic ordering, or workload isolation.

Which threading model is best?

Thread-per-consumer is simplest, multi-threaded offers better resource utilization, and reactive provides best scalability for I/O-bound operations.

How to handle poison messages?

Implement try-catch blocks, use circuit breakers, send failures to DLQ, and monitor/alert on repeated failures.

Summary

Advanced Kafka consumers in 2025 offer:

  • Flexible Delivery Semantics for different consistency requirements
  • Smooth Rebalancing with minimal disruption
  • Multiple Threading Models for various use cases
  • Robust Error Handling with DLQ and circuit breakers
  • Production-Ready Patterns for reliability at scale

Master these patterns to build resilient, high-performance streaming applications!

About Cloudurable

We hope you enjoyed this article. Please provide feedback. Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps setting up Kafka clusters in AWS.

Check out our new GoLang course. We provide onsite Go Lang training which is instructor led.

                                                                           
comments powered by Disqus

Apache Spark Training
Kafka Tutorial
Akka Consulting
Cassandra Training
AWS Cassandra Database Support
Kafka Support Pricing
Cassandra Database Support Pricing
Non-stop Cassandra
Watchdog
Advantages of using Cloudurable™
Cassandra Consulting
Cloudurable™| Guide to AWS Cassandra Deploy
Cloudurable™| AWS Cassandra Guidelines and Notes
Free guide to deploying Cassandra on AWS
Kafka Training
Kafka Consulting
DynamoDB Training
DynamoDB Consulting
Kinesis Training
Kinesis Consulting
Kafka Tutorial PDF
Kubernetes Security Training
Redis Consulting
Redis Training
ElasticSearch / ELK Consulting
ElasticSearch Training
InfluxDB/TICK Training TICK Consulting