Kafka Tutorial: Advanced Kafka Consumers in Java - Part 1 - 2025 Edition

January 9, 2025


🚀 What’s New in This 2025 Update

Major Updates and Changes

  • KIP-848 Protocol - Broker-coordinated partition assignment
  • Cooperative Rebalancing - Zero-downtime partition reassignment
  • Dead Letter Queues - Standard pattern for error handling
  • Consumer Interceptors - Cross-cutting concerns made easy
  • Automated Scaling - Cloud-native auto-scaling patterns
  • Enhanced Monitoring - Built-in lag tracking and alerting

Consumer Evolution Since 2018

  • ✅ Smoother Operations - Incremental rebalancing minimizes disruption
  • ✅ Better Error Handling - DLQ and retry patterns standardized
  • ✅ Improved Observability - Metrics and interceptors built-in
  • ✅ Cloud-Native Ready - Auto-scaling and managed services

Welcome to Part 1 of Advanced Kafka Consumers in 2025! This tutorial covers modern consumer patterns that have become essential for production Kafka deployments.

Prerequisites

Before you start, you should be familiar with the basics of Kafka consumers (covered earlier in this Kafka Tutorial series) and have a running Kafka cluster to test against.

Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps set up Kafka clusters in AWS.

Advanced Consumer Architecture

flowchart TB
  subgraph ConsumerGroup["Consumer Group with KIP-848"]
    C1[Consumer 1<br>Partitions: 0,1]
    C2[Consumer 2<br>Partitions: 2,3]
    C3[Consumer 3<br>Partitions: 4,5]
    
    subgraph Coordinator["Broker Coordinator"]
      BA[Broker Assignment<br>Logic]
      OM[Offset Manager]
      HB[Heartbeat Monitor]
    end
  end
  
  subgraph ErrorHandling["Error Handling"]
    DLQ[Dead Letter Queue]
    RETRY[Retry Topic]
    ALERT[Alerting System]
  end
  
  subgraph Monitoring["Monitoring"]
    LAG[Lag Monitor]
    METRICS[Metrics Collector]
    DASH[Dashboard]
  end
  
  C1 --> BA
  C2 --> BA
  C3 --> BA
  
  C1 -.->|Errors| DLQ
  C2 -.->|Errors| DLQ
  C3 -.->|Errors| DLQ
  
  ConsumerGroup --> Monitoring
  
  style ConsumerGroup fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
  style ErrorHandling fill:#ffebee,stroke:#e53935,stroke-width:1px
  style Monitoring fill:#e8f5e9,stroke:#43a047,stroke-width:1px

Project Setup

build.gradle

plugins {
    id 'java'
    id 'application'
}

group = 'com.cloudurable.kafka'
version = '2.0-SNAPSHOT'

java {
    sourceCompatibility = JavaVersion.VERSION_17
    targetCompatibility = JavaVersion.VERSION_17
}

repositories {
    mavenCentral()
}

dependencies {
    // Kafka client (3.7.0+ is required for the KIP-848 group.protocol setting used below)
    implementation 'org.apache.kafka:kafka-clients:3.7.0'
    
    // JSON processing
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.16.0'
    implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.16.0'
    
    // Metrics and monitoring
    implementation 'io.micrometer:micrometer-core:1.12.0'
    implementation 'io.micrometer:micrometer-registry-prometheus:1.12.0'
    
    // Logging
    implementation 'ch.qos.logback:logback-classic:1.4.14'
    
    // Resilience
    implementation 'io.github.resilience4j:resilience4j-circuitbreaker:2.1.0'
    implementation 'io.github.resilience4j:resilience4j-retry:2.1.0'
    
    // Testing
    testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0'
    testImplementation 'org.apache.kafka:kafka-streams-test-utils:3.7.0'
}

application {
    mainClass = 'com.cloudurable.kafka.consumer.AdvancedStockPriceConsumer'
}

test {
    useJUnitPlatform()
}

Modern Stock Price Consumer

Enhanced Stock Price Model

package com.cloudurable.kafka.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant;
import java.util.UUID;

public record StockPrice(
    @JsonProperty("id") String id,
    @JsonProperty("symbol") String symbol,
    @JsonProperty("price") double price,
    @JsonProperty("volume") long volume,
    @JsonProperty("timestamp") Instant timestamp,
    @JsonProperty("exchange") String exchange
) {
    public static StockPrice create(String symbol, double price, long volume) {
        return new StockPrice(
            UUID.randomUUID().toString(),
            symbol,
            price,
            volume,
            Instant.now(),
            "NYSE"
        );
    }
    
    // Legacy compatibility
    public int getDollars() {
        return (int) price;
    }
    
    public int getCents() {
        // Math.round avoids floating-point truncation (e.g. 10.29 would otherwise yield 28 cents)
        return (int) Math.round((price - getDollars()) * 100);
    }
}
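A quick usage snippet for the record and its legacy helpers (the values are illustrative):

// Illustrative usage of the StockPrice record
StockPrice price = StockPrice.create("IBM", 142.37, 25_000L);
System.out.println(price.symbol());      // IBM
System.out.println(price.getDollars());  // 142
System.out.println(price.getCents());    // 37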

Advanced JSON Deserializer

package com.cloudurable.kafka.consumer;

import com.cloudurable.kafka.model.StockPrice;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class StockPriceDeserializer implements Deserializer<StockPrice> {
    private static final Logger logger = LoggerFactory.getLogger(StockPriceDeserializer.class);
    private final ObjectMapper objectMapper;
    
    public StockPriceDeserializer() {
        this.objectMapper = new ObjectMapper();
        this.objectMapper.registerModule(new JavaTimeModule());
    }
    
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Configuration if needed
    }
    
    @Override
    public StockPrice deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        
        try {
            return objectMapper.readValue(data, StockPrice.class);
        } catch (Exception e) {
            logger.error("Failed to deserialize StockPrice from topic {}", topic, e);
            throw new SerializationException("Error deserializing StockPrice", e);
        }
    }
    
    @Override
    public void close() {
        // Cleanup if needed
    }
}
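The deserializer assumes producers publish StockPrice as JSON. For reference, a matching serializer sketch (not part of this tutorial's consumer code) might look like this:

package com.cloudurable.kafka.producer;

import com.cloudurable.kafka.model.StockPrice;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class StockPriceSerializer implements Serializer<StockPrice> {
    private final ObjectMapper objectMapper = new ObjectMapper()
            .registerModule(new JavaTimeModule());

    @Override
    public byte[] serialize(String topic, StockPrice data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing StockPrice", e);
        }
    }
}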

Advanced Consumer with KIP-848 Protocol

package com.cloudurable.kafka.consumer;

import com.cloudurable.kafka.model.StockPrice;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class AdvancedStockPriceConsumer {
    private static final Logger logger = LoggerFactory.getLogger(AdvancedStockPriceConsumer.class);
    
    private static final String TOPIC = "stock-prices";
    private static final String DLQ_TOPIC = "stock-prices-dlq";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final MeterRegistry meterRegistry = new SimpleMeterRegistry();
    private final Map<String, StockPrice> stockCache = new ConcurrentHashMap<>();
    
    private Consumer<String, StockPrice> createConsumer() {
        Properties props = new Properties();
        
        // Connection
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "advanced-stock-consumer");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "stock-consumer-" + UUID.randomUUID());
        
        // Deserialization
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StockPriceDeserializer.class.getName());
        
        // KIP-848 Protocol (new consumer protocol)
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
        
        // Performance tuning
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024); // 1MB
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        
        // Session management
        // (session.timeout.ms and heartbeat.interval.ms apply to the classic protocol;
        //  under the KIP-848 protocol they are governed by broker-side group configs)
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        
        // Offset management
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // Interceptors
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 
                  MetricsConsumerInterceptor.class.getName());
        
        Consumer<String, StockPrice> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));
        
        return consumer;
    }
    
    public void runConsumer() {
        try (Consumer<String, StockPrice> consumer = createConsumer()) {
            // Register shutdown hook
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                logger.info("Shutting down consumer...");
                running.set(false);
                consumer.wakeup();
            }));
            
            while (running.get()) {
                try {
                    ConsumerRecords<String, StockPrice> records = consumer.poll(Duration.ofMillis(1000));
                    
                    if (!records.isEmpty()) {
                        processRecords(consumer, records);
                    }
                    
                    // Periodic stats logging
                    if (System.currentTimeMillis() % 10000 < 1000) {
                        logConsumerStats(consumer);
                    }
                    
                } catch (WakeupException e) {
                    if (running.get()) throw e;
                } catch (Exception e) {
                    logger.error("Error in consumer loop", e);
                }
            }
        } catch (Exception e) {
            logger.error("Fatal consumer error", e);
        }
    }
    
    private void processRecords(Consumer<String, StockPrice> consumer, 
                               ConsumerRecords<String, StockPrice> records) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        
        for (ConsumerRecord<String, StockPrice> record : records) {
            try {
                processRecord(record);
                
                // Track offset for manual commit
                offsetsToCommit.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)
                );
                
            } catch (Exception e) {
                handleProcessingError(record, e);
            }
        }
        
        // Commit offsets after successful processing
        if (!offsetsToCommit.isEmpty()) {
            consumer.commitAsync(offsetsToCommit, (offsets, exception) -> {
                if (exception != null) {
                    logger.error("Failed to commit offsets", exception);
                } else {
                    logger.debug("Offsets committed: {}", offsets);
                }
            });
        }
        
        // Update metrics
        meterRegistry.counter("consumer.records.processed").increment(records.count());
    }
    
    private void processRecord(ConsumerRecord<String, StockPrice> record) {
        StockPrice stockPrice = record.value();
        
        // Business logic
        stockCache.put(stockPrice.symbol(), stockPrice);
        
        // Check for price anomalies
        if (stockPrice.price() <= 0) {
            throw new IllegalArgumentException("Invalid stock price: " + stockPrice.price());
        }
        
        logger.info("Processed: {} - ${} @ volume {}", 
                   stockPrice.symbol(), 
                   stockPrice.price(), 
                   stockPrice.volume());
    }
    
    private void handleProcessingError(ConsumerRecord<String, StockPrice> record, Exception e) {
        logger.error("Error processing record from partition {} offset {}", 
                    record.partition(), record.offset(), e);
        
        // Send to DLQ
        sendToDeadLetterQueue(record, e);
        
        // Update error metrics
        meterRegistry.counter("consumer.errors", 
                            "exception", e.getClass().getSimpleName()).increment();
    }
    
    private void sendToDeadLetterQueue(ConsumerRecord<String, StockPrice> record, Exception error) {
        // In production, use a producer to send to DLQ topic
        logger.warn("Would send to DLQ: {} due to {}", record.key(), error.getMessage());
    }
    
    private void logConsumerStats(Consumer<String, StockPrice> consumer) {
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
        
        long totalLag = 0;
        for (TopicPartition partition : consumer.assignment()) {
            Long endOffset = endOffsets.get(partition);
            OffsetAndMetadata committedOffset = committed.get(partition);
            
            if (endOffset != null && committedOffset != null) {
                long lag = endOffset - committedOffset.offset();
                totalLag += lag;
                
                meterRegistry.gauge("consumer.lag", 
                                  Collections.singletonList(io.micrometer.core.instrument.Tag.of("partition", String.valueOf(partition.partition()))), 
                                  lag);
            }
        }
        
        logger.info("Consumer stats - Total lag: {}, Cache size: {}", 
                   totalLag, stockCache.size());
    }
    
    public static void main(String[] args) {
        new AdvancedStockPriceConsumer().runConsumer();
    }
}

Consumer Interceptor for Metrics

package com.cloudurable.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class MetricsConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(MetricsConsumerInterceptor.class);
    
    private final AtomicLong recordCount = new AtomicLong(0);
    private final AtomicLong bytesConsumed = new AtomicLong(0);
    
    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        recordCount.addAndGet(records.count());
        
        records.forEach(record -> {
            if (record.serializedValueSize() > 0) {
                bytesConsumed.addAndGet(record.serializedValueSize());
            }
        });
        
        // Log metrics periodically
        if (recordCount.get() % 1000 == 0) {
            logger.info("Consumed {} records, {} bytes total", 
                       recordCount.get(), bytesConsumed.get());
        }
        
        return records;
    }
    
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        logger.debug("Committed offsets for {} partitions", offsets.size());
    }
    
    @Override
    public void close() {
        logger.info("Closing interceptor - Total records: {}, Total bytes: {}", 
                   recordCount.get(), bytesConsumed.get());
    }
    
    @Override
    public void configure(Map<String, ?> configs) {
        // Configuration if needed
    }
}

Dead Letter Queue Handler

package com.cloudurable.kafka.consumer;

import com.cloudurable.kafka.model.StockPrice;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;

public class DeadLetterQueueHandler {
    private static final Logger logger = LoggerFactory.getLogger(DeadLetterQueueHandler.class);
    
    private final KafkaProducer<String, String> dlqProducer;
    private final String dlqTopic;
    
    public DeadLetterQueueHandler(String bootstrapServers, String dlqTopic) {
        this.dlqTopic = dlqTopic;
        
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        this.dlqProducer = new KafkaProducer<>(props);
    }
    
    public CompletableFuture<Void> sendToDeadLetterQueue(
            ConsumerRecord<String, StockPrice> originalRecord, 
            Exception error) {
        
        CompletableFuture<Void> future = new CompletableFuture<>();
        
        try {
            // Create DLQ record with error metadata
            // (String.format keeps the example short; production code should build this
            //  JSON with ObjectMapper so error messages and keys are properly escaped)
            String dlqMessage = String.format(
                "{\"originalTopic\":\"%s\",\"originalPartition\":%d,\"originalOffset\":%d," +
                "\"errorType\":\"%s\",\"errorMessage\":\"%s\",\"timestamp\":\"%s\"," +
                "\"originalKey\":\"%s\",\"originalValue\":\"%s\"}",
                originalRecord.topic(),
                originalRecord.partition(),
                originalRecord.offset(),
                error.getClass().getName(),
                error.getMessage(),
                Instant.now().toString(),
                originalRecord.key(),
                originalRecord.value().toString()
            );
            
            ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
                dlqTopic,
                originalRecord.key(),
                dlqMessage
            );
            
            // Add headers for traceability
            dlqRecord.headers()
                .add("original.topic", originalRecord.topic().getBytes(StandardCharsets.UTF_8))
                .add("original.partition", String.valueOf(originalRecord.partition()).getBytes(StandardCharsets.UTF_8))
                .add("original.offset", String.valueOf(originalRecord.offset()).getBytes(StandardCharsets.UTF_8))
                .add("error.type", error.getClass().getName().getBytes(StandardCharsets.UTF_8))
                .add("error.timestamp", Instant.now().toString().getBytes(StandardCharsets.UTF_8));
            
            dlqProducer.send(dlqRecord, (metadata, exception) -> {
                if (exception != null) {
                    logger.error("Failed to send to DLQ", exception);
                    future.completeExceptionally(exception);
                } else {
                    logger.info("Sent to DLQ: partition={}, offset={}", 
                               metadata.partition(), metadata.offset());
                    future.complete(null);
                }
            });
            
        } catch (Exception e) {
            logger.error("Error preparing DLQ message", e);
            future.completeExceptionally(e);
        }
        
        return future;
    }
    
    public void close() {
        dlqProducer.close();
    }
}
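The sendToDeadLetterQueue placeholder in AdvancedStockPriceConsumer only logs. A sketch of wiring this handler into that error path (field and method mirror the names used earlier in the consumer class):

// Sketch: replace the logging-only DLQ placeholder with the real handler
private final DeadLetterQueueHandler dlqHandler =
        new DeadLetterQueueHandler(BOOTSTRAP_SERVERS, DLQ_TOPIC);

private void handleProcessingError(ConsumerRecord<String, StockPrice> record, Exception e) {
    logger.error("Error processing record from partition {} offset {}",
                 record.partition(), record.offset(), e);

    // Fire-and-forget here; block on the future if DLQ delivery must be confirmed before committing
    dlqHandler.sendToDeadLetterQueue(record, e)
              .exceptionally(ex -> {
                  logger.error("DLQ delivery failed; record may be lost", ex);
                  return null;
              });

    meterRegistry.counter("consumer.errors",
                          "exception", e.getClass().getSimpleName()).increment();
}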

Consumer with Retry Logic

package com.cloudurable.kafka.consumer;

import com.cloudurable.kafka.model.StockPrice;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.function.Supplier;

public class ResilientStockProcessor {
    private static final Logger logger = LoggerFactory.getLogger(ResilientStockProcessor.class);
    
    private final CircuitBreaker circuitBreaker;
    private final Retry retry;
    private final DeadLetterQueueHandler dlqHandler;
    
    public ResilientStockProcessor(DeadLetterQueueHandler dlqHandler) {
        this.dlqHandler = dlqHandler;
        
        // Configure circuit breaker
        CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
            .failureRateThreshold(50)
            .waitDurationInOpenState(Duration.ofSeconds(30))
            .permittedNumberOfCallsInHalfOpenState(3)
            .slidingWindowSize(10)
            .build();
        
        this.circuitBreaker = CircuitBreaker.of("stock-processor", circuitBreakerConfig);
        
        // Configure retry
        RetryConfig retryConfig = RetryConfig.custom()
            .maxAttempts(3)
            .waitDuration(Duration.ofMillis(500))
            .retryExceptions(Exception.class)
            .ignoreExceptions(IllegalArgumentException.class)
            .build();
        
        this.retry = Retry.of("stock-processor", retryConfig);
    }
    
    public void processWithResilience(ConsumerRecord<String, StockPrice> record) {
        Supplier<Void> decoratedSupplier = CircuitBreaker
            .decorateSupplier(circuitBreaker, () -> {
                processStockPrice(record);
                return null;
            });
        
        Supplier<Void> retryableSupplier = Retry
            .decorateSupplier(retry, decoratedSupplier);
        
        try {
            retryableSupplier.get();
        } catch (Exception e) {
            logger.error("Failed to process record after retries", e);
            dlqHandler.sendToDeadLetterQueue(record, e);
        }
    }
    
    private void processStockPrice(ConsumerRecord<String, StockPrice> record) {
        StockPrice stockPrice = record.value();
        
        // Simulate processing that might fail
        if (stockPrice.price() < 0) {
            throw new IllegalArgumentException("Negative stock price");
        }
        
        if (stockPrice.volume() == 0) {
            throw new RuntimeException("Zero volume - market closed?");
        }
        
        // Process the stock price
        logger.info("Processing stock: {} at ${}", stockPrice.symbol(), stockPrice.price());
    }
}
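A minimal sketch of using the processor inside a poll loop (it assumes the createConsumer() method and running flag from AdvancedStockPriceConsumer shown earlier):

// Sketch: resilient per-record processing inside the poll loop
DeadLetterQueueHandler dlqHandler =
        new DeadLetterQueueHandler("localhost:9092", "stock-prices-dlq");
ResilientStockProcessor processor = new ResilientStockProcessor(dlqHandler);

try (Consumer<String, StockPrice> consumer = createConsumer()) {
    while (running.get()) {
        ConsumerRecords<String, StockPrice> records = consumer.poll(Duration.ofMillis(1000));
        records.forEach(processor::processWithResilience);
        consumer.commitSync(); // commit only after the batch has been processed or dead-lettered
    }
} finally {
    dlqHandler.close();
}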

Running the Advanced Consumer

Create Topics

#!/usr/bin/env bash
cd ~/kafka-training

# Create main topic (cluster running in KRaft mode)
kafka/bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 3 \
    --partitions 6 \
    --topic stock-prices \
    --config min.insync.replicas=2 \
    --config retention.ms=604800000

# Create DLQ topic
kafka/bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 3 \
    --partitions 3 \
    --topic stock-prices-dlq \
    --config retention.ms=2592000000  # 30 days

# Create retry topic
kafka/bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 3 \
    --partitions 3 \
    --topic stock-prices-retry \
    --config retention.ms=86400000  # 1 day

Configuration Best Practices

# application.properties
kafka.bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
kafka.consumer.group.id=advanced-stock-consumer
# KIP-848 consumer protocol
kafka.consumer.group.protocol=consumer

# Performance
kafka.consumer.fetch.min.bytes=1048576
kafka.consumer.fetch.max.wait.ms=500
kafka.consumer.max.poll.records=500

# Resilience
kafka.consumer.session.timeout.ms=45000
kafka.consumer.heartbeat.interval.ms=3000
kafka.consumer.max.poll.interval.ms=300000

# DLQ
kafka.dlq.topic=stock-prices-dlq
kafka.dlq.max.retries=3
kafka.dlq.retry.backoff.ms=1000
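These kafka.* keys are the tutorial's own application-level names, not raw Kafka configs. A small helper to map them onto ConsumerConfig might look like this sketch:

import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Sketch: map the application-level keys above onto Kafka ConsumerConfig names
public final class ConsumerPropsLoader {
    public static Properties load(Path file) throws Exception {
        Properties app = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            app.load(in);
        }
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, app.getProperty("kafka.bootstrap.servers"));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, app.getProperty("kafka.consumer.group.id"));
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, app.getProperty("kafka.consumer.group.protocol"));
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, app.getProperty("kafka.consumer.max.poll.records"));
        // ...remaining kafka.consumer.* keys follow the same pattern
        return props;
    }
}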

Monitoring Consumer Lag

package com.cloudurable.kafka.monitoring;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ExecutionException;

public class ConsumerLagMonitor {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerLagMonitor.class);
    
    private final AdminClient adminClient;
    
    public ConsumerLagMonitor(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        this.adminClient = AdminClient.create(props);
    }
    
    public Map<TopicPartition, Long> getConsumerLag(String groupId) 
            throws ExecutionException, InterruptedException {
        
        Map<TopicPartition, Long> lagMap = new HashMap<>();
        
        // Get consumer group offsets
        ListConsumerGroupOffsetsResult offsetsResult = 
            adminClient.listConsumerGroupOffsets(groupId);
        Map<TopicPartition, OffsetAndMetadata> groupOffsets = 
            offsetsResult.partitionsToOffsetAndMetadata().get();
        
        // Get end offsets for topics
        Set<TopicPartition> partitions = groupOffsets.keySet();
        Map<TopicPartition, OffsetSpec> offsetSpecs = new HashMap<>();
        partitions.forEach(tp -> offsetSpecs.put(tp, OffsetSpec.latest()));
        
        ListOffsetsResult endOffsetsResult = adminClient.listOffsets(offsetSpecs);
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = 
            endOffsetsResult.all().get();
        
        // Calculate lag
        for (TopicPartition partition : partitions) {
            OffsetAndMetadata committedOffset = groupOffsets.get(partition);
            ListOffsetsResult.ListOffsetsResultInfo endOffset = endOffsets.get(partition);
            
            if (committedOffset != null && endOffset != null) {
                long lag = endOffset.offset() - committedOffset.offset();
                lagMap.put(partition, lag);
            }
        }
        
        return lagMap;
    }
    
    public void close() {
        adminClient.close();
    }
}
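A sketch of running the monitor on a schedule and alerting on a threshold (the group name, interval, and threshold are illustrative):

import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: check lag every 30 seconds and warn above an illustrative threshold
public class LagAlertRunner {
    public static void main(String[] args) {
        ConsumerLagMonitor monitor = new ConsumerLagMonitor("localhost:9092");
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        scheduler.scheduleAtFixedRate(() -> {
            try {
                Map<TopicPartition, Long> lag = monitor.getConsumerLag("advanced-stock-consumer");
                lag.forEach((partition, value) -> {
                    if (value > 10_000) { // illustrative alert threshold
                        System.out.printf("High lag on %s: %d records behind%n", partition, value);
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, 0, 30, TimeUnit.SECONDS);
    }
}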

Cloud-Native Auto-Scaling Pattern

flowchart LR
  subgraph Monitoring["Monitoring Layer"]
    LAG[Lag Monitor]
    CPU[CPU Monitor]
    MEM[Memory Monitor]
  end
  
  subgraph Scaling["Auto-Scaling"]
    RULES[Scaling Rules]
    SCALER[Horizontal Scaler]
  end
  
  subgraph Consumers["Consumer Instances"]
    C1[Consumer 1]
    C2[Consumer 2]
    C3[Consumer 3]
    CN[Consumer N...]
  end
  
  LAG --> RULES
  CPU --> RULES
  MEM --> RULES
  
  RULES --> SCALER
  SCALER --> Consumers
  
  style Monitoring fill:#e8f5e9,stroke:#43a047,stroke-width:2px
  style Scaling fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
  style Consumers fill:#fff9c4,stroke:#f9a825,stroke-width:1px
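The "Scaling Rules" box can be as simple as a lag-driven target-replica calculation fed by the ConsumerLagMonitor above. A minimal sketch follows; the capacity numbers are illustrative, and the actual scale-out call depends on your platform (for example a Kubernetes HPA or KEDA):

import org.apache.kafka.common.TopicPartition;

import java.util.Map;

// Sketch: derive a desired consumer count from total lag (thresholds are illustrative)
public class LagBasedScalingRule {

    private static final long RECORDS_PER_CONSUMER = 50_000; // assumed per-instance capacity
    private static final int MIN_CONSUMERS = 1;
    private static final int MAX_CONSUMERS = 6;              // never exceed the partition count

    public int desiredConsumers(Map<TopicPartition, Long> lagByPartition) {
        long totalLag = lagByPartition.values().stream().mapToLong(Long::longValue).sum();
        int desired = (int) Math.ceil((double) totalLag / RECORDS_PER_CONSUMER);
        return Math.max(MIN_CONSUMERS, Math.min(MAX_CONSUMERS, desired));
    }
}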

Review Questions

What is KIP-848?

KIP-848 introduces a broker-coordinated consumer group protocol, eliminating the need for client-side assignment leaders and enabling smoother, incremental rebalancing.
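In code, opting in is a single client config (a sketch; requires kafka-clients 3.7+ and a broker that supports the new protocol):

// KIP-848: broker-coordinated assignment
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");

// Classic protocol (the default) keeps client-side assignment
// props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic");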

When should you use Dead Letter Queues?

Use DLQs for messages that fail processing after retries, allowing investigation without blocking the main processing flow.

How does cooperative rebalancing work?

Cooperative rebalancing allows gradual partition reassignment, minimizing disruption by keeping most partitions assigned during rebalancing.
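Under the classic protocol, you opt into cooperative (incremental) rebalancing by choosing the cooperative sticky assignor, as in this sketch; with the KIP-848 protocol the broker performs incremental reassignment without a client-side assignor:

// Classic protocol: incremental rebalancing via org.apache.kafka.clients.consumer.CooperativeStickyAssignor
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
          CooperativeStickyAssignor.class.getName());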

Why use consumer interceptors?

Interceptors provide a clean way to add cross-cutting concerns like metrics, logging, or security without modifying business logic.

How do you monitor consumer lag effectively?

Use the AdminClient API or metrics exporters to track lag per partition, and set alerts for high-lag conditions.

Summary

Advanced Kafka consumers in 2025 emphasize:

  • Smooth Operations through cooperative rebalancing and KIP-848
  • Error Resilience with DLQs and circuit breakers
  • Observability via interceptors and lag monitoring
  • Cloud-Native patterns for auto-scaling
  • Production-Ready configurations and best practices

These patterns ensure your consumers can handle millions of messages reliably at scale!

About Cloudurable

We hope you enjoyed this article. Please provide feedback. Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps set up Kafka clusters in AWS.

Check out our new Go course. We provide instructor-led, onsite Go (Golang) training.
