January 9, 2025
🚀 What’s New in This 2025 Update
Major Updates and Changes
- KIP-848 Protocol - Broker-coordinated partition assignment
- Cooperative Rebalancing - Incremental partition reassignment that avoids stopping the whole group
- Dead Letter Queues - Standard pattern for error handling
- Consumer Interceptors - Cross-cutting concerns made easy
- Automated Scaling - Cloud-native auto-scaling patterns
- Enhanced Monitoring - Built-in lag tracking and alerting
Consumer Evolution Since 2018
- ✅ Smoother Operations - Incremental rebalancing minimizes disruption
- ✅ Better Error Handling - DLQ and retry patterns standardized
- ✅ Improved Observability - Metrics and interceptors built-in
- ✅ Cloud-Native Ready - Auto-scaling and managed services
Welcome to Part 1 of Advanced Kafka Consumers in 2025! This tutorial covers modern consumer patterns that have become essential for production Kafka deployments.
Prerequisites
Before you start:
Advanced Consumer Architecture
flowchart TB
subgraph ConsumerGroup["Consumer Group with KIP-848"]
C1[Consumer 1<br>Partitions: 0,1]
C2[Consumer 2<br>Partitions: 2,3]
C3[Consumer 3<br>Partitions: 4,5]
subgraph Coordinator["Broker Coordinator"]
BA[Broker Assignment<br>Logic]
OM[Offset Manager]
HB[Heartbeat Monitor]
end
end
subgraph ErrorHandling["Error Handling"]
DLQ[Dead Letter Queue]
RETRY[Retry Topic]
ALERT[Alerting System]
end
subgraph Monitoring["Monitoring"]
LAG[Lag Monitor]
METRICS[Metrics Collector]
DASH[Dashboard]
end
C1 --> BA
C2 --> BA
C3 --> BA
C1 -.->|Errors| DLQ
C2 -.->|Errors| DLQ
C3 -.->|Errors| DLQ
ConsumerGroup --> Monitoring
style ConsumerGroup fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style ErrorHandling fill:#ffebee,stroke:#e53935,stroke-width:1px
style Monitoring fill:#e8f5e9,stroke:#43a047,stroke-width:1px
Project Setup
build.gradle
plugins {
id 'java'
id 'application'
}
group = 'com.cloudurable.kafka'
version = '2.0-SNAPSHOT'
java {
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
}
repositories {
mavenCentral()
}
dependencies {
// Kafka client; ConsumerConfig.GROUP_PROTOCOL_CONFIG (group.protocol) requires kafka-clients 3.7 or newer
implementation 'org.apache.kafka:kafka-clients:3.9.0'
// JSON processing
implementation 'com.fasterxml.jackson.core:jackson-databind:2.16.0'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.16.0'
// Metrics and monitoring
implementation 'io.micrometer:micrometer-core:1.12.0'
implementation 'io.micrometer:micrometer-registry-prometheus:1.12.0'
// Logging
implementation 'ch.qos.logback:logback-classic:1.4.14'
// Resilience
implementation 'io.github.resilience4j:resilience4j-circuitbreaker:2.1.0'
implementation 'io.github.resilience4j:resilience4j-retry:2.1.0'
// Testing
testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0'
testImplementation 'org.apache.kafka:kafka-streams-test-utils:3.9.0'
}
application {
mainClass = 'com.cloudurable.kafka.consumer.AdvancedStockPriceConsumer'
}
test {
useJUnitPlatform()
}
Modern Stock Price Consumer
Enhanced Stock Price Model
package com.cloudurable.kafka.model;
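import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant;
import java.util.UUID;
public record StockPrice(
@JsonProperty("id") String id,
@JsonProperty("symbol") String symbol,
@JsonProperty("price") double price,
@JsonProperty("volume") long volume,
@JsonProperty("timestamp") Instant timestamp,
@JsonProperty("exchange") String exchange
) {
public static StockPrice create(String symbol, double price, long volume) {
return new StockPrice(
UUID.randomUUID().toString(),
symbol,
price,
volume,
Instant.now(),
"NYSE"
);
}
// Legacy compatibility. Excluded from JSON so that serialized records
// do not gain "dollars"/"cents" fields the deserializer would reject.
@JsonIgnore
public int getDollars() {
// Work in whole cents so dollars and cents stay consistent with each other
return (int) (Math.round(price * 100) / 100);
}
@JsonIgnore
public int getCents() {
// Round first: (price - dollars) * 100 can land just below the intended integer
return (int) (Math.round(price * 100) % 100);
}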
}
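The legacy dollars/cents accessors split a `double` price, and binary floating point makes that split easy to get wrong: in Java, `0.29 * 100` evaluates to `28.999999999999996`, which truncates to 28. The following is a minimal sketch of doing the split with `BigDecimal` instead. `PriceParts` is a hypothetical helper introduced only for illustration; it is not part of the tutorial's model.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Hypothetical helper (not part of the tutorial's model class). */
final class PriceParts {
    private PriceParts() {}

    /** Returns {dollars, cents} after rounding the price to two decimals. */
    static int[] split(double price) {
        // BigDecimal.valueOf goes through Double.toString, so 10.29 stays 10.29
        BigDecimal rounded = BigDecimal.valueOf(price).setScale(2, RoundingMode.HALF_UP);
        int dollars = rounded.intValue(); // drops the fractional part
        int cents = rounded.remainder(BigDecimal.ONE)
                .movePointRight(2)
                .intValueExact();
        return new int[] {dollars, cents};
    }

    public static void main(String[] args) {
        int[] parts = split(10.29);
        System.out.println(parts[0] + " dollars, " + parts[1] + " cents");
    }
}
```

For new code, storing prices as integer cents or `BigDecimal` from the start avoids the conversion entirely; the sketch only covers the case where a `double` field already exists.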
Advanced JSON Deserializer
package com.cloudurable.kafka.consumer;
import com.cloudurable.kafka.model.StockPrice;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class StockPriceDeserializer implements Deserializer<StockPrice> {
private static final Logger logger = LoggerFactory.getLogger(StockPriceDeserializer.class);
private final ObjectMapper objectMapper;
public StockPriceDeserializer() {
this.objectMapper = new ObjectMapper();
this.objectMapper.registerModule(new JavaTimeModule());
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// Configuration if needed
}
@Override
public StockPrice deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
try {
return objectMapper.readValue(data, StockPrice.class);
} catch (Exception e) {
logger.error("Failed to deserialize StockPrice from topic {}", topic, e);
throw new SerializationException("Error deserializing StockPrice", e);
}
}
@Override
public void close() {
// Cleanup if needed
}
}
Advanced Consumer with KIP-848 Protocol
package com.cloudurable.kafka.consumer;
import com.cloudurable.kafka.model.StockPrice;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
public class AdvancedStockPriceConsumer {
private static final Logger logger = LoggerFactory.getLogger(AdvancedStockPriceConsumer.class);
private static final String TOPIC = "stock-prices";
private static final String DLQ_TOPIC = "stock-prices-dlq";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private final AtomicBoolean running = new AtomicBoolean(true);
private final MeterRegistry meterRegistry = new SimpleMeterRegistry();
private final Map<String, StockPrice> stockCache = new ConcurrentHashMap<>();
private Consumer<String, StockPrice> createConsumer() {
Properties props = new Properties();
// Connection
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "advanced-stock-consumer");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "stock-consumer-" + UUID.randomUUID());
// Deserialization
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StockPriceDeserializer.class.getName());
// KIP-848 Protocol (new consumer protocol)
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
// Performance tuning
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024); // 1MB
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
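// Session management
// session.timeout.ms and heartbeat.interval.ms apply only to the classic protocol.
// With group.protocol=consumer the broker settings group.consumer.session.timeout.ms
// and group.consumer.heartbeat.interval.ms govern them, so they are not set here.
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);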
// Offset management
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Interceptors
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
MetricsConsumerInterceptor.class.getName());
Consumer<String, StockPrice> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
return consumer;
}
public void runConsumer() {
try (Consumer<String, StockPrice> consumer = createConsumer()) {
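// Register shutdown hook: wake the poll loop, then wait for it to exit so the consumer closes cleanly
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Shutting down consumer...");
running.set(false);
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}));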
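long lastStatsLogMs = System.currentTimeMillis();
while (running.get()) {
try {
ConsumerRecords<String, StockPrice> records = consumer.poll(Duration.ofMillis(1000));
if (!records.isEmpty()) {
processRecords(consumer, records);
}
// Log stats roughly every 10 seconds, tracked explicitly rather than by sampling the wall clock
long now = System.currentTimeMillis();
if (now - lastStatsLogMs >= 10_000) {
logConsumerStats(consumer);
lastStatsLogMs = now;
}
} catch (WakeupException e) {
if (running.get()) throw e;
} catch (org.apache.kafka.common.errors.RecordDeserializationException e) {
// A record that cannot be deserialized would otherwise be re-fetched on every poll; skip past it
logger.error("Skipping undeserializable record at {} offset {}", e.topicPartition(), e.offset(), e);
consumer.seek(e.topicPartition(), e.offset() + 1);
} catch (Exception e) {
logger.error("Error in consumer loop", e);
}
}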
} catch (Exception e) {
logger.error("Fatal consumer error", e);
}
}
private void processRecords(Consumer<String, StockPrice> consumer,
ConsumerRecords<String, StockPrice> records) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (ConsumerRecord<String, StockPrice> record : records) {
try {
processRecord(record);
} catch (Exception e) {
// Failed records are handed to the DLQ path, so their offsets may still advance
handleProcessingError(record, e);
}
// Track the next offset to read (last handled offset + 1) for manual commit
offsetsToCommit.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
}
// Commit once every record in the batch was either processed or routed to the DLQ
if (!offsetsToCommit.isEmpty()) {
consumer.commitAsync(offsetsToCommit, (offsets, exception) -> {
if (exception != null) {
logger.error("Failed to commit offsets", exception);
} else {
logger.debug("Offsets committed: {}", offsets);
}
});
}
// Update metrics
meterRegistry.counter("consumer.records.processed").increment(records.count());
}
private void processRecord(ConsumerRecord<String, StockPrice> record) {
StockPrice stockPrice = record.value();
// Validate before touching the cache so invalid records never become visible to readers
if (stockPrice == null) {
throw new IllegalArgumentException("Missing stock price payload");
}
if (stockPrice.price() <= 0) {
throw new IllegalArgumentException("Invalid stock price: " + stockPrice.price());
}
// Business logic
stockCache.put(stockPrice.symbol(), stockPrice);
logger.info("Processed: {} - ${} @ volume {}",
stockPrice.symbol(),
stockPrice.price(),
stockPrice.volume());
}
private void handleProcessingError(ConsumerRecord<String, StockPrice> record, Exception e) {
logger.error("Error processing record from partition {} offset {}",
record.partition(), record.offset(), e);
// Send to DLQ
sendToDeadLetterQueue(record, e);
// Update error metrics
meterRegistry.counter("consumer.errors",
"exception", e.getClass().getSimpleName()).increment();
}
private void sendToDeadLetterQueue(ConsumerRecord<String, StockPrice> record, Exception error) {
// In production, use a producer to send to DLQ topic
logger.warn("Would send to DLQ: {} due to {}", record.key(), error.getMessage());
}
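// One gauge holder per partition, registered once. Micrometer keeps only a weak reference
// to the gauged object, so a boxed local value would be garbage collected and never updated.
private final Map<TopicPartition, java.util.concurrent.atomic.AtomicLong> lagGauges = new ConcurrentHashMap<>();
private void logConsumerStats(Consumer<String, StockPrice> consumer) {
Set<TopicPartition> assignment = consumer.assignment();
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(assignment);
long totalLag = 0;
for (TopicPartition partition : assignment) {
Long endOffset = endOffsets.get(partition);
OffsetAndMetadata committedOffset = committed.get(partition);
if (endOffset != null && committedOffset != null) {
long lag = endOffset - committedOffset.offset();
totalLag += lag;
lagGauges.computeIfAbsent(partition, tp -> meterRegistry.gauge("consumer.lag",
Collections.singletonList(io.micrometer.core.instrument.Tag.of("partition", String.valueOf(tp.partition()))),
new java.util.concurrent.atomic.AtomicLong())).set(lag);
}
}
logger.info("Consumer stats - Total lag: {}, Cache size: {}",
totalLag, stockCache.size());
}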
public static void main(String[] args) {
new AdvancedStockPriceConsumer().runConsumer();
}
}
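The manual commit in `processRecords` relies on one convention: the committed offset is the position of the next record to read, so it is the last handled offset plus one. The sketch below isolates that bookkeeping with plain collections. `OffsetTracker` and its `Rec` record are hypothetical stand-ins for illustration; a real consumer would use `TopicPartition` and `OffsetAndMetadata` as in the class above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in types; not the Kafka client classes. */
final class OffsetTracker {
    /** Minimal view of a consumed record: where it came from. */
    record Rec(String topic, int partition, long offset) {}

    private OffsetTracker() {}

    /** Per "topic-partition" key, the next offset to read: highest handled offset + 1. */
    static Map<String, Long> offsetsToCommit(List<Rec> handled) {
        Map<String, Long> next = new HashMap<>();
        for (Rec r : handled) {
            // merge keeps the larger value if records arrive out of order within the list
            next.merge(r.topic() + "-" + r.partition(), r.offset() + 1, Math::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Rec> batch = List.of(
                new Rec("stock-prices", 0, 41L),
                new Rec("stock-prices", 0, 42L),
                new Rec("stock-prices", 1, 7L));
        System.out.println(offsetsToCommit(batch));
    }
}
```

Committing `offset` instead of `offset + 1` would cause the last record of each partition to be delivered again after a restart or rebalance.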
Consumer Interceptor for Metrics
package com.cloudurable.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
public class MetricsConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
private static final Logger logger = LoggerFactory.getLogger(MetricsConsumerInterceptor.class);
private final AtomicLong recordCount = new AtomicLong(0);
private final AtomicLong bytesConsumed = new AtomicLong(0);
@Override
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
long before = recordCount.getAndAdd(records.count());
long after = before + records.count();
records.forEach(record -> {
if (record.serializedValueSize() > 0) {
bytesConsumed.addAndGet(record.serializedValueSize());
}
});
// Log each time the running total crosses a multiple of 1000. An exact
// "% 1000 == 0" test is rarely hit because the count grows by whole batches.
if (after / 1000 > before / 1000) {
logger.info("Consumed {} records, {} bytes total",
after, bytesConsumed.get());
}
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
logger.debug("Committed offsets for {} partitions", offsets.size());
}
@Override
public void close() {
logger.info("Closing interceptor - Total records: {}, Total bytes: {}",
recordCount.get(), bytesConsumed.get());
}
@Override
public void configure(Map<String, ?> configs) {
// Configuration if needed
}
}
Dead Letter Queue Handler
package com.cloudurable.kafka.consumer;
import com.cloudurable.kafka.model.StockPrice;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
public class DeadLetterQueueHandler {
private static final Logger logger = LoggerFactory.getLogger(DeadLetterQueueHandler.class);
private final KafkaProducer<String, String> dlqProducer;
private final ObjectMapper objectMapper = new ObjectMapper();
private final String dlqTopic;
public DeadLetterQueueHandler(String bootstrapServers, String dlqTopic) {
this.dlqTopic = dlqTopic;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
this.dlqProducer = new KafkaProducer<>(props);
}
public CompletableFuture<Void> sendToDeadLetterQueue(
ConsumerRecord<String, StockPrice> originalRecord,
Exception error) {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
// Create DLQ record with error metadata. Jackson builds the JSON so that quotes
// or newlines in the error message and value are escaped correctly.
String failedAt = Instant.now().toString();
Map<String, Object> payload = new LinkedHashMap<>();
payload.put("originalTopic", originalRecord.topic());
payload.put("originalPartition", originalRecord.partition());
payload.put("originalOffset", originalRecord.offset());
payload.put("errorType", error.getClass().getName());
payload.put("errorMessage", error.getMessage());
payload.put("timestamp", failedAt);
payload.put("originalKey", originalRecord.key());
// String.valueOf tolerates a null value (for example a tombstone record)
payload.put("originalValue", String.valueOf(originalRecord.value()));
String dlqMessage = objectMapper.writeValueAsString(payload);
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
dlqTopic,
originalRecord.key(),
dlqMessage
);
// Add headers for traceability, always encoded as UTF-8
dlqRecord.headers()
.add("original.topic", originalRecord.topic().getBytes(StandardCharsets.UTF_8))
.add("original.partition", String.valueOf(originalRecord.partition()).getBytes(StandardCharsets.UTF_8))
.add("original.offset", String.valueOf(originalRecord.offset()).getBytes(StandardCharsets.UTF_8))
.add("error.type", error.getClass().getName().getBytes(StandardCharsets.UTF_8))
.add("error.timestamp", failedAt.getBytes(StandardCharsets.UTF_8));
dlqProducer.send(dlqRecord, (metadata, exception) -> {
if (exception != null) {
logger.error("Failed to send to DLQ", exception);
future.completeExceptionally(exception);
} else {
logger.info("Sent to DLQ: partition={}, offset={}",
metadata.partition(), metadata.offset());
future.complete(null);
}
});
} catch (Exception e) {
logger.error("Error preparing DLQ message", e);
future.completeExceptionally(e);
}
return future;
}
public void close() {
dlqProducer.close();
}
}
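The traceability headers written by the handler are only useful if whatever reads the DLQ can turn them back into the original record coordinates, for example to replay or inspect the failed record. A minimal sketch of that decoding step follows. `DlqHeaders` and `Origin` are hypothetical names, and the headers are modeled as a plain `Map<String, byte[]>` rather than Kafka's `Headers` type so the example stays self-contained.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical reader-side helper for the header names used by the DLQ handler. */
final class DlqHeaders {
    /** Where the failed record originally lived. */
    record Origin(String topic, int partition, long offset) {}

    private DlqHeaders() {}

    static Origin parse(Map<String, byte[]> headers) {
        String topic = text(headers, "original.topic");
        int partition = Integer.parseInt(text(headers, "original.partition"));
        long offset = Long.parseLong(text(headers, "original.offset"));
        return new Origin(topic, partition, offset);
    }

    private static String text(Map<String, byte[]> headers, String key) {
        byte[] raw = headers.get(key);
        if (raw == null) {
            throw new IllegalArgumentException("Missing DLQ header: " + key);
        }
        // Decode with the same charset the writer used
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = Map.of(
                "original.topic", "stock-prices".getBytes(StandardCharsets.UTF_8),
                "original.partition", "2".getBytes(StandardCharsets.UTF_8),
                "original.offset", "1234".getBytes(StandardCharsets.UTF_8));
        System.out.println(parse(headers));
    }
}
```

Keeping the coordinates in headers means a replay tool does not need to parse the JSON payload to locate the source record.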
Consumer with Retry Logic
package com.cloudurable.kafka.consumer;
import com.cloudurable.kafka.model.StockPrice;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.function.Supplier;
public class ResilientStockProcessor {
private static final Logger logger = LoggerFactory.getLogger(ResilientStockProcessor.class);
private final CircuitBreaker circuitBreaker;
private final Retry retry;
private final DeadLetterQueueHandler dlqHandler;
public ResilientStockProcessor(DeadLetterQueueHandler dlqHandler) {
this.dlqHandler = dlqHandler;
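// Configure circuit breaker. Validation failures are ignored so that
// bad input alone cannot open the circuit.
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.waitDurationInOpenState(Duration.ofSeconds(30))
.permittedNumberOfCallsInHalfOpenState(3)
.slidingWindowSize(10)
.ignoreExceptions(IllegalArgumentException.class)
.build();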
this.circuitBreaker = CircuitBreaker.of("stock-processor", circuitBreakerConfig);
// Configure retry
RetryConfig retryConfig = RetryConfig.custom()
.maxAttempts(3)
.waitDuration(Duration.ofMillis(500))
.retryExceptions(Exception.class)
.ignoreExceptions(IllegalArgumentException.class)
.build();
this.retry = Retry.of("stock-processor", retryConfig);
}
public void processWithResilience(ConsumerRecord<String, StockPrice> record) {
Supplier<Void> decoratedSupplier = CircuitBreaker
.decorateSupplier(circuitBreaker, () -> {
processStockPrice(record);
return null;
});
Supplier<Void> retryableSupplier = Retry
.decorateSupplier(retry, decoratedSupplier);
try {
retryableSupplier.get();
} catch (Exception e) {
logger.error("Failed to process record after retries", e);
dlqHandler.sendToDeadLetterQueue(record, e);
}
}
private void processStockPrice(ConsumerRecord<String, StockPrice> record) {
StockPrice stockPrice = record.value();
// Simulate processing that might fail
if (stockPrice.price() < 0) {
throw new IllegalArgumentException("Negative stock price");
}
if (stockPrice.volume() == 0) {
throw new RuntimeException("Zero volume - market closed?");
}
// Process the stock price
logger.info("Processing stock: {} at ${}", stockPrice.symbol(), stockPrice.price());
}
}
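The control flow that the Resilience4j decorators provide here (retry transient failures, give up immediately on validation errors, then fall back to the DLQ) can be sketched with the standard library alone. This is an illustration of the flow under simplified assumptions, not a replacement for Resilience4j: `RetrySketch` is a hypothetical class, the wait is a fixed delay, and there is no circuit breaker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, library-free sketch of retry-then-dead-letter. */
final class RetrySketch {
    private RetrySketch() {}

    /** Returns the attempt number that succeeded, or -1 if the item went to the dead-letter sink. */
    static <T> int processWithRetry(T item, Consumer<T> task, Consumer<T> deadLetter,
                                    int maxAttempts, long waitMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                task.accept(item);
                return attempt;
            } catch (IllegalArgumentException e) {
                break; // validation failures will not succeed on a retry
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    break;
                }
                try {
                    Thread.sleep(waitMs); // fixed wait between attempts
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        deadLetter.accept(item);
        return -1;
    }

    public static void main(String[] args) {
        List<String> dlq = new ArrayList<>();
        int[] calls = {0};
        int used = processWithRetry("AAPL", s -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
        }, dlq::add, 3, 10L);
        System.out.println("attempts used: " + used + ", dead-lettered: " + dlq);
    }
}
```

Retrying inside the poll loop blocks the partition while it waits, so the total retry time should stay well below `max.poll.interval.ms`.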
Running the Advanced Consumer
Create Topics
#!/usr/bin/env bash
cd ~/kafka-training
# Create main topic with KRaft
kafka/bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 3 \
--partitions 6 \
--topic stock-prices \
--config min.insync.replicas=2 \
--config retention.ms=604800000
# Create DLQ topic
kafka/bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 3 \
--partitions 3 \
--topic stock-prices-dlq \
--config retention.ms=2592000000 # 30 days
# Create retry topic
kafka/bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 3 \
--partitions 3 \
--topic stock-prices-retry \
--config retention.ms=86400000 # 1 day
Configuration Best Practices
# application.properties
kafka.bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
kafka.consumer.group.id=advanced-stock-consumer
# KIP-848 consumer group protocol (a trailing comment on the same line would become part of the value)
kafka.consumer.group.protocol=consumer
# Performance
kafka.consumer.fetch.min.bytes=1048576
kafka.consumer.fetch.max.wait.ms=500
kafka.consumer.max.poll.records=500
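# Resilience
# With group.protocol=consumer, session timeout and heartbeat interval are broker-side settings
# (group.consumer.session.timeout.ms, group.consumer.heartbeat.interval.ms), so they are not set here
kafka.consumer.max.poll.interval.ms=300000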
# DLQ
kafka.dlq.topic=stock-prices-dlq
kafka.dlq.max.retries=3
kafka.dlq.retry.backoff.ms=1000
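The keys in this file carry a `kafka.consumer.` prefix, so something has to strip that prefix before the values reach the Kafka client. The sketch below shows one way to do the mapping with `java.util.Properties`. `ConsumerProps` is a hypothetical helper, and the prefix convention is taken from the file above rather than from any Kafka API. Note that `Properties` treats `#` as a comment only at the start of a line.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: maps prefixed application keys onto Kafka client keys. */
final class ConsumerProps {
    static final String PREFIX = "kafka.consumer.";

    private ConsumerProps() {}

    /** Parses .properties text, wrapping the checked IOException for convenience. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Copies bootstrap servers plus every kafka.consumer.* key, with the prefix removed. */
    static Properties consumerConfig(Properties app) {
        Properties out = new Properties();
        String bootstrap = app.getProperty("kafka.bootstrap.servers");
        if (bootstrap != null) {
            out.setProperty("bootstrap.servers", bootstrap);
        }
        for (String name : app.stringPropertyNames()) {
            if (name.startsWith(PREFIX)) {
                out.setProperty(name.substring(PREFIX.length()), app.getProperty(name));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Properties app = parse(
                "kafka.bootstrap.servers=localhost:9092\n"
                + "kafka.consumer.group.id=advanced-stock-consumer\n"
                + "kafka.consumer.max.poll.records=500\n"
                + "kafka.dlq.topic=stock-prices-dlq\n");
        consumerConfig(app).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The values stay strings; the Kafka client parses string values for numeric settings, so no conversion is needed before passing the result to a consumer constructor.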
Monitoring Consumer Lag
package com.cloudurable.kafka.monitoring;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ExecutionException;
public class ConsumerLagMonitor {
private static final Logger logger = LoggerFactory.getLogger(ConsumerLagMonitor.class);
private final AdminClient adminClient;
public ConsumerLagMonitor(String bootstrapServers) {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
this.adminClient = AdminClient.create(props);
}
public Map<TopicPartition, Long> getConsumerLag(String groupId)
throws ExecutionException, InterruptedException {
Map<TopicPartition, Long> lagMap = new HashMap<>();
// Get consumer group offsets
ListConsumerGroupOffsetsResult offsetsResult =
adminClient.listConsumerGroupOffsets(groupId);
Map<TopicPartition, OffsetAndMetadata> groupOffsets =
offsetsResult.partitionsToOffsetAndMetadata().get();
// Get end offsets for topics
Set<TopicPartition> partitions = groupOffsets.keySet();
Map<TopicPartition, OffsetSpec> offsetSpecs = new HashMap<>();
partitions.forEach(tp -> offsetSpecs.put(tp, OffsetSpec.latest()));
ListOffsetsResult endOffsetsResult = adminClient.listOffsets(offsetSpecs);
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
endOffsetsResult.all().get();
// Calculate lag
for (TopicPartition partition : partitions) {
OffsetAndMetadata committedOffset = groupOffsets.get(partition);
ListOffsetsResult.ListOffsetsResultInfo endOffset = endOffsets.get(partition);
if (committedOffset != null && endOffset != null) {
long lag = endOffset.offset() - committedOffset.offset();
lagMap.put(partition, lag);
}
}
return lagMap;
}
public void close() {
adminClient.close();
}
}
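The arithmetic inside `getConsumerLag` is simple enough to test without a broker: lag per partition is the log end offset minus the committed offset. The following sketch isolates that calculation and adds a threshold check of the kind an alert would use. `LagMath` is a hypothetical class, and partitions are keyed by plain strings instead of `TopicPartition` to keep it self-contained.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, broker-free version of the lag calculation. */
final class LagMath {
    private LagMath() {}

    /** Lag for every partition that has a committed offset; never negative. */
    static Map<String, Long> lagPerPartition(Map<String, Long> endOffsets,
                                             Map<String, Long> committed) {
        Map<String, Long> lag = new TreeMap<>();
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            Long committedOffset = committed.get(e.getKey());
            if (committedOffset != null) {
                lag.put(e.getKey(), Math.max(0L, e.getValue() - committedOffset));
            }
        }
        return lag;
    }

    static long totalLag(Map<String, Long> lag) {
        return lag.values().stream().mapToLong(Long::longValue).sum();
    }

    /** True if any single partition is further behind than the threshold. */
    static boolean anyAbove(Map<String, Long> lag, long threshold) {
        return lag.values().stream().anyMatch(v -> v > threshold);
    }

    public static void main(String[] args) {
        Map<String, Long> lag = lagPerPartition(
                Map.of("stock-prices-0", 120L, "stock-prices-1", 80L),
                Map.of("stock-prices-0", 100L, "stock-prices-1", 80L));
        System.out.println(lag + " total=" + totalLag(lag));
    }
}
```

Alerting on the per-partition maximum as well as the total helps catch a single stuck partition that a healthy group total would otherwise hide.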
Cloud-Native Auto-Scaling Pattern
flowchart LR
subgraph Monitoring["Monitoring Layer"]
LAG[Lag Monitor]
CPU[CPU Monitor]
MEM[Memory Monitor]
end
subgraph Scaling["Auto-Scaling"]
RULES[Scaling Rules]
SCALER[Horizontal Scaler]
end
subgraph Consumers["Consumer Instances"]
C1[Consumer 1]
C2[Consumer 2]
C3[Consumer 3]
CN[Consumer N...]
end
LAG --> RULES
CPU --> RULES
MEM --> RULES
RULES --> SCALER
SCALER --> Consumers
style Monitoring fill:#e8f5e9,stroke:#43a047,stroke-width:2px
style Scaling fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style Consumers fill:#fff9c4,stroke:#f9a825,stroke-width:1px
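The "Scaling Rules" box in the diagram can be as small as one function. The sketch below derives a desired consumer count from total lag, assuming a hypothetical target of how much lag one consumer should absorb. `ScalingRule` and its parameters are illustrative names, not part of any autoscaler's API. The cap at the partition count reflects how consumer groups work: members beyond the number of partitions receive no assignment.

```java
/** Hypothetical lag-based scaling rule. */
final class ScalingRule {
    private ScalingRule() {}

    /**
     * @param totalLag        summed lag across all partitions
     * @param lagPerConsumer  assumed lag one consumer is expected to handle
     * @param minConsumers    floor, so the group never scales to zero
     * @param partitionCount  ceiling, extra consumers would sit idle
     */
    static int desiredConsumers(long totalLag, long lagPerConsumer,
                                int minConsumers, int partitionCount) {
        if (totalLag < 0 || lagPerConsumer <= 0 || minConsumers < 1 || partitionCount < 1) {
            throw new IllegalArgumentException("Invalid scaling inputs");
        }
        long needed = (totalLag + lagPerConsumer - 1) / lagPerConsumer; // ceiling division
        long atLeastMin = Math.max(minConsumers, needed);
        return (int) Math.min(partitionCount, atLeastMin);
    }

    public static void main(String[] args) {
        // 6 partitions, as created for the stock-prices topic
        System.out.println(desiredConsumers(25_000, 10_000, 1, 6));
        System.out.println(desiredConsumers(100_000, 10_000, 1, 6));
    }
}
```

In practice a rule like this is usually combined with a cooldown period so that the rebalance triggered by each scaling step can settle before the next decision.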
Review Questions
What is KIP-848?
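KIP-848 introduces a broker-coordinated consumer group protocol: the group coordinator computes partition assignments, which removes the client-side group leader and makes rebalances incremental. Consumers opt in with group.protocol=consumer.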
When should you use Dead Letter Queues?
Use DLQs for messages that fail processing after retries, allowing investigation without blocking the main processing flow.
How does cooperative rebalancing work?
Cooperative rebalancing reassigns partitions incrementally: a consumer gives up only the partitions that are moving to another member and keeps processing the rest, instead of every member revoking everything at once.
Why use consumer interceptors?
Interceptors provide a clean way to add cross-cutting concerns like metrics, logging, or security without modifying business logic.
How do you monitor consumer lag effectively?
Use the AdminClient API or a metrics exporter to track lag per partition (log end offset minus committed offset), and alert when lag stays above a threshold.
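The difference between eager and cooperative rebalancing described in the answers above comes down to set arithmetic on one member's partitions. The sketch below is a simplified model for illustration only: `RebalanceSketch` is a hypothetical class and does not reproduce the Kafka assignor implementations.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of what a single member revokes during a rebalance. */
final class RebalanceSketch {
    private RebalanceSketch() {}

    /** Eager: the member gives up everything it owns before the new assignment arrives. */
    static Set<Integer> revokedEager(Set<Integer> owned) {
        return new TreeSet<>(owned);
    }

    /** Cooperative: the member gives up only partitions that are absent from its new assignment. */
    static Set<Integer> revokedCooperative(Set<Integer> owned, Set<Integer> target) {
        Set<Integer> revoked = new TreeSet<>(owned);
        revoked.removeAll(target);
        return revoked;
    }

    public static void main(String[] args) {
        Set<Integer> owned = Set.of(0, 1, 2);  // before a new consumer joins
        Set<Integer> target = Set.of(0, 1);    // after: partition 2 moves away
        System.out.println("eager revokes       " + revokedEager(owned));
        System.out.println("cooperative revokes " + revokedCooperative(owned, target));
    }
}
```

In this model the member keeps processing partitions 0 and 1 throughout a cooperative rebalance, while an eager rebalance pauses all three.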
Summary
Advanced Kafka consumers in 2025 emphasize:
- Smooth Operations through cooperative rebalancing and KIP-848
- Error Resilience with DLQs and circuit breakers
- Observability via interceptors and lag monitoring
- Cloud-Native patterns for auto-scaling
- Production-Ready configurations and best practices
These patterns help your consumers process high message volumes reliably as the deployment scales.
Related Content
- What is Kafka?
- Kafka Architecture
- Kafka Consumer Architecture
- Kafka Tutorial: Creating a Consumer
- Kafka Producer Advanced
- Schema Registry
About Cloudurable
We hope you enjoyed this article. Please provide feedback. Cloudurable provides Kafka training, Kafka consulting, and Kafka support, and helps set up Kafka clusters in AWS.
Check out our new GoLang course. We provide onsite Go Lang training which is instructor led.