January 9, 2025
🚀 What’s New in This 2025 Update
Major Updates and Changes
- KIP-848 Protocol - Zero-downtime rebalancing (no more stop-the-world!)
- KRaft Mode - No ZooKeeper needed
- Exactly-Once Consumers - Transactional message processing
- Share Groups - New queue-like consumption pattern
- Enhanced Monitoring - Built-in lag tracking
- Cloud-Native Patterns - Container-ready consumers
Consumer Evolution Since 2017
- ✅ Smoother Rebalancing - Incremental cooperative protocol
- ✅ Better Performance - Optimized fetch and poll
- ✅ Improved Reliability - Automatic offset management
- ✅ Enhanced Observability - Detailed metrics and tracing
Ready to build modern Kafka consumers? Let’s create production-ready consumers that can handle millions of messages reliably!
Kafka Tutorial: Writing a Kafka Consumer in Java
flowchart TB
subgraph ConsumerGroup["Consumer Group: analytics-service"]
C1[Consumer 1<br>Partitions: 0,1,2,3]
C2[Consumer 2<br>Partitions: 4,5,6,7]
C3[Consumer 3<br>Partitions: 8,9,10,11,12]
end
subgraph Topic["Topic: events (13 partitions)"]
P0[P0] --> C1
P1[P1] --> C1
P2[P2] --> C1
P3[P3] --> C1
P4[P4] --> C2
P5[P5] --> C2
P6[P6] --> C2
P7[P7] --> C2
P8[P8] --> C3
P9[P9] --> C3
P10[P10] --> C3
P11[P11] --> C3
P12[P12] --> C3
end
classDef consumer fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#333333
classDef partition fill:#e8f5e9,stroke:#43a047,stroke-width:1px,color:#333333
class C1,C2,C3 consumer
class P0,P1,P2,P3,P4,P5,P6,P7,P8,P9,P10,P11,P12 partition
In this tutorial, we’ll create modern Kafka consumers that:
- Use the new KIP-848 rebalancing protocol for zero-downtime operations
- Implement exactly-once semantics with transactions
- Handle errors gracefully with dead letter queues
- Monitor consumer lag for operational excellence
- Follow cloud-native best practices
Prerequisites
Before you start:
- Complete Kafka Producer Tutorial
- Java 17+ installed
- Understanding of Kafka Consumer Architecture
- Kafka 4.0 running with KRaft mode
Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps set up Kafka clusters in AWS.
Modern Kafka Consumer Implementation
Event Model (Reusing from Producer)
package com.cloudurable.kafka.model;
import java.time.Instant;
import java.util.UUID;
public record Event(
String id,
String userId,
String type,
String data,
Instant timestamp
) {
public static Event create(String userId, String type, String data) {
return new Event(
UUID.randomUUID().toString(),
userId,
type,
data,
Instant.now()
);
}
}
JSON Deserializer
package com.cloudurable.kafka.serialization;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
public class JsonDeserializer<T> implements Deserializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
private Class<T> clazz;
// No-arg constructor so Kafka can instantiate the deserializer itself;
// the target type is then supplied through configure()
public JsonDeserializer() {
}
public JsonDeserializer(Class<T> clazz) {
this.clazz = clazz;
}
@Override
@SuppressWarnings("unchecked")
public void configure(Map<String, ?> configs, boolean isKey) {
// Reads the custom "value.deserializer.class" property set alongside the standard deserializer config
Object className = configs.get(isKey ? "key.deserializer.class" : "value.deserializer.class");
if (className != null) {
try {
clazz = (Class<T>) Class.forName(className.toString());
} catch (ClassNotFoundException e) {
throw new RuntimeException("Deserializer target class not found: " + className, e);
}
}
}
@Override
public T deserialize(String topic, byte[] data) {
if (data == null) return null;
try {
return objectMapper.readValue(data, clazz);
} catch (Exception e) {
throw new RuntimeException("Error deserializing JSON", e);
}
}
}
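The exactly-once example later in this tutorial also writes Event objects back to Kafka using a JsonSerializer from the producer tutorial. If you do not have that class handy, a minimal sketch that mirrors the deserializer above looks like this:
package com.cloudurable.kafka.serialization;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.serialization.Serializer;
public class JsonSerializer<T> implements Serializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
@Override
public byte[] serialize(String topic, T data) {
if (data == null) return null;
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new RuntimeException("Error serializing JSON for topic " + topic, e);
}
}
}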
Basic Consumer with KIP-848 Protocol
package com.cloudurable.kafka;
import com.cloudurable.kafka.model.Event;
import com.cloudurable.kafka.serialization.JsonDeserializer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ModernKafkaConsumer {
private static final Logger logger = LoggerFactory.getLogger(ModernKafkaConsumer.class);
private static final String TOPIC = "events";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static Consumer<String, Event> createConsumer(String groupId) {
Properties props = new Properties();
// Connection settings
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "event-consumer-" + System.currentTimeMillis());
// Use new KIP-848 consumer protocol for better rebalancing
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
// Deserialization
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
props.put("value.deserializer.class", Event.class.getName());
// Performance optimizations
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 1KB
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100); // 100ms
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// Offset management
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Manual commit for control
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// With the KIP-848 protocol, session timeouts and heartbeats are managed by the
// group coordinator (broker-side group.consumer.session.timeout.ms and
// group.consumer.heartbeat.interval.ms), so they are not set on the client here
return new KafkaConsumer<>(props);
}
public static void processMessages(Consumer<String, Event> consumer) {
consumer.subscribe(Collections.singletonList(TOPIC));
try {
while (true) {
ConsumerRecords<String, Event> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
logger.info("Fetched {} records", records.count());
for (ConsumerRecord<String, Event> record : records) {
processRecord(record);
}
// Commit after successful processing
consumer.commitSync();
}
}
} catch (Exception e) {
logger.error("Error in consumer loop", e);
} finally {
consumer.close();
}
}
private static void processRecord(ConsumerRecord<String, Event> record) {
logger.info("Processing: partition={}, offset={}, key={}, event={}",
record.partition(), record.offset(), record.key(), record.value());
// Simulate business logic
Event event = record.value();
switch (event.type()) {
case "page-view" -> processPageView(event);
case "purchase" -> processPurchase(event);
case "login" -> processLogin(event);
default -> logger.warn("Unknown event type: {}", event.type());
}
}
private static void processPageView(Event event) {
// Business logic for page views
logger.debug("Processing page view: {}", event);
}
private static void processPurchase(Event event) {
// Business logic for purchases
logger.debug("Processing purchase: {}", event);
}
private static void processLogin(Event event) {
// Business logic for logins
logger.debug("Processing login: {}", event);
}
}
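To run the basic consumer, you could add a small main method to ModernKafkaConsumer (a sketch, assuming the events topic exists and the producer from the previous tutorial is publishing to it):
public static void main(String[] args) {
// Create a consumer in the "analytics-service" group and start the poll loop;
// processMessages() closes the consumer when the loop exits
Consumer<String, Event> consumer = createConsumer("analytics-service");
processMessages(consumer);
}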
Consumer with Error Handling and Dead Letter Queue
package com.cloudurable.kafka;
import com.cloudurable.kafka.model.Event;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
public class ResilientConsumer {
private static final Logger logger = LoggerFactory.getLogger(ResilientConsumer.class);
private static final String TOPIC = "events";
private static final String DLQ_TOPIC = "events-dlq";
private static final int MAX_RETRIES = 3;
private final Consumer<String, Event> consumer;
private final Producer<String, String> dlqProducer;
private final Map<TopicPartition, AtomicLong> partitionRetryCount = new HashMap<>();
private final Map<TopicPartition, Long> pausedUntil = new HashMap<>(); // paused partitions and when to resume them
public ResilientConsumer(String groupId) {
this.consumer = ModernKafkaConsumer.createConsumer(groupId);
this.dlqProducer = createDLQProducer();
}
private Producer<String, String> createDLQProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return new KafkaProducer<>(props);
}
public void processWithRetries() {
consumer.subscribe(Collections.singletonList(TOPIC));
try {
while (true) {
// Resume any partitions whose retry backoff has expired; this must run on the
// polling thread because KafkaConsumer is not thread-safe
resumePausedPartitions();
ConsumerRecords<String, Event> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, Event> record : records) {
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
try {
processRecord(record);
// Reset retry count on success
partitionRetryCount.remove(tp);
} catch (Exception e) {
handleProcessingError(record, e, tp);
}
}
if (!records.isEmpty()) {
consumer.commitSync();
}
}
} finally {
consumer.close();
dlqProducer.close();
}
}
private void handleProcessingError(ConsumerRecord<String, Event> record,
Exception e,
TopicPartition tp) {
AtomicLong retryCount = partitionRetryCount.computeIfAbsent(tp, k -> new AtomicLong(0));
long currentRetry = retryCount.incrementAndGet();
logger.error("Error processing record at partition {} offset {}, retry {}/{}",
record.partition(), record.offset(), currentRetry, MAX_RETRIES, e);
if (currentRetry >= MAX_RETRIES) {
// Send to DLQ
sendToDLQ(record, e);
// Seek past the problematic record
consumer.seek(tp, record.offset() + 1);
// Reset retry count
partitionRetryCount.remove(tp);
} else {
// Seek back so the failed record is re-delivered once the partition resumes;
// records from this partition already in the current batch may be processed
// again afterwards (at-least-once semantics)
consumer.seek(tp, record.offset());
// Pause the partition and let it resume after an exponential backoff
consumer.pause(Collections.singleton(tp));
scheduleUnpause(tp, currentRetry);
}
}
private void sendToDLQ(ConsumerRecord<String, Event> record, Exception error) {
try {
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
DLQ_TOPIC,
record.key(),
record.value().toString()
);
// Add error metadata as headers
dlqRecord.headers()
.add("original-topic", record.topic().getBytes())
.add("original-partition", String.valueOf(record.partition()).getBytes())
.add("original-offset", String.valueOf(record.offset()).getBytes())
.add("error-message", error.getMessage().getBytes())
.add("error-class", error.getClass().getName().getBytes());
dlqProducer.send(dlqRecord).get();
logger.info("Sent failed record to DLQ: {}", record.key());
} catch (Exception dlqError) {
logger.error("Failed to send to DLQ", dlqError);
// Could implement additional fallback here
}
}
private void scheduleUnpause(TopicPartition tp, long retryCount) {
// Exponential backoff, capped at 60 seconds
long backoffMs = Math.min(1000 * (long) Math.pow(2, retryCount), 60000);
// Record when the partition may resume; KafkaConsumer is not thread-safe,
// so the actual resume happens on the polling thread in resumePausedPartitions()
pausedUntil.put(tp, System.currentTimeMillis() + backoffMs);
logger.info("Paused partition {} for {} ms", tp, backoffMs);
}
private void resumePausedPartitions() {
long now = System.currentTimeMillis();
Iterator<Map.Entry<TopicPartition, Long>> it = pausedUntil.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TopicPartition, Long> entry = it.next();
if (entry.getValue() <= now) {
consumer.resume(Collections.singleton(entry.getKey()));
logger.info("Resumed partition {} after backoff", entry.getKey());
it.remove();
}
}
}
private void processRecord(ConsumerRecord<String, Event> record) throws Exception {
// Simulate processing that might fail
Event event = record.value();
if (event.data().contains("error")) {
throw new RuntimeException("Simulated processing error");
}
// Normal processing
logger.info("Successfully processed: {}", event);
}
}
Exactly-Once Consumer with Transactions
package com.cloudurable.kafka;
import com.cloudurable.kafka.model.Event;
import com.cloudurable.kafka.serialization.JsonDeserializer;
import com.cloudurable.kafka.serialization.JsonSerializer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
public class ExactlyOnceConsumer {
private static final Logger logger = LoggerFactory.getLogger(ExactlyOnceConsumer.class);
private static final String INPUT_TOPIC = "events";
private static final String OUTPUT_TOPIC = "processed-events";
public static void processExactlyOnce(String groupId) {
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put("value.deserializer.class", Event.class.getName());
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "exactly-once-consumer-" + groupId);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
try (Consumer<String, Event> consumer = new KafkaConsumer<>(consumerProps);
Producer<String, Event> producer = new KafkaProducer<>(producerProps)) {
// Initialize transactions
producer.initTransactions();
consumer.subscribe(Collections.singletonList(INPUT_TOPIC));
while (true) {
ConsumerRecords<String, Event> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
// Begin transaction
producer.beginTransaction();
try {
// Process records
for (ConsumerRecord<String, Event> record : records) {
Event processedEvent = processEvent(record.value());
// Send to output topic
producer.send(new ProducerRecord<>(
OUTPUT_TOPIC,
record.key(),
processedEvent
));
}
// Send consumer offsets in same transaction
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, Event>> partitionRecords =
records.records(partition);
long lastOffset = partitionRecords
.get(partitionRecords.size() - 1).offset();
offsetsToCommit.put(partition,
new OffsetAndMetadata(lastOffset + 1));
}
// Commit the consumed offsets within the same transaction; using the
// consumer's group metadata enables zombie fencing (KIP-447)
producer.sendOffsetsToTransaction(offsetsToCommit,
consumer.groupMetadata());
producer.commitTransaction();
logger.info("Successfully processed {} records in transaction",
records.count());
} catch (Exception e) {
logger.error("Error in transaction, aborting", e);
producer.abortTransaction();
throw e;
}
}
}
}
}
private static Event processEvent(Event event) {
// Transform the event
return Event.create(
event.userId(),
"processed-" + event.type(),
event.data() + " [processed]"
);
}
}
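One operational note: end-to-end exactly-once only holds if consumers of the processed-events topic also read with isolation.level=read_committed; the default read_uncommitted would let them see records from aborted transactions. A minimal downstream configuration sketch:
Properties downstreamProps = new Properties();
downstreamProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
downstreamProps.put(ConsumerConfig.GROUP_ID_CONFIG, "processed-events-reader");
// Only deliver records from committed transactions
downstreamProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
downstreamProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
downstreamProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
downstreamProps.put("value.deserializer.class", Event.class.getName());
Consumer<String, Event> downstream = new KafkaConsumer<>(downstreamProps);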
Consumer Group Coordination Demo
package com.cloudurable.kafka;
import com.cloudurable.kafka.model.Event;
import com.cloudurable.kafka.serialization.JsonDeserializer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class ConsumerGroupDemo {
private static final Logger logger = LoggerFactory.getLogger(ConsumerGroupDemo.class);
public static class PartitionAwareConsumer implements ConsumerRebalanceListener {
private final String consumerId;
private final Consumer<String, Event> consumer;
public PartitionAwareConsumer(String consumerId, String groupId) {
this.consumerId = consumerId;
this.consumer = createConsumer(consumerId, groupId);
}
private Consumer<String, Event> createConsumer(String consumerId, String groupId) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId);
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); // KIP-848
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put("value.deserializer.class", Event.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // offsets are committed manually in consume()
return new KafkaConsumer<>(props);
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
logger.info("Consumer {} - Partitions revoked: {}", consumerId, partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
logger.info("Consumer {} - Partitions assigned: {}", consumerId, partitions);
}
public void consume(CountDownLatch latch) {
consumer.subscribe(Collections.singletonList("events"), this);
try {
while (!Thread.currentThread().isInterrupted()) {
ConsumerRecords<String, Event> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, Event> record : records) {
logger.info("Consumer {} - Partition: {}, Offset: {}, Event: {}",
consumerId, record.partition(), record.offset(),
record.value().type());
}
if (!records.isEmpty()) {
consumer.commitSync();
}
}
} finally {
consumer.close();
latch.countDown();
}
}
}
public static void main(String[] args) throws InterruptedException {
String groupId = "demo-consumer-group";
int numConsumers = 3;
CountDownLatch latch = new CountDownLatch(numConsumers);
// Start multiple consumers in the same group
for (int i = 0; i < numConsumers; i++) {
String consumerId = "consumer-" + i;
new Thread(() -> {
PartitionAwareConsumer consumer =
new PartitionAwareConsumer(consumerId, groupId);
consumer.consume(latch);
}).start();
}
// Wait for all consumers to finish
latch.await();
}
}
Consumer Lag Monitoring
package com.cloudurable.kafka.monitoring;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
public class ConsumerLagMonitor {
private static final Logger logger = LoggerFactory.getLogger(ConsumerLagMonitor.class);
private final AdminClient adminClient;
public ConsumerLagMonitor() {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
this.adminClient = AdminClient.create(props);
}
public Map<TopicPartition, Long> calculateLag(String groupId)
throws ExecutionException, InterruptedException {
// Get consumer group offsets
ListConsumerGroupOffsetsResult offsetsResult =
adminClient.listConsumerGroupOffsets(groupId);
Map<TopicPartition, OffsetAndMetadata> groupOffsets =
offsetsResult.partitionsToOffsetAndMetadata().get();
// Get topic end offsets
Set<TopicPartition> partitions = groupOffsets.keySet();
Map<TopicPartition, OffsetSpec> offsetSpecs = partitions.stream()
.collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
ListOffsetsResult listOffsetsResult =
adminClient.listOffsets(offsetSpecs);
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
listOffsetsResult.all().get();
// Calculate lag
Map<TopicPartition, Long> lagMap = new HashMap<>();
for (TopicPartition partition : partitions) {
long consumerOffset = groupOffsets.get(partition).offset();
long endOffset = endOffsets.get(partition).offset();
long lag = endOffset - consumerOffset;
lagMap.put(partition, lag);
if (lag > 1000) {
logger.warn("High lag detected for {} in group {}: {} messages behind",
partition, groupId, lag);
}
}
return lagMap;
}
public void monitorLag(String groupId, long intervalMs) {
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
Map<TopicPartition, Long> lag = calculateLag(groupId);
long totalLag = lag.values().stream()
.mapToLong(Long::longValue)
.sum();
logger.info("Consumer group {} - Total lag: {} messages",
groupId, totalLag);
// Export metrics to monitoring system
exportMetrics(groupId, lag);
} catch (Exception e) {
logger.error("Error monitoring lag", e);
}
}
}, 0, intervalMs);
}
private void exportMetrics(String groupId, Map<TopicPartition, Long> lag) {
// Export to Prometheus, CloudWatch, etc.
lag.forEach((partition, lagValue) -> {
// Example: Push to metrics system
logger.debug("metric.consumer.lag{{group={},topic={},partition={}}} {}",
groupId, partition.topic(), partition.partition(), lagValue);
});
}
public void close() {
adminClient.close();
}
}
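Usage is a one-liner; the sketch below checks the analytics-service group every 30 seconds (the group name and interval are illustrative):
public static void main(String[] args) {
ConsumerLagMonitor monitor = new ConsumerLagMonitor();
// Log total lag for the group every 30 seconds; the underlying Timer thread keeps the JVM alive
monitor.monitorLag("analytics-service", 30_000);
}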
Cloud-Native Consumer Pattern
package com.cloudurable.kafka.cloud;
import com.cloudurable.kafka.model.Event;
import com.cloudurable.kafka.serialization.JsonDeserializer;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
public class CloudNativeConsumer {
private static final Logger logger = LoggerFactory.getLogger(CloudNativeConsumer.class);
private final Consumer<String, Event> consumer;
private final MeterRegistry meterRegistry;
private final AtomicBoolean running = new AtomicBoolean(true);
private final String consumerId;
public CloudNativeConsumer(String groupId, MeterRegistry meterRegistry) {
// Pod name in Kubernetes; fall back to a local id when HOSTNAME is not set
this.consumerId = System.getenv().getOrDefault("HOSTNAME", "local-consumer");
this.consumer = createConsumer(groupId);
this.meterRegistry = meterRegistry;
// Register shutdown hook for graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}
private Consumer<String, Event> createConsumer(String groupId) {
Properties props = new Properties();
// Configuration from environment variables (12-factor app)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
System.getenv().getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"));
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId);
// Cloud-optimized settings
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000); // 20s for cloud
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 6000); // 6s
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5 min
// Security from environment
String securityProtocol = System.getenv("KAFKA_SECURITY_PROTOCOL");
if (securityProtocol != null) {
props.put("security.protocol", securityProtocol);
props.put("sasl.mechanism", System.getenv("KAFKA_SASL_MECHANISM"));
props.put("sasl.jaas.config", System.getenv("KAFKA_SASL_JAAS_CONFIG"));
}
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put("value.deserializer.class", Event.class.getName());
return new KafkaConsumer<>(props);
}
public void consume() {
consumer.subscribe(Collections.singletonList("events"));
try {
while (running.get()) {
try {
ConsumerRecords<String, Event> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
Timer.Sample sample = Timer.start(meterRegistry);
processRecords(records);
consumer.commitSync();
sample.stop(meterRegistry.timer("consumer.batch.processing.time",
"consumer.id", consumerId,
"group.id", consumer.groupMetadata().groupId()));
meterRegistry.counter("consumer.records.processed",
"consumer.id", consumerId)
.increment(records.count());
}
// Health check
updateHealthStatus();
} catch (org.apache.kafka.common.errors.WakeupException we) {
// Thrown by poll() when shutdown() calls wakeup(); exit the loop quietly
if (running.get()) {
throw we;
}
} catch (Exception e) {
logger.error("Error in consumer loop", e);
meterRegistry.counter("consumer.errors",
"consumer.id", consumerId,
"error.type", e.getClass().getSimpleName())
.increment();
// Implement circuit breaker pattern
handleError(e);
}
}
} finally {
// Close on the polling thread; KafkaConsumer is not safe for multi-threaded access
consumer.close();
logger.info("Consumer {} closed", consumerId);
}
}
private void processRecords(ConsumerRecords<String, Event> records) {
records.forEach(record -> {
try {
processEvent(record.value());
} catch (Exception e) {
logger.error("Error processing record", e);
// Send to DLQ or retry logic
}
});
}
private void processEvent(Event event) {
// Business logic
logger.info("Processing event: {}", event);
}
private void updateHealthStatus() {
// Update health endpoint for K8s liveness/readiness probes
// This could write to a file or update a health endpoint
}
private void handleError(Exception e) {
// Implement exponential backoff or circuit breaker
if (e instanceof org.apache.kafka.common.errors.TimeoutException) {
logger.warn("Timeout detected, backing off");
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
public void shutdown() {
logger.info("Shutting down consumer {}", consumerId);
running.set(false);
// wakeup() is the only KafkaConsumer method that is safe to call from another thread;
// the consumer itself is closed in consume()'s finally block
consumer.wakeup();
}
}
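Wiring the consumer into an application might look like the following sketch, which uses Micrometer's SimpleMeterRegistry; in a real service you would normally inject a Prometheus or CloudWatch registry instead:
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
public class ConsumerApplication {
public static void main(String[] args) {
// Replace SimpleMeterRegistry with your metrics backend's registry in production
SimpleMeterRegistry registry = new SimpleMeterRegistry();
CloudNativeConsumer consumer = new CloudNativeConsumer("cloud-analytics", registry);
consumer.consume(); // blocks until the JVM shutdown hook triggers shutdown()
}
}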
Testing Your Consumer
package com.cloudurable.kafka;
import com.cloudurable.kafka.model.Event;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.*;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
class ConsumerTest {
@Test
void testConsumerProcessing() {
// Create mock consumer
Consumer<String, Event> mockConsumer = mock(Consumer.class);
// Create test records
List<ConsumerRecord<String, Event>> recordsList = Arrays.asList(
new ConsumerRecord<>("events", 0, 100L, "user1",
Event.create("user1", "login", "{}")),
new ConsumerRecord<>("events", 1, 200L, "user2",
Event.create("user2", "purchase", "{}"))
);
ConsumerRecords<String, Event> records = new ConsumerRecords<>(
Map.of(
new TopicPartition("events", 0),
Collections.singletonList(recordsList.get(0)),
new TopicPartition("events", 1),
Collections.singletonList(recordsList.get(1))
)
);
// Mock behavior
when(mockConsumer.poll(any(Duration.class))).thenReturn(records);
// Test processing
ConsumerRecords<String, Event> result = mockConsumer.poll(Duration.ofMillis(100));
assertEquals(2, result.count());
// Verify the record from partition 0 (iteration order over Map.of is not deterministic)
ConsumerRecord<String, Event> first = result.records(new TopicPartition("events", 0)).get(0);
assertEquals("user1", first.key());
assertEquals("login", first.value().type());
}
}
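It is also worth testing the JSON round trip itself so that changes to Event are caught before they reach a cluster. A sketch, assuming Jackson 2.12+ (needed for record support) and the corresponding Jackson and JsonDeserializer imports in the test class:
@Test
void testJsonRoundTrip() throws Exception {
ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
Event original = Event.create("user1", "login", "{\"ip\":\"10.0.0.1\"}");
byte[] json = mapper.writeValueAsBytes(original);
JsonDeserializer<Event> deserializer = new JsonDeserializer<>(Event.class);
Event roundTripped = deserializer.deserialize("events", json);
assertEquals(original.id(), roundTripped.id());
assertEquals(original.type(), roundTripped.type());
assertEquals(original.timestamp(), roundTripped.timestamp());
}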
Performance Tuning
flowchart TB
subgraph Tuning["Consumer Performance Tuning"]
FETCH[Increase Fetch Size<br>fetch.min.bytes=10KB]
POLL[Optimize Poll<br>max.poll.records=1000]
PARALLEL[Parallel Processing<br>Multiple Consumers]
COMMIT[Batch Commits<br>Manual Offset Management]
PREFETCH[Prefetch Tuning<br>fetch.max.wait.ms=500]
end
subgraph Metrics["Monitor These"]
M1[Records/Second]
M2[Lag per Partition]
M3[Commit Latency]
M4[Processing Time]
M5[Rebalance Frequency]
end
Tuning --> Metrics
style Tuning fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style Metrics fill:#e8f5e9,stroke:#43a047,stroke-width:1px
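Applied to consumer configuration, the tuning knobs from the diagram translate to the following properties (throughput-oriented starting points, not universal recommendations):
Properties tuningProps = new Properties();
// Let the broker wait for at least 10 KB of data (or fetch.max.wait.ms) before answering a fetch
tuningProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 10 * 1024);
// Allow up to 500 ms for fetch.min.bytes to accumulate
tuningProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
// Hand larger batches to the application per poll()
tuningProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
// Disable auto-commit and batch commits manually after processing
tuningProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);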
Common Patterns and Solutions
Pattern: Parallel Processing within Consumer
public class ParallelProcessingConsumer {
private final ExecutorService executorService =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public void consumeWithParallelProcessing(Consumer<String, Event> consumer) {
while (true) {
ConsumerRecords<String, Event> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (ConsumerRecord<String, Event> record : records) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
processRecord(record);
}, executorService);
futures.add(future);
}
// Wait for all processing to complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// Commit after all records processed
consumer.commitSync();
}
}
}
}
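One caveat of the pattern above: records from the same partition are processed concurrently, so per-partition ordering is lost. If ordering matters, a variant (a sketch reusing the same executorService and processRecord method) parallelizes across partitions while keeping each partition's records sequential:
public void consumePreservingOrder(Consumer<String, Event> consumer) {
while (true) {
ConsumerRecords<String, Event> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
// One task per partition: records within a partition stay in order
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, Event>> partitionRecords = records.records(partition);
futures.add(CompletableFuture.runAsync(
() -> partitionRecords.forEach(this::processRecord), executorService));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// Commit only after every partition's batch has completed
consumer.commitSync();
}
}
}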
Review Questions and Answers
How does KIP-848 improve consumer rebalancing?
KIP-848 moves assignment logic to the group coordinator and reconciles partition changes incrementally, consumer by consumer, so there is no global synchronization barrier: unaffected consumers keep processing while only the partitions that actually move are revoked and reassigned.
When should you use manual vs automatic offset commits?
Use manual commits when you need precise control over when offsets are committed, especially for exactly-once processing or when implementing custom error handling.
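For example, a common manual-commit idiom (a sketch; process(...) and the running flag stand in for your own logic) commits asynchronously inside the loop for throughput and synchronously once during shutdown so no progress is lost:
try {
while (running.get()) {
ConsumerRecords<String, Event> records = consumer.poll(Duration.ofMillis(100));
process(records); // your business logic
consumer.commitAsync(); // non-blocking; pass a callback to log failures
}
} finally {
try {
consumer.commitSync(); // final blocking commit before closing
} finally {
consumer.close();
}
}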
How do you handle poison messages?
Implement a retry mechanism with exponential backoff, and after max retries, send the message to a dead letter queue for manual inspection.
What’s the optimal number of consumers in a group?
Ideally, have as many consumers as partitions for maximum parallelism. Having more consumers than partitions results in idle consumers.
How do you monitor consumer health?
Track consumer lag, processing time, error rates, and rebalance frequency. Set alerts for high lag or frequent rebalances.
Summary
Modern Kafka consumers in 2025 emphasize:
- Smooth operations with KIP-848 rebalancing protocol
- Reliability through error handling and dead letter queues
- Observability with comprehensive monitoring
- Cloud-native patterns for container deployments
- Exactly-once processing for critical applications
The combination of these patterns ensures your consumers can handle millions of messages reliably!
Related Content
- What is Kafka?
- Kafka Architecture
- Kafka Consumer Architecture
- Kafka Producer Tutorial
- Kafka Command Line
- Kafka Failover Tutorial
- Schema Registry
- Kafka Streams
About Cloudurable
We hope you enjoyed this tutorial. Please provide feedback. Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps set up Kafka clusters in AWS.
Check out our new GoLang course. We provide onsite, instructor-led Go training.