January 9, 2025
🚀 What’s New in This 2025 Update
Major Updates and Changes
- Broker-Coordinated Rebalancing - No more client-side leader election
- Incremental Rebalancing - Minimal disruption during scaling
- End-to-End Exactly-Once - Production-ready transaction support
- Advanced Threading Models - Async and reactive patterns
- Smart Offset Management - External storage and recovery options
- AI Integration Ready - Custom deserializers for ML pipelines
Consumer Evolution Since 2017
- ✅ Smoother Rebalancing - Incremental protocol reduces downtime
- ✅ Better Semantics - Mature exactly-once delivery
- ✅ Enhanced Resilience - Built-in retry and DLQ patterns
- ✅ Cloud-Native - Auto-scaling and managed services
Welcome to the comprehensive guide on advanced Kafka Consumer patterns for 2025! This tutorial covers everything from delivery semantics to production-ready threading models.
Prerequisites
Before you start, you should be comfortable with the basics covered in the earlier Kafka Consumer tutorials (see Related Content below). Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps set up Kafka clusters in AWS.
Understanding Delivery Semantics
flowchart TB
subgraph AtLeastOnce["At-Least-Once"]
A1[Process Message]
A2[Commit Offset]
A1 --> A2
A2 -->|Failure After Process| A3[Duplicate on Retry]
end
subgraph AtMostOnce["At-Most-Once"]
B1[Commit Offset]
B2[Process Message]
B1 --> B2
B2 -->|Failure After Commit| B3[Message Lost]
end
subgraph ExactlyOnce["Exactly-Once"]
C1[Begin Transaction]
C2[Process Message]
C3[Produce Results]
C4[Commit Transaction]
C1 --> C2
C2 --> C3
C3 --> C4
C4 -->|Atomic| C5[No Duplicates/Loss]
end
style AtLeastOnce fill:#fff3e0,stroke:#ff6f00,stroke-width:2px
style AtMostOnce fill:#ffebee,stroke:#e53935,stroke-width:2px
style ExactlyOnce fill:#e8f5e9,stroke:#43a047,stroke-width:2px
At-Least-Once Delivery Consumer
package com.cloudurable.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
public class AtLeastOnceConsumer {
private static final Logger logger = LoggerFactory.getLogger(AtLeastOnceConsumer.class);
private final Consumer<String, StockPrice> consumer;
private final StockPriceProcessor processor;
public AtLeastOnceConsumer() {
this.consumer = createConsumer();
this.processor = new StockPriceProcessor();
}
private Consumer<String, StockPrice> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "at-least-once-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StockPriceDeserializer.class);
// At-least-once configuration
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
return new KafkaConsumer<>(props);
}
public void consume() {
consumer.subscribe(Arrays.asList("stock-prices"));
try {
while (true) {
ConsumerRecords<String, StockPrice> records = consumer.poll(Duration.ofMillis(1000));
for (TopicPartition partition : records.partitions()) {
for (ConsumerRecord<String, StockPrice> record : records.records(partition)) {
try {
// Process the record - this might fail
processor.process(record.value());
// Commit offset only after successful processing
consumer.commitSync(Collections.singletonMap(
partition, new OffsetAndMetadata(record.offset() + 1)));
} catch (Exception e) {
logger.error("Error processing record, will retry on next poll", e);
// Don't commit; rewind this partition so the failed record is redelivered
consumer.seek(partition, record.offset());
break; // Skip the rest of this partition's batch and re-poll
}
}
}
}
} finally {
consumer.close();
}
}
}
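These consumers assume a StockPrice value class, a StockPriceProcessor for business logic, and a StockPriceDeserializer, none of which are shown in the article. Here is a minimal sketch of what they might look like, assuming a simple JSON encoding via Jackson; adapt the fields and wire format to your own topic. The ProcessedStockData and ProcessedDataSerializer used in the exactly-once example later would follow the same pattern.
// StockPrice.java - assumed value object used throughout these examples
package com.cloudurable.kafka.consumer;
public class StockPrice {
private String symbol;
private double price;
private long volume;
private long timestamp;
public StockPrice() { }
public StockPrice(String symbol, double price, long volume, long timestamp) {
this.symbol = symbol;
this.price = price;
this.volume = volume;
this.timestamp = timestamp;
}
public String getSymbol() { return symbol; }
public double getPrice() { return price; }
public long getVolume() { return volume; }
public long getTimestamp() { return timestamp; }
}
// StockPriceProcessor.java - stand-in for your business logic
package com.cloudurable.kafka.consumer;
public class StockPriceProcessor {
public void process(StockPrice price) { /* business logic here */ }
}
// StockPriceDeserializer.java - minimal JSON deserializer built on Jackson
package com.cloudurable.kafka.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
public class StockPriceDeserializer implements Deserializer<StockPrice> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public StockPrice deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
try {
return objectMapper.readValue(data, StockPrice.class);
} catch (Exception e) {
throw new SerializationException("Failed to deserialize StockPrice from topic " + topic, e);
}
}
}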
At-Most-Once Delivery Consumer
package com.cloudurable.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
public class AtMostOnceConsumer {
private static final Logger logger = LoggerFactory.getLogger(AtMostOnceConsumer.class);
private final Consumer<String, StockPrice> consumer;
private final StockPriceProcessor processor;
public AtMostOnceConsumer() {
this.consumer = createConsumer();
this.processor = new StockPriceProcessor();
}
private Consumer<String, StockPrice> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "at-most-once-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StockPriceDeserializer.class);
// At-most-once configuration - auto-commit before processing
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new KafkaConsumer<>(props);
}
public void consume() {
consumer.subscribe(Arrays.asList("stock-prices"));
try {
while (true) {
ConsumerRecords<String, StockPrice> records = consumer.poll(Duration.ofMillis(1000));
// Commit the just-polled offsets immediately, before processing, so a
// failure during processing can never cause redelivery (at-most-once).
// Auto-commit alone only commits on later polls, which can still redeliver.
consumer.commitSync();
for (ConsumerRecord<String, StockPrice> record : records) {
try {
// Process record - if this fails, message is lost
processor.process(record.value());
} catch (Exception e) {
logger.error("Failed to process record, message lost", e);
// Continue with next record
}
}
}
} finally {
consumer.close();
}
}
}
Exactly-Once Delivery Consumer
package com.cloudurable.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
public class ExactlyOnceConsumer {
private static final Logger logger = LoggerFactory.getLogger(ExactlyOnceConsumer.class);
private final String consumerGroupId = "exactly-once-consumer";
private final Consumer<String, StockPrice> consumer;
private final Producer<String, ProcessedStockData> producer;
public ExactlyOnceConsumer() {
this.consumer = createConsumer();
this.producer = createTransactionalProducer();
}
private Consumer<String, StockPrice> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StockPriceDeserializer.class);
// Exactly-once configuration
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(props);
}
private Producer<String, ProcessedStockData> createTransactionalProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProcessedDataSerializer.class);
// Transactional configuration
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "stock-processor-tx");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all");
Producer<String, ProcessedStockData> producer = new KafkaProducer<>(props);
producer.initTransactions();
return producer;
}
public void consume() {
consumer.subscribe(Arrays.asList("stock-prices"));
try {
while (true) {
ConsumerRecords<String, StockPrice> records = consumer.poll(Duration.ofMillis(1000));
if (!records.isEmpty()) {
try {
// Begin transaction
producer.beginTransaction();
for (ConsumerRecord<String, StockPrice> record : records) {
// Process and produce in same transaction
ProcessedStockData processed = processStockPrice(record.value());
producer.send(new ProducerRecord<>(
"processed-stocks",
record.key(),
processed
));
}
// Send consumer offsets to transaction
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, StockPrice>> partitionRecords =
records.records(partition);
long lastOffset = partitionRecords
.get(partitionRecords.size() - 1).offset();
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
// Commit offsets and transaction atomically
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
producer.commitTransaction();
} catch (KafkaException e) {
logger.error("Transaction failed, aborting", e);
producer.abortTransaction();
// Rewind to the last committed offsets so the aborted batch is reprocessed
consumer.committed(consumer.assignment()).forEach((tp, committed) -> {
if (committed != null) {
consumer.seek(tp, committed.offset());
} else {
consumer.seekToBeginning(Collections.singleton(tp));
}
});
}
}
}
} finally {
consumer.close();
producer.close();
}
}
private ProcessedStockData processStockPrice(StockPrice price) {
// Business logic here
return new ProcessedStockData(price);
}
}
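One point worth making explicit: the exactly-once guarantee only extends downstream if consumers of the processed-stocks topic also read with isolation.level=read_committed; otherwise they will see records from aborted transactions. A minimal sketch of that downstream configuration (the group id is illustrative):
// Downstream consumer of "processed-stocks" - must use read_committed to
// honor the transactional guarantees of the upstream processor
Properties downstream = new Properties();
downstream.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
downstream.put(ConsumerConfig.GROUP_ID_CONFIG, "processed-stocks-reader");
downstream.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
downstream.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);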
Advanced ConsumerRebalanceListener
package com.cloudurable.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class StatefulRebalanceListener implements ConsumerRebalanceListener {
private static final Logger logger = LoggerFactory.getLogger(StatefulRebalanceListener.class);
private final Consumer<String, StockPrice> consumer;
private final Map<TopicPartition, PartitionState> partitionStates = new ConcurrentHashMap<>();
private final StateStore stateStore;
public StatefulRebalanceListener(Consumer<String, StockPrice> consumer, StateStore stateStore) {
this.consumer = consumer;
this.stateStore = stateStore;
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
logger.info("Partitions revoked: {}", partitions);
// Save state before losing partitions
for (TopicPartition partition : partitions) {
PartitionState state = partitionStates.get(partition);
if (state != null) {
try {
// Flush any pending work
state.flush();
// Save state to external store
stateStore.savePartitionState(partition, state);
// Clean up local state
partitionStates.remove(partition);
logger.info("Saved state for partition {}", partition);
} catch (Exception e) {
logger.error("Failed to save state for partition {}", partition, e);
}
}
}
// Commit current offsets
try {
consumer.commitSync();
} catch (Exception e) {
logger.error("Failed to commit offsets during rebalance", e);
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
logger.info("Partitions assigned: {}", partitions);
for (TopicPartition partition : partitions) {
try {
// Restore state from external store
PartitionState state = stateStore.loadPartitionState(partition);
if (state != null) {
partitionStates.put(partition, state);
// Seek to last processed offset
consumer.seek(partition, state.getLastProcessedOffset() + 1);
logger.info("Restored state for partition {}, seeking to offset {}",
partition, state.getLastProcessedOffset() + 1);
} else {
// No saved state, start from committed offset
partitionStates.put(partition, new PartitionState(partition));
}
} catch (Exception e) {
logger.error("Failed to restore state for partition {}", partition, e);
}
}
}
@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
logger.warn("Partitions lost: {}", partitions);
// Clean up without saving (partitions were lost unexpectedly)
for (TopicPartition partition : partitions) {
partitionStates.remove(partition);
}
}
}
class PartitionState {
private final TopicPartition partition;
private long lastProcessedOffset = -1;
private final Map<String, Double> stockAggregates = new HashMap<>();
public PartitionState(TopicPartition partition) {
this.partition = partition;
}
public void updateState(ConsumerRecord<String, StockPrice> record) {
lastProcessedOffset = record.offset();
// Aggregate logic
stockAggregates.merge(record.value().getSymbol(),
record.value().getPrice(),
(old, new_) -> (old + new_) / 2);
}
public void flush() {
// Flush any pending operations
}
public long getLastProcessedOffset() {
return lastProcessedOffset;
}
}
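The listener above depends on a StateStore abstraction that the article does not define. Below is a minimal sketch of the assumed contract, inferred from the calls made in the listener; the backing implementation (relational database, Redis, S3, and so on) is up to you.
// Assumed contract for the external state store used by StatefulRebalanceListener
public interface StateStore {
void savePartitionState(TopicPartition partition, PartitionState state);
// Returns null when no state has been saved for this partition yet
PartitionState loadPartitionState(TopicPartition partition);
}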
Manual Partition Assignment
package com.cloudurable.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;
public class PriorityQueueConsumer {
private static final Logger logger = LoggerFactory.getLogger(PriorityQueueConsumer.class);
private final Consumer<String, StockPrice> consumer;
private final Set<TopicPartition> highPriorityPartitions;
private final Set<TopicPartition> lowPriorityPartitions;
public PriorityQueueConsumer() {
this.consumer = createConsumer();
// Manually assign partitions based on priority
List<PartitionInfo> partitions = consumer.partitionsFor("stock-prices");
// Assign even partitions as high priority, odd as low
highPriorityPartitions = partitions.stream()
.filter(p -> p.partition() % 2 == 0)
.map(p -> new TopicPartition(p.topic(), p.partition()))
.collect(Collectors.toSet());
lowPriorityPartitions = partitions.stream()
.filter(p -> p.partition() % 2 != 0)
.map(p -> new TopicPartition(p.topic(), p.partition()))
.collect(Collectors.toSet());
// Manually assign all partitions
Set<TopicPartition> allPartitions = new HashSet<>();
allPartitions.addAll(highPriorityPartitions);
allPartitions.addAll(lowPriorityPartitions);
consumer.assign(allPartitions);
logger.info("Assigned {} high priority and {} low priority partitions",
highPriorityPartitions.size(), lowPriorityPartitions.size());
}
private Consumer<String, StockPrice> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "priority-consumer-" + UUID.randomUUID());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StockPriceDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new KafkaConsumer<>(props);
}
public void consume() {
try {
while (true) {
// Process high priority partitions more frequently
for (int i = 0; i < 3; i++) {
processPartitions(highPriorityPartitions, "HIGH", Duration.ofMillis(100));
}
// Process low priority partitions less frequently
processPartitions(lowPriorityPartitions, "LOW", Duration.ofMillis(50));
}
} finally {
consumer.close();
}
}
private void processPartitions(Set<TopicPartition> partitions, String priority, Duration timeout) {
// Pause all partitions
consumer.pause(consumer.assignment());
// Resume only the partitions we want to process
consumer.resume(partitions);
ConsumerRecords<String, StockPrice> records = consumer.poll(timeout);
for (ConsumerRecord<String, StockPrice> record : records) {
logger.info("[{}] Processing: {} - ${}",
priority,
record.value().getSymbol(),
record.value().getPrice());
// Process record based on priority
if ("HIGH".equals(priority)) {
processHighPriorityRecord(record);
} else {
processLowPriorityRecord(record);
}
}
// Commit offsets for processed partitions
if (!records.isEmpty()) {
consumer.commitSync();
}
}
private void processHighPriorityRecord(ConsumerRecord<String, StockPrice> record) {
// Fast processing for high priority
}
private void processLowPriorityRecord(ConsumerRecord<String, StockPrice> record) {
// Regular processing for low priority
}
}
Advanced Custom Deserializers
package com.cloudurable.kafka.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Map;
import java.util.zip.GZIPInputStream;
public class SecureStockPriceDeserializer implements Deserializer<StockPrice> {
private static final Logger logger = LoggerFactory.getLogger(SecureStockPriceDeserializer.class);
private ObjectMapper objectMapper;
private CircuitBreaker circuitBreaker;
private SecretKeySpec secretKey;
private boolean isCompressionEnabled;
private boolean isEncryptionEnabled;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.objectMapper = new ObjectMapper();
// Configure resilience
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.waitDurationInOpenState(Duration.ofSeconds(30))
.slidingWindowSize(10)
.build();
this.circuitBreaker = CircuitBreaker.of("deserializer", config);
// Configure security
String encryptionKey = (String) configs.get("encryption.key");
if (encryptionKey != null) {
this.secretKey = new SecretKeySpec(encryptionKey.getBytes(), "AES");
this.isEncryptionEnabled = true;
}
Object compression = configs.get("compression.enabled");
this.isCompressionEnabled = compression != null && Boolean.parseBoolean(compression.toString());
}
@Override
public StockPrice deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
return circuitBreaker.executeSupplier(() -> {
try {
byte[] processedData = data;
// Decrypt if enabled
if (isEncryptionEnabled) {
processedData = decrypt(processedData);
}
// Decompress if enabled
if (isCompressionEnabled) {
processedData = decompress(processedData);
}
// Check magic bytes for format detection
if (processedData.length > 4) {
ByteBuffer buffer = ByteBuffer.wrap(processedData, 0, 4);
int magic = buffer.getInt();
if (magic == 0x12345678) {
// Custom binary format
return deserializeBinary(processedData);
}
}
// Default to JSON
return objectMapper.readValue(processedData, StockPrice.class);
} catch (Exception e) {
logger.error("Failed to deserialize from topic {}", topic, e);
throw new SerializationException("Deserialization failed", e);
}
});
}
private byte[] decrypt(byte[] encrypted) throws Exception {
// "AES" alone defaults to ECB mode; prefer an authenticated mode such as AES/GCM in production
Cipher cipher = Cipher.getInstance("AES");
cipher.init(Cipher.DECRYPT_MODE, secretKey);
return cipher.doFinal(encrypted);
}
private byte[] decompress(byte[] compressed) throws Exception {
try (GZIPInputStream gis = new GZIPInputStream(
new java.io.ByteArrayInputStream(compressed))) {
return gis.readAllBytes();
}
}
private StockPrice deserializeBinary(byte[] data) {
ByteBuffer buffer = ByteBuffer.wrap(data);
buffer.getInt(); // Skip magic bytes
String symbol = readString(buffer);
double price = buffer.getDouble();
long volume = buffer.getLong();
long timestamp = buffer.getLong();
return new StockPrice(symbol, price, volume, timestamp);
}
private String readString(ByteBuffer buffer) {
int length = buffer.getInt();
byte[] bytes = new byte[length];
buffer.get(bytes);
return new String(bytes);
}
@Override
public void close() {
// Cleanup resources
}
}
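To use SecureStockPriceDeserializer, pass its settings through the consumer properties; configure() reads them when the consumer is created. The encryption.key and compression.enabled keys are the custom names used in the code above, not built-in Kafka configs, and the key shown is a placeholder; load real keys from a secret manager.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "secure-stock-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SecureStockPriceDeserializer.class);
// Custom keys read by SecureStockPriceDeserializer.configure()
props.put("encryption.key", "0123456789abcdef"); // placeholder 16-byte AES key
props.put("compression.enabled", "true");
Consumer<String, StockPrice> consumer = new KafkaConsumer<>(props);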
Consumer Threading Models
Thread-Per-Consumer Model
package com.cloudurable.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class ThreadPerConsumerModel {
private static final Logger logger = LoggerFactory.getLogger(ThreadPerConsumerModel.class);
private final int numConsumers;
private final ExecutorService executor;
private final List<ConsumerWorker> workers;
public ThreadPerConsumerModel(int numConsumers) {
this.numConsumers = numConsumers;
this.executor = Executors.newFixedThreadPool(numConsumers);
// Create one consumer per thread
this.workers = IntStream.range(0, numConsumers)
.mapToObj(i -> new ConsumerWorker(i))
.toList();
}
public void start() {
workers.forEach(executor::submit);
logger.info("Started {} consumer threads", numConsumers);
}
public void shutdown() {
workers.forEach(ConsumerWorker::stop);
executor.shutdown();
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
}
private static class ConsumerWorker implements Runnable {
private final int id;
private final Consumer<String, StockPrice> consumer;
private volatile boolean running = true;
public ConsumerWorker(int id) {
this.id = id;
this.consumer = createConsumer("consumer-thread-" + id);
}
@Override
public void run() {
try {
consumer.subscribe(Arrays.asList("stock-prices"));
while (running) {
ConsumerRecords<String, StockPrice> records =
consumer.poll(Duration.ofMillis(1000));
for (var record : records) {
logger.info("Thread {} processing: {} - ${}",
id, record.value().getSymbol(), record.value().getPrice());
// Process record
processRecord(record);
}
consumer.commitAsync();
}
} catch (WakeupException e) {
// Expected when stop() calls consumer.wakeup(); fall through to close
} catch (Exception e) {
logger.error("Error in consumer thread {}", id, e);
} finally {
consumer.close();
logger.info("Consumer thread {} stopped", id);
}
}
public void stop() {
running = false;
consumer.wakeup();
}
private void processRecord(ConsumerRecord<String, StockPrice> record) {
// Business logic here
}
private static Consumer<String, StockPrice> createConsumer(String clientId) {
// Create consumer with unique client ID
Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
// ... other properties
return new KafkaConsumer<>(props);
}
}
}
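A typical way to run the thread-per-consumer model is to size the pool to the topic's partition count (consumers beyond the partition count sit idle) and register a JVM shutdown hook. A minimal sketch, assuming stock-prices has three partitions:
// e.g. added to ThreadPerConsumerModel
public static void main(String[] args) {
ThreadPerConsumerModel model = new ThreadPerConsumerModel(3); // one thread per partition
Runtime.getRuntime().addShutdownHook(new Thread(model::shutdown));
model.start();
}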
Multi-Threaded Consumer Model
package com.cloudurable.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class MultiThreadedConsumerModel {
private static final Logger logger = LoggerFactory.getLogger(MultiThreadedConsumerModel.class);
private final Consumer<String, StockPrice> consumer;
private final ExecutorService processingPool;
private final BlockingQueue<ConsumerRecord<String, StockPrice>> recordQueue;
private final Map<TopicPartition, Long> pendingOffsets;
private final AtomicBoolean running = new AtomicBoolean(true);
public MultiThreadedConsumerModel(int processingThreads) {
this.consumer = createConsumer();
this.processingPool = Executors.newFixedThreadPool(processingThreads);
this.recordQueue = new LinkedBlockingQueue<>(10000);
this.pendingOffsets = new ConcurrentHashMap<>();
// Start processing threads
for (int i = 0; i < processingThreads; i++) {
processingPool.submit(new RecordProcessor(i));
}
}
private Consumer<String, StockPrice> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "multi-threaded-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StockPriceDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
return new KafkaConsumer<>(props);
}
public void start() {
consumer.subscribe(Arrays.asList("stock-prices"));
// Consumer thread - only polls and queues records
Thread consumerThread = new Thread(() -> {
try {
while (running.get()) {
ConsumerRecords<String, StockPrice> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, StockPrice> record : records) {
// Queue record for processing
while (!recordQueue.offer(record, 100, TimeUnit.MILLISECONDS)) {
if (!running.get()) break;
}
// Track pending offset
pendingOffsets.merge(
new TopicPartition(record.topic(), record.partition()),
record.offset(),
Math::max
);
}
// Commit completed offsets
commitCompletedOffsets();
}
} catch (Exception e) {
logger.error("Error in consumer thread", e);
} finally {
consumer.close();
}
});
consumerThread.start();
}
private void commitCompletedOffsets() {
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
// This is simplified - real implementation needs to track completion
pendingOffsets.forEach((partition, offset) -> {
toCommit.put(partition, new OffsetAndMetadata(offset + 1));
});
if (!toCommit.isEmpty()) {
consumer.commitAsync(toCommit, (offsets, exception) -> {
if (exception != null) {
logger.error("Failed to commit offsets", exception);
}
});
}
}
private class RecordProcessor implements Runnable {
private final int id;
public RecordProcessor(int id) {
this.id = id;
}
@Override
public void run() {
while (running.get()) {
try {
ConsumerRecord<String, StockPrice> record =
recordQueue.poll(100, TimeUnit.MILLISECONDS);
if (record != null) {
logger.info("Processor {} handling: {} - ${}",
id, record.value().getSymbol(), record.value().getPrice());
// Process record
processRecord(record);
// Mark as completed (simplified)
// Real implementation needs proper offset tracking
}
} catch (Exception e) {
logger.error("Error in processor {}", id, e);
}
}
}
private void processRecord(ConsumerRecord<String, StockPrice> record) {
// Business logic here
}
}
public void shutdown() {
running.set(false);
consumer.wakeup();
processingPool.shutdown();
try {
if (!processingPool.awaitTermination(30, TimeUnit.SECONDS)) {
processingPool.shutdownNow();
}
} catch (InterruptedException e) {
processingPool.shutdownNow();
}
}
}
Reactive Consumer Model
package com.cloudurable.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class ReactiveConsumerModel {
private static final Logger logger = LoggerFactory.getLogger(ReactiveConsumerModel.class);
private final KafkaReceiver<String, StockPrice> receiver;
public ReactiveConsumerModel() {
this.receiver = createReceiver();
}
private KafkaReceiver<String, StockPrice> createReceiver() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StockPriceDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ReceiverOptions<String, StockPrice> receiverOptions =
ReceiverOptions.<String, StockPrice>create(props)
.subscription(Collections.singleton("stock-prices"))
.commitBatchSize(10)
.commitInterval(Duration.ofSeconds(1));
return KafkaReceiver.create(receiverOptions);
}
public void consume() {
receiver.receive()
.groupBy(record -> record.partition())
.flatMap(partitionFlux -> partitionFlux
.publishOn(Schedulers.parallel())
.flatMap(this::processRecord)
.sample(Duration.ofSeconds(5))
// Acknowledge the latest sampled record; offsets are committed in batches
// per the commitBatchSize/commitInterval configured on ReceiverOptions
.doOnNext(record -> record.receiverOffset().acknowledge())
)
.subscribe();
}
private Mono<ReceiverRecord<String, StockPrice>> processRecord(
ReceiverRecord<String, StockPrice> record) {
return Mono.fromCallable(() -> {
StockPrice stockPrice = record.value();
logger.info("Processing: {} - ${}", stockPrice.getSymbol(), stockPrice.getPrice());
// Simulate async processing
if (stockPrice.getPrice() > 1000) {
return processHighValueStock(record);
} else {
return processNormalStock(record);
}
})
.subscribeOn(Schedulers.boundedElastic())
.doOnError(error -> logger.error("Error processing record", error))
.retry(3)
.onErrorReturn(record);
}
private ReceiverRecord<String, StockPrice> processHighValueStock(
ReceiverRecord<String, StockPrice> record) {
// Special processing for high-value stocks
return record;
}
private ReceiverRecord<String, StockPrice> processNormalStock(
ReceiverRecord<String, StockPrice> record) {
// Normal processing
return record;
}
}
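To run the reactive model you need the reactor-kafka library (io.projectreactor.kafka:reactor-kafka) on the classpath in addition to kafka-clients. A minimal, illustrative entry point:
// e.g. added to ReactiveConsumerModel
public static void main(String[] args) throws InterruptedException {
ReactiveConsumerModel model = new ReactiveConsumerModel();
model.consume(); // returns immediately; the pipeline runs on Reactor schedulers
Thread.currentThread().join(); // keep the JVM alive for this demo
}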
Advanced Offset Management
package com.cloudurable.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class ExternalOffsetManager {
private static final Logger logger = LoggerFactory.getLogger(ExternalOffsetManager.class);
private final Consumer<String, StockPrice> consumer;
private final OffsetStore offsetStore;
private final Map<TopicPartition, Long> processingOffsets;
public ExternalOffsetManager(OffsetStore offsetStore) {
this.consumer = createConsumer();
this.offsetStore = offsetStore;
this.processingOffsets = new ConcurrentHashMap<>();
}
private Consumer<String, StockPrice> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "external-offset-consumer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StockPriceDeserializer.class);
return new KafkaConsumer<>(props);
}
public void consume() {
// Subscribe with custom rebalance listener
consumer.subscribe(Arrays.asList("stock-prices"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Save current processing state
saveOffsetsToExternalStore();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Restore from external store
for (TopicPartition partition : partitions) {
Long offset = offsetStore.getOffset(partition);
if (offset != null) {
consumer.seek(partition, offset);
logger.info("Restored offset {} for partition {}", offset, partition);
}
}
}
});
try {
while (true) {
ConsumerRecords<String, StockPrice> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, StockPrice> record : records) {
try {
// Process record
processRecord(record);
// Track successful processing
processingOffsets.put(
new TopicPartition(record.topic(), record.partition()),
record.offset() + 1
);
} catch (Exception e) {
logger.error("Failed to process record", e);
// Implement retry logic or DLQ here
}
}
// Periodically save to external store
if (!records.isEmpty() && System.currentTimeMillis() % 5000 < 1000) {
saveOffsetsToExternalStore();
}
}
} finally {
saveOffsetsToExternalStore();
consumer.close();
}
}
private void saveOffsetsToExternalStore() {
processingOffsets.forEach((partition, offset) -> {
try {
offsetStore.saveOffset(partition, offset);
logger.debug("Saved offset {} for partition {}", offset, partition);
} catch (Exception e) {
logger.error("Failed to save offset for partition {}", partition, e);
}
});
}
private void processRecord(ConsumerRecord<String, StockPrice> record) {
// Business logic
}
}
interface OffsetStore {
void saveOffset(TopicPartition partition, long offset);
Long getOffset(TopicPartition partition);
}
// Example implementation backed by Redis. RedisClient here stands for any simple
// synchronous client exposing get/set (e.g. a Jedis instance or a thin wrapper).
class RedisOffsetStore implements OffsetStore {
private final RedisClient redisClient;
public RedisOffsetStore(RedisClient redisClient) {
this.redisClient = redisClient;
}
@Override
public void saveOffset(TopicPartition partition, long offset) {
String key = String.format("kafka:offset:%s:%d", partition.topic(), partition.partition());
redisClient.set(key, String.valueOf(offset));
}
@Override
public Long getOffset(TopicPartition partition) {
String key = String.format("kafka:offset:%s:%d", partition.topic(), partition.partition());
String value = redisClient.get(key);
return value != null ? Long.parseLong(value) : null;
}
}
Best Practices Summary
flowchart TB
subgraph BestPractices["Consumer Best Practices"]
subgraph Reliability["Reliability"]
R1[Use Manual Commits]
R2[Implement Idempotence]
R3[Handle Rebalances]
R4[Monitor Consumer Lag]
end
subgraph Performance["Performance"]
P1[Tune Poll Settings]
P2[Use Appropriate Threading]
P3[Batch Processing]
P4[Optimize Deserialization]
end
subgraph Resilience["Resilience"]
RS1[Implement DLQ]
RS2[Circuit Breakers]
RS3[Retry Logic]
RS4[Health Checks]
end
end
style Reliability fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style Performance fill:#e8f5e9,stroke:#43a047,stroke-width:2px
style Resilience fill:#fff3e0,stroke:#ff6f00,stroke-width:2px
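Most of these practices are demonstrated in the code above; consumer-lag monitoring is not, so here is a minimal, illustrative helper that reads the consumer's built-in records-lag-max metric. In production you would normally export consumer metrics to your monitoring stack (JMX, Micrometer, Prometheus) rather than polling them ad hoc.
// Illustrative lag check using the consumer's own metrics ("records-lag-max"
// comes from the consumer's fetch manager metrics). Alert if it keeps growing.
public static double maxRecordsLag(Consumer<?, ?> consumer) {
return consumer.metrics().entrySet().stream()
.filter(entry -> "records-lag-max".equals(entry.getKey().name()))
.mapToDouble(entry -> {
Object value = entry.getValue().metricValue();
return value instanceof Number ? ((Number) value).doubleValue() : Double.NaN;
})
.max()
.orElse(Double.NaN);
}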
Review Questions
When should you use exactly-once semantics?
Use exactly-once when data accuracy is critical and duplicates would cause issues, such as financial transactions or inventory management.
What’s the benefit of ConsumerRebalanceListener?
It allows you to save state before losing partitions and restore state when gaining partitions, ensuring smooth rebalancing.
When to use manual partition assignment?
Use manual assignment for specialized use cases like priority queues, deterministic ordering, or workload isolation.
Which threading model is best?
Thread-per-consumer is simplest, multi-threaded offers better resource utilization, and reactive provides the best scalability for I/O-bound operations.
How to handle poison messages?
Implement try-catch blocks, use circuit breakers, send failures to DLQ, and monitor/alert on repeated failures.
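Dead-letter queues come up repeatedly above but are never shown. Below is a minimal sketch of a DLQ hand-off; the stock-prices-dlq topic name and the header keys are illustrative, the producer is assumed to be configured with a String key serializer and a byte-array value serializer, and deciding when to give up (retry counts, backoff) is left to the calling consumer.
package com.cloudurable.kafka.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;
// Minimal dead-letter-queue hand-off for records that keep failing
public class DeadLetterPublisher {
private static final String DLQ_TOPIC = "stock-prices-dlq";
private final Producer<String, byte[]> producer;
public DeadLetterPublisher(Producer<String, byte[]> producer) {
this.producer = producer;
}
public void publish(ConsumerRecord<String, StockPrice> failed, Exception error) {
ProducerRecord<String, byte[]> dlqRecord = new ProducerRecord<>(
DLQ_TOPIC, failed.key(), serialize(failed.value()));
// Carry failure context as headers so the DLQ can be inspected or replayed
dlqRecord.headers().add("x-original-topic", failed.topic().getBytes(StandardCharsets.UTF_8));
dlqRecord.headers().add("x-original-partition", Integer.toString(failed.partition()).getBytes(StandardCharsets.UTF_8));
dlqRecord.headers().add("x-original-offset", Long.toString(failed.offset()).getBytes(StandardCharsets.UTF_8));
dlqRecord.headers().add("x-error", String.valueOf(error.getMessage()).getBytes(StandardCharsets.UTF_8));
producer.send(dlqRecord);
}
private byte[] serialize(StockPrice value) {
// Re-serialize however your pipeline expects; JSON shown here for simplicity
try {
return new ObjectMapper().writeValueAsBytes(value);
} catch (Exception e) {
return new byte[0];
}
}
}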
Summary
Advanced Kafka consumers in 2025 offer:
- Flexible Delivery Semantics for different consistency requirements
- Smooth Rebalancing with minimal disruption
- Multiple Threading Models for various use cases
- Robust Error Handling with DLQ and circuit breakers
- Production-Ready Patterns for reliability at scale
Master these patterns to build resilient, high-performance streaming applications!
Related Content
- What is Kafka?
- Kafka Architecture
- Kafka Consumer Architecture
- Basic Kafka Consumer
- Advanced Kafka Consumer Part 1
- Schema Registry
About Cloudurable
We hope you enjoyed this article. Please provide feedback. Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps set up Kafka clusters in AWS.
Check out our new GoLang course. We provide onsite Go Lang training, which is instructor-led.