Kafka Tutorial: Creating a Kafka Consumer in Java - 2025 Edition

January 9, 2025


🚀 What’s New in This 2025 Update

Major Updates and Changes

  • KIP-848 Protocol - Zero-downtime rebalancing (no more stop-the-world!)
  • KRaft Mode - No ZooKeeper needed
  • Exactly-Once Consumers - Transactional message processing
  • Share Groups - New queue-like consumption pattern
  • Enhanced Monitoring - Built-in lag tracking
  • Cloud-Native Patterns - Container-ready consumers

Consumer Evolution Since 2017

  • ✅ Smoother Rebalancing - Incremental cooperative protocol
  • ✅ Better Performance - Optimized fetch and poll
  • ✅ Improved Reliability - Automatic offset management
  • ✅ Enhanced Observability - Detailed metrics and tracing

Ready to build modern Kafka consumers? Let’s create production-ready consumers that can handle millions of messages reliably!

Kafka Tutorial: Writing a Kafka Consumer in Java

flowchart TB
  subgraph ConsumerGroup["Consumer Group: analytics-service"]
    C1[Consumer 1<br>Partitions: 0,1,2,3]
    C2[Consumer 2<br>Partitions: 4,5,6,7]
    C3[Consumer 3<br>Partitions: 8,9,10,11,12]
  end
  
  subgraph Topic["Topic: events (13 partitions)"]
    P0[P0] --> C1
    P1[P1] --> C1
    P2[P2] --> C1
    P3[P3] --> C1
    P4[P4] --> C2
    P5[P5] --> C2
    P6[P6] --> C2
    P7[P7] --> C2
    P8[P8] --> C3
    P9[P9] --> C3
    P10[P10] --> C3
    P11[P11] --> C3
    P12[P12] --> C3
  end
  
  classDef consumer fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#333333
  classDef partition fill:#e8f5e9,stroke:#43a047,stroke-width:1px,color:#333333
  
  class C1,C2,C3 consumer
  class P0,P1,P2,P3,P4,P5,P6,P7,P8,P9,P10,P11,P12 partition

In this tutorial, we’ll create modern Kafka consumers that:

  • Use the new KIP-848 rebalancing protocol for zero-downtime operations
  • Implement exactly-once semantics with transactions
  • Handle errors gracefully with dead letter queues
  • Monitor consumer lag for operational excellence
  • Follow cloud-native best practices

Prerequisites

Before you start, you will need a running Kafka broker (KRaft mode, so no ZooKeeper required), a recent JDK (the examples use records and switch expressions, so Java 17+ is a safe baseline), and the kafka-clients, Jackson, and SLF4J libraries on your classpath. The Event model and JSON serialization classes are shared with the companion producer tutorial.

Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps set up Kafka clusters in AWS.

Modern Kafka Consumer Implementation

Event Model (Reusing from Producer)

package com.cloudurable.kafka.model;

import java.time.Instant;
import java.util.UUID;

public record Event(
    String id,
    String userId,
    String type,
    String data,
    Instant timestamp
) {
    public static Event create(String userId, String type, String data) {
        return new Event(
            UUID.randomUUID().toString(),
            userId,
            type,
            data,
            Instant.now()
        );
    }
}

JSON Deserializer

package com.cloudurable.kafka.serialization;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class JsonDeserializer<T> implements Deserializer<T> {
    private final ObjectMapper objectMapper;
    private Class<T> clazz;
    
    // No-arg constructor so Kafka can instantiate the deserializer from the
    // value.deserializer config; the target type is supplied via configure()
    public JsonDeserializer() {
        this.objectMapper = new ObjectMapper();
        this.objectMapper.registerModule(new JavaTimeModule());
    }
    
    public JsonDeserializer(Class<T> clazz) {
        this();
        this.clazz = clazz;
    }
    
    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Read the target type from the custom "value.deserializer.class" property
        Object targetClass = configs.get("value.deserializer.class");
        if (clazz == null && targetClass != null) {
            try {
                clazz = (Class<T>) Class.forName(targetClass.toString());
            } catch (ClassNotFoundException e) {
                throw new RuntimeException("Cannot load deserialization target class", e);
            }
        }
    }
    
    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) return null;
        
        try {
            return objectMapper.readValue(data, clazz);
        } catch (Exception e) {
            throw new RuntimeException("Error deserializing JSON", e);
        }
    }
}
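
The exactly-once example later in this tutorial also references a JsonSerializer reused from the companion producer tutorial. If you don't have that class handy, here is a minimal sketch (the package and class name simply mirror the deserializer above; adjust them to match your producer code):

package com.cloudurable.kafka.serialization;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper()
        .registerModule(new JavaTimeModule());

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) return null;

        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new RuntimeException("Error serializing JSON", e);
        }
    }
}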

Basic Consumer with KIP-848 Protocol

package com.cloudurable.kafka;

import com.cloudurable.kafka.model.Event;
import com.cloudurable.kafka.serialization.JsonDeserializer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ModernKafkaConsumer {
    private static final Logger logger = LoggerFactory.getLogger(ModernKafkaConsumer.class);
    
    private static final String TOPIC = "events";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    
    public static Consumer<String, Event> createConsumer(String groupId) {
        Properties props = new Properties();
        
        // Connection settings
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "event-consumer-" + System.currentTimeMillis());
        
        // Use new KIP-848 consumer protocol for better rebalancing
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
        
        // Deserialization
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
        props.put("value.deserializer.class", Event.class.getName());
        
        // Performance optimizations
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 1KB
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100); // 100ms
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        
        // Offset management
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Manual commit for control
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // Session/heartbeat tuning applies to the classic protocol only. With
        // group.protocol=consumer (KIP-848) these intervals are managed on the broker
        // (group.consumer.session.timeout.ms / group.consumer.heartbeat.interval.ms),
        // so we leave them unset on the client.
        
        return new KafkaConsumer<>(props);
    }
    
    public static void processMessages(Consumer<String, Event> consumer) {
        consumer.subscribe(Collections.singletonList(TOPIC));
        
        try {
            while (true) {
                ConsumerRecords<String, Event> records = consumer.poll(Duration.ofMillis(100));
                
                if (!records.isEmpty()) {
                    logger.info("Fetched {} records", records.count());
                    
                    for (ConsumerRecord<String, Event> record : records) {
                        processRecord(record);
                    }
                    
                    // Commit after successful processing
                    consumer.commitSync();
                }
            }
        } catch (Exception e) {
            logger.error("Error in consumer loop", e);
        } finally {
            consumer.close();
        }
    }
    
    private static void processRecord(ConsumerRecord<String, Event> record) {
        logger.info("Processing: partition={}, offset={}, key={}, event={}",
            record.partition(), record.offset(), record.key(), record.value());
        
        // Simulate business logic
        Event event = record.value();
        switch (event.type()) {
            case "page-view" -> processPageView(event);
            case "purchase" -> processPurchase(event);
            case "login" -> processLogin(event);
            default -> logger.warn("Unknown event type: {}", event.type());
        }
    }
    
    private static void processPageView(Event event) {
        // Business logic for page views
        logger.debug("Processing page view: {}", event);
    }
    
    private static void processPurchase(Event event) {
        // Business logic for purchases
        logger.debug("Processing purchase: {}", event);
    }
    
    private static void processLogin(Event event) {
        // Business logic for logins
        logger.debug("Processing login: {}", event);
    }
}
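
Wiring it together is just a matter of creating the consumer and handing it to the processing loop. A minimal runner (the class name and the "analytics-service" group id are illustrative):

public class ConsumerRunner {
    public static void main(String[] args) {
        var consumer = ModernKafkaConsumer.createConsumer("analytics-service");
        ModernKafkaConsumer.processMessages(consumer); // blocks until the loop exits
    }
}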

Consumer with Error Handling and Dead Letter Queue

package com.cloudurable.kafka;

import com.cloudurable.kafka.model.Event;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

public class ResilientConsumer {
    private static final Logger logger = LoggerFactory.getLogger(ResilientConsumer.class);
    
    private static final String TOPIC = "events";
    private static final String DLQ_TOPIC = "events-dlq";
    private static final int MAX_RETRIES = 3;
    
    private final Consumer<String, Event> consumer;
    private final Producer<String, String> dlqProducer;
    private final Map<TopicPartition, AtomicLong> partitionRetryCount = new HashMap<>();
    
    public ResilientConsumer(String groupId) {
        this.consumer = ModernKafkaConsumer.createConsumer(groupId);
        this.dlqProducer = createDLQProducer();
    }
    
    private Producer<String, String> createDLQProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        return new KafkaProducer<>(props);
    }
    
    public void processWithRetries() {
        consumer.subscribe(Collections.singletonList(TOPIC));
        
        try {
            while (true) {
                ConsumerRecords<String, Event> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, Event> record : records) {
                    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                    
                    try {
                        processRecord(record);
                        
                        // Reset retry count on success
                        partitionRetryCount.remove(tp);
                        
                    } catch (Exception e) {
                        handleProcessingError(record, e, tp);
                    }
                }
                
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        } finally {
            consumer.close();
            dlqProducer.close();
        }
    }
    
    private void handleProcessingError(ConsumerRecord<String, Event> record, 
                                     Exception e, 
                                     TopicPartition tp) {
        AtomicLong retryCount = partitionRetryCount.computeIfAbsent(tp, k -> new AtomicLong(0));
        long currentRetry = retryCount.incrementAndGet();
        
        logger.error("Error processing record at partition {} offset {}, retry {}/{}", 
            record.partition(), record.offset(), currentRetry, MAX_RETRIES, e);
        
        if (currentRetry >= MAX_RETRIES) {
            // Send to DLQ
            sendToDLQ(record, e);
            
            // Seek past the problematic record
            consumer.seek(tp, record.offset() + 1);
            
            // Reset retry count
            partitionRetryCount.remove(tp);
        } else {
            // Rewind to the failed record so it is redelivered after the backoff,
            // then pause the partition so no further records are fetched meanwhile
            consumer.seek(tp, record.offset());
            consumer.pause(Collections.singleton(tp));
            
            // Schedule unpause after backoff
            scheduleUnpause(tp, currentRetry);
        }
    }
    
    private void sendToDLQ(ConsumerRecord<String, Event> record, Exception error) {
        try {
            ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
                DLQ_TOPIC,
                record.key(),
                record.value().toString()
            );
            
            // Add error metadata as headers
            dlqRecord.headers()
                .add("original-topic", record.topic().getBytes())
                .add("original-partition", String.valueOf(record.partition()).getBytes())
                .add("original-offset", String.valueOf(record.offset()).getBytes())
                .add("error-message", error.getMessage().getBytes())
                .add("error-class", error.getClass().getName().getBytes());
            
            dlqProducer.send(dlqRecord).get();
            logger.info("Sent failed record to DLQ: {}", record.key());
            
        } catch (Exception dlqError) {
            logger.error("Failed to send to DLQ", dlqError);
            // Could implement additional fallback here
        }
    }
    
    private void scheduleUnpause(TopicPartition tp, long retryCount) {
        // Exponential backoff
        long backoffMs = Math.min(1000 * (long) Math.pow(2, retryCount), 60000);
        
        // NOTE: KafkaConsumer is not thread-safe, so calling resume() from the Timer
        // thread is only acceptable for a demo. In production, have the timer signal
        // the poll loop (e.g. via a concurrent queue) and call resume() there.
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                consumer.resume(Collections.singleton(tp));
                logger.info("Resumed partition {} after {} ms backoff", tp, backoffMs);
            }
        }, backoffMs);
    }
    
    private void processRecord(ConsumerRecord<String, Event> record) throws Exception {
        // Simulate processing that might fail
        Event event = record.value();
        
        if (event.data().contains("error")) {
            throw new RuntimeException("Simulated processing error");
        }
        
        // Normal processing
        logger.info("Successfully processed: {}", event);
    }
}

Exactly-Once Consumer with Transactions

package com.cloudurable.kafka;

import com.cloudurable.kafka.model.Event;
import com.cloudurable.kafka.serialization.JsonDeserializer;
import com.cloudurable.kafka.serialization.JsonSerializer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;

public class ExactlyOnceConsumer {
    private static final Logger logger = LoggerFactory.getLogger(ExactlyOnceConsumer.class);
    
    private static final String INPUT_TOPIC = "events";
    private static final String OUTPUT_TOPIC = "processed-events";
    
    public static void processExactlyOnce(String groupId) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        consumerProps.put("value.deserializer.class", Event.class.getName());
        
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "exactly-once-consumer-" + groupId);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        
        try (Consumer<String, Event> consumer = new KafkaConsumer<>(consumerProps);
             Producer<String, Event> producer = new KafkaProducer<>(producerProps)) {
            
            // Initialize transactions
            producer.initTransactions();
            consumer.subscribe(Collections.singletonList(INPUT_TOPIC));
            
            while (true) {
                ConsumerRecords<String, Event> records = consumer.poll(Duration.ofMillis(100));
                
                if (!records.isEmpty()) {
                    // Begin transaction
                    producer.beginTransaction();
                    
                    try {
                        // Process records
                        for (ConsumerRecord<String, Event> record : records) {
                            Event processedEvent = processEvent(record.value());
                            
                            // Send to output topic
                            producer.send(new ProducerRecord<>(
                                OUTPUT_TOPIC,
                                record.key(),
                                processedEvent
                            ));
                        }
                        
                        // Send consumer offsets in same transaction
                        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
                        for (TopicPartition partition : records.partitions()) {
                            List<ConsumerRecord<String, Event>> partitionRecords = 
                                records.records(partition);
                            long lastOffset = partitionRecords
                                .get(partitionRecords.size() - 1).offset();
                            offsetsToCommit.put(partition, 
                                new OffsetAndMetadata(lastOffset + 1));
                        }
                        
                        // Commit transaction with offsets; use the consumer's own group
                        // metadata (KIP-447) so the broker can fence zombie instances
                        producer.sendOffsetsToTransaction(offsetsToCommit, 
                            consumer.groupMetadata());
                        producer.commitTransaction();
                        
                        logger.info("Successfully processed {} records in transaction", 
                            records.count());
                        
                    } catch (Exception e) {
                        logger.error("Error in transaction, aborting", e);
                        producer.abortTransaction();
                        throw e;
                    }
                }
            }
        }
    }
    
    private static Event processEvent(Event event) {
        // Transform the event
        return Event.create(
            event.userId(),
            "processed-" + event.type(),
            event.data() + " [processed]"
        );
    }
}

Consumer Group Coordination Demo

package com.cloudurable.kafka;

import com.cloudurable.kafka.model.Event;
import com.cloudurable.kafka.serialization.JsonDeserializer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class ConsumerGroupDemo {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerGroupDemo.class);
    
    public static class PartitionAwareConsumer implements ConsumerRebalanceListener {
        private final String consumerId;
        private final Consumer<String, Event> consumer;
        
        public PartitionAwareConsumer(String consumerId, String groupId) {
            this.consumerId = consumerId;
            this.consumer = createConsumer(consumerId, groupId);
        }
        
        private Consumer<String, Event> createConsumer(String consumerId, String groupId) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId);
            props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); // KIP-848
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            props.put("value.deserializer.class", Event.class.getName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            
            return new KafkaConsumer<>(props);
        }
        
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            logger.info("Consumer {} - Partitions revoked: {}", consumerId, partitions);
        }
        
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            logger.info("Consumer {} - Partitions assigned: {}", consumerId, partitions);
        }
        
        public void consume(CountDownLatch latch) {
            consumer.subscribe(Collections.singletonList("events"), this);
            
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    ConsumerRecords<String, Event> records = 
                        consumer.poll(Duration.ofMillis(100));
                    
                    for (ConsumerRecord<String, Event> record : records) {
                        logger.info("Consumer {} - Partition: {}, Offset: {}, Event: {}",
                            consumerId, record.partition(), record.offset(), 
                            record.value().type());
                    }
                    
                    if (!records.isEmpty()) {
                        consumer.commitSync();
                    }
                }
            } finally {
                consumer.close();
                latch.countDown();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        String groupId = "demo-consumer-group";
        int numConsumers = 3;
        CountDownLatch latch = new CountDownLatch(numConsumers);
        
        // Start multiple consumers in the same group
        for (int i = 0; i < numConsumers; i++) {
            String consumerId = "consumer-" + i;
            
            new Thread(() -> {
                PartitionAwareConsumer consumer = 
                    new PartitionAwareConsumer(consumerId, groupId);
                consumer.consume(latch);
            }).start();
        }
        
        // Wait for all consumers to finish
        latch.await();
    }
}

Consumer Lag Monitoring

package com.cloudurable.kafka.monitoring;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class ConsumerLagMonitor {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerLagMonitor.class);
    
    private final AdminClient adminClient;
    
    public ConsumerLagMonitor() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        this.adminClient = AdminClient.create(props);
    }
    
    public Map<TopicPartition, Long> calculateLag(String groupId) 
            throws ExecutionException, InterruptedException {
        
        // Get consumer group offsets
        ListConsumerGroupOffsetsResult offsetsResult = 
            adminClient.listConsumerGroupOffsets(groupId);
        Map<TopicPartition, OffsetAndMetadata> groupOffsets = 
            offsetsResult.partitionsToOffsetAndMetadata().get();
        
        // Get topic end offsets
        Set<TopicPartition> partitions = groupOffsets.keySet();
        Map<TopicPartition, OffsetSpec> offsetSpecs = partitions.stream()
            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        
        ListOffsetsResult listOffsetsResult = 
            adminClient.listOffsets(offsetSpecs);
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = 
            listOffsetsResult.all().get();
        
        // Calculate lag
        Map<TopicPartition, Long> lagMap = new HashMap<>();
        for (TopicPartition partition : partitions) {
            long consumerOffset = groupOffsets.get(partition).offset();
            long endOffset = endOffsets.get(partition).offset();
            long lag = endOffset - consumerOffset;
            
            lagMap.put(partition, lag);
            
            if (lag > 1000) {
                logger.warn("High lag detected for {} in group {}: {} messages behind",
                    partition, groupId, lag);
            }
        }
        
        return lagMap;
    }
    
    public void monitorLag(String groupId, long intervalMs) {
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    Map<TopicPartition, Long> lag = calculateLag(groupId);
                    long totalLag = lag.values().stream()
                        .mapToLong(Long::longValue)
                        .sum();
                    
                    logger.info("Consumer group {} - Total lag: {} messages", 
                        groupId, totalLag);
                    
                    // Export metrics to monitoring system
                    exportMetrics(groupId, lag);
                    
                } catch (Exception e) {
                    logger.error("Error monitoring lag", e);
                }
            }
        }, 0, intervalMs);
    }
    
    private void exportMetrics(String groupId, Map<TopicPartition, Long> lag) {
        // Export to Prometheus, CloudWatch, etc.
        lag.forEach((partition, lagValue) -> {
            // Example: Push to metrics system
            logger.debug("metric.consumer.lag{{group={},topic={},partition={}}} {}", 
                groupId, partition.topic(), partition.partition(), lagValue);
        });
    }
    
    public void close() {
        adminClient.close();
    }
}
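
A minimal way to run the monitor (the runner class name, group id, and 30-second interval are arbitrary examples):

public class LagMonitorRunner {
    public static void main(String[] args) {
        ConsumerLagMonitor monitor = new ConsumerLagMonitor();
        // Logs total lag for the group every 30 seconds until the JVM exits
        monitor.monitorLag("analytics-service", 30_000);
    }
}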

Cloud-Native Consumer Pattern

package com.cloudurable.kafka.cloud;

import com.cloudurable.kafka.model.Event;
import com.cloudurable.kafka.serialization.JsonDeserializer;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloudNativeConsumer {
    private static final Logger logger = LoggerFactory.getLogger(CloudNativeConsumer.class);
    
    private final Consumer<String, Event> consumer;
    private final MeterRegistry meterRegistry;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final String consumerId;
    
    public CloudNativeConsumer(String groupId, MeterRegistry meterRegistry) {
        this.consumerId = System.getenv().getOrDefault("HOSTNAME", "local-consumer"); // Pod name in K8s
        this.consumer = createConsumer(groupId);
        this.meterRegistry = meterRegistry;
        
        // Register shutdown hook for graceful shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
    }
    
    private Consumer<String, Event> createConsumer(String groupId) {
        Properties props = new Properties();
        
        // Configuration from environment variables (12-factor app)
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            System.getenv().getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId);
        
        // Cloud-optimized settings
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000); // 20s for cloud
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 6000); // 6s
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5 min
        
        // Security from environment
        String securityProtocol = System.getenv("KAFKA_SECURITY_PROTOCOL");
        if (securityProtocol != null) {
            props.put("security.protocol", securityProtocol);
            props.put("sasl.mechanism", System.getenv("KAFKA_SASL_MECHANISM"));
            props.put("sasl.jaas.config", System.getenv("KAFKA_SASL_JAAS_CONFIG"));
        }
        
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put("value.deserializer.class", Event.class.getName());
        
        return new KafkaConsumer<>(props);
    }
    
    public void consume() {
        consumer.subscribe(Collections.singletonList("events"));
        
        try {
            while (running.get()) {
                try {
                    ConsumerRecords<String, Event> records = consumer.poll(Duration.ofMillis(100));
                    
                    if (!records.isEmpty()) {
                        Timer.Sample sample = Timer.start(meterRegistry);
                        
                        processRecords(records);
                        
                        consumer.commitSync();
                        
                        sample.stop(meterRegistry.timer("consumer.batch.processing.time",
                            "consumer.id", consumerId,
                            "group.id", consumer.groupMetadata().groupId()));
                        
                        meterRegistry.counter("consumer.records.processed",
                            "consumer.id", consumerId)
                            .increment(records.count());
                    }
                    
                    // Health check
                    updateHealthStatus();
                    
                } catch (WakeupException e) {
                    // Thrown by poll() when shutdown() calls wakeup(); exit the loop
                    if (running.get()) throw e;
                } catch (Exception e) {
                    logger.error("Error in consumer loop", e);
                    meterRegistry.counter("consumer.errors",
                        "consumer.id", consumerId,
                        "error.type", e.getClass().getSimpleName())
                        .increment();
                    
                    // Implement circuit breaker pattern
                    handleError(e);
                }
            }
        } finally {
            // Close on the polling thread; KafkaConsumer is not thread-safe
            consumer.close();
        }
    }
    
    private void processRecords(ConsumerRecords<String, Event> records) {
        records.forEach(record -> {
            try {
                processEvent(record.value());
            } catch (Exception e) {
                logger.error("Error processing record", e);
                // Send to DLQ or retry logic
            }
        });
    }
    
    private void processEvent(Event event) {
        // Business logic
        logger.info("Processing event: {}", event);
    }
    
    private void updateHealthStatus() {
        // Update health endpoint for K8s liveness/readiness probes
        // This could write to a file or update a health endpoint
    }
    
    private void handleError(Exception e) {
        // Implement exponential backoff or circuit breaker
        if (e instanceof org.apache.kafka.common.errors.TimeoutException) {
            logger.warn("Timeout detected, backing off");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    public void shutdown() {
        logger.info("Shutting down consumer {}", consumerId);
        running.set(false);
        // Interrupt poll(); the consume() loop closes the consumer on its own thread
        consumer.wakeup();
    }
}
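
To run it, supply any Micrometer registry. A minimal sketch using the in-memory SimpleMeterRegistry (swap in your Prometheus or CloudWatch registry in a real deployment; the runner class name is illustrative):

import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class CloudConsumerRunner {
    public static void main(String[] args) {
        CloudNativeConsumer consumer =
            new CloudNativeConsumer("analytics-service", new SimpleMeterRegistry());
        consumer.consume(); // blocks; the shutdown hook stops it on SIGTERM
    }
}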

Testing Your Consumer

package com.cloudurable.kafka;

import com.cloudurable.kafka.model.Event;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.*;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

class ConsumerTest {
    
    @Test
    void testConsumerProcessing() {
        // Create mock consumer
        Consumer<String, Event> mockConsumer = mock(Consumer.class);
        
        // Create test records
        List<ConsumerRecord<String, Event>> recordsList = Arrays.asList(
            new ConsumerRecord<>("events", 0, 100L, "user1", 
                Event.create("user1", "login", "{}")),
            new ConsumerRecord<>("events", 1, 200L, "user2", 
                Event.create("user2", "purchase", "{}"))
        );
        
        ConsumerRecords<String, Event> records = new ConsumerRecords<>(
            Map.of(
                new TopicPartition("events", 0), 
                Collections.singletonList(recordsList.get(0)),
                new TopicPartition("events", 1), 
                Collections.singletonList(recordsList.get(1))
            )
        );
        
        // Mock behavior
        when(mockConsumer.poll(any(Duration.class))).thenReturn(records);
        
        // Test processing
        ConsumerRecords<String, Event> result = mockConsumer.poll(Duration.ofMillis(100));
        assertEquals(2, result.count());
        
        // Verify first record
        ConsumerRecord<String, Event> first = result.iterator().next();
        assertEquals("user1", first.key());
        assertEquals("login", first.value().type());
    }
}
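
Mockito stubbing is fine for unit-testing your processing logic, but kafka-clients also ships a MockConsumer that simulates poll, assignment, and offset tracking without a broker. A sketch of a similar test against MockConsumer (manual assignment is used because MockConsumer does not run a real group coordinator):

import com.cloudurable.kafka.model.Event;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

class MockConsumerTest {

    @Test
    void testWithMockConsumer() {
        MockConsumer<String, Event> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("events", 0);

        // Manually assign the partition and set its beginning offset
        consumer.assign(List.of(tp));
        consumer.updateBeginningOffsets(Map.of(tp, 0L));

        // Queue a record for the next poll()
        consumer.addRecord(new ConsumerRecord<>("events", 0, 0L, "user1",
            Event.create("user1", "login", "{}")));

        ConsumerRecords<String, Event> records = consumer.poll(Duration.ofMillis(100));
        assertEquals(1, records.count());
        assertEquals("login", records.iterator().next().value().type());
    }
}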

Performance Tuning

flowchart TB
  subgraph Tuning["Consumer Performance Tuning"]
    FETCH[Increase Fetch Size<br>fetch.min.bytes=10KB]
    POLL[Optimize Poll<br>max.poll.records=1000]
    PARALLEL[Parallel Processing<br>Multiple Consumers]
    COMMIT[Batch Commits<br>Manual Offset Management]
    PREFETCH[Prefetch Tuning<br>fetch.max.wait.ms=500]
  end
  
  subgraph Metrics["Monitor These"]
    M1[Records/Second]
    M2[Lag per Partition]
    M3[Commit Latency]
    M4[Processing Time]
    M5[Rebalance Frequency]
  end
  
  Tuning --> Metrics
  
  style Tuning fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
  style Metrics fill:#e8f5e9,stroke:#43a047,stroke-width:1px
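
Translated into configuration, a throughput-oriented profile might look like the sketch below. The values come from the diagram above and are illustrative starting points, not universal recommendations; benchmark against your own workload.

Properties props = new Properties();
// Larger fetches: let the broker batch more data per request (trades latency for throughput)
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 10 * 1024);   // 10 KB
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);       // wait up to 500 ms to fill the fetch
// Bigger batches per poll(), as long as processing stays under max.poll.interval.ms
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
// Manual, batched commits after each successfully processed batch
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);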

Common Patterns and Solutions

Pattern: Parallel Processing within Consumer

import com.cloudurable.kafka.model.Event;
import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelProcessingConsumer {
    private final ExecutorService executorService = 
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
    public void consumeWithParallelProcessing(Consumer<String, Event> consumer) {
        while (true) {
            ConsumerRecords<String, Event> records = consumer.poll(Duration.ofMillis(100));
            
            if (!records.isEmpty()) {
                List<CompletableFuture<Void>> futures = new ArrayList<>();
                
                // Note: fanning records out to a thread pool gives up Kafka's
                // per-partition ordering guarantee within the batch
                for (ConsumerRecord<String, Event> record : records) {
                    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                        processRecord(record);
                    }, executorService);
                    
                    futures.add(future);
                }
                
                // Wait for all processing to complete
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
                
                // Commit after all records processed
                consumer.commitSync();
            }
        }
    }
    
    private void processRecord(ConsumerRecord<String, Event> record) {
        // Business logic for a single record
    }
}

Review Questions and Answers

How does KIP-848 improve consumer rebalancing?

KIP-848 replaces the classic client-side rebalance protocol with a broker-driven one: the group coordinator computes assignments and reconciles them with each consumer incrementally, so only the partitions that actually move are revoked while the rest keep being processed. This eliminates “stop-the-world” rebalances.

When should you use manual vs automatic offset commits?

Use manual commits when you need precise control over when offsets are committed, especially for exactly-once processing or when implementing custom error handling.
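
In configuration terms the difference is a single property plus where the commit happens; a compact sketch:

// Automatic: offsets are committed in the background every auto.commit.interval.ms,
// which can acknowledge records your code has not finished processing yet
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

// Manual: commit only after the batch has been processed (at-least-once delivery)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// ...inside the poll loop, after processing succeeds:
consumer.commitSync();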

How do you handle poison messages?

Implement a retry mechanism with exponential backoff, and after max retries, send the message to a dead letter queue for manual inspection.

What’s the optimal number of consumers in a group?

Ideally, have as many consumers as partitions for maximum parallelism. Having more consumers than partitions results in idle consumers.

How do you monitor consumer health?

Track consumer lag, processing time, error rates, and rebalance frequency. Set alerts for high lag or frequent rebalances.

Summary

Modern Kafka consumers in 2025 emphasize:

  • Smooth operations with KIP-848 rebalancing protocol
  • Reliability through error handling and dead letter queues
  • Observability with comprehensive monitoring
  • Cloud-native patterns for container deployments
  • Exactly-once processing for critical applications

The combination of these patterns ensures your consumers can handle millions of messages reliably!

About Cloudurable

We hope you enjoyed this tutorial. Please provide feedback. Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps set up Kafka clusters in AWS.

Check out our new GoLang course. We provide onsite, instructor-led Go Lang training.
