Kafka Tutorial: Kafka Clusters, Consumer Failover, and Broker Failover - 2025 Edition

January 9, 2025


🚀 What’s New in This 2025 Update

Major Updates and Changes

  • KRaft Mode - No ZooKeeper! Native consensus with Raft
  • Faster Failover - Sub-second leader election
  • Improved Rebalancing - KIP-848 protocol for smooth transitions
  • Enhanced Monitoring - Built-in health checks and metrics
  • Cloud-Native - Kubernetes operators and auto-scaling
  • Multi-Region - Cross-datacenter replication patterns

High Availability Evolution Since 2017

  • ✅ No External Dependencies - KRaft eliminates ZooKeeper
  • ✅ Faster Recovery - Millisecond failover times
  • ✅ Better Observability - Detailed failover metrics
  • ✅ Automated Operations - Self-healing clusters

Ready to build bulletproof Kafka clusters? Let’s explore failover, high availability, and fault tolerance with hands-on examples!

Getting Started with Kafka Cluster Tutorial

Understanding Kafka Failover

flowchart TB
  subgraph Before["Traditional Kafka HA"]
    ZK[ZooKeeper Ensemble<br>3-5 nodes]
    K1[Kafka Broker 1]
    K2[Kafka Broker 2]
    K3[Kafka Broker 3]
    
    K1 <--> ZK
    K2 <--> ZK
    K3 <--> ZK
  end
  
  subgraph After["Kafka 4.0 with KRaft"]
    C1[Controller/Broker 1]
    C2[Controller/Broker 2]
    C3[Controller/Broker 3]
    
    C1 <--> C2
    C2 <--> C3
    C3 <--> C1
  end
  
  Before -->|Simplified| After
  
  style Before fill:#ffebee,stroke:#e53935,stroke-width:2px
  style After fill:#e8f5e9,stroke:#43a047,stroke-width:2px

In this tutorial, we’ll:

  • Run a 3-node Kafka cluster with KRaft mode
  • Create replicated topics for fault tolerance
  • Demonstrate consumer failover when consumers crash
  • Show broker failover when Kafka nodes fail
  • Explore partition rebalancing and leader election

Cloudurable provides Kafka training, Kafka consulting, Kafka support, and helps set up Kafka clusters in AWS.

Setting Up a 3-Node Kafka Cluster

Create KRaft Configuration Files

First, let’s create configuration files for our three Kafka nodes:

~/kafka-training/lab2/config/kraft/server-1.properties

# Node ID - unique for each broker
node.id=1
process.roles=controller,broker

# Listeners
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:19092
advertised.listeners=PLAINTEXT://localhost:9092
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT

# Quorum Voters - all three controllers
controller.quorum.voters=1@localhost:19092,2@localhost:19093,3@localhost:19094

# Logs
log.dirs=./logs/kafka-1
metadata.log.dir=./logs/kafka-1/metadata

# Replication
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

# Performance
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

~/kafka-training/lab2/config/kraft/server-2.properties

node.id=2
process.roles=controller,broker

listeners=PLAINTEXT://localhost:9093,CONTROLLER://localhost:19093
advertised.listeners=PLAINTEXT://localhost:9093
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT

controller.quorum.voters=1@localhost:19092,2@localhost:19093,3@localhost:19094

log.dirs=./logs/kafka-2
metadata.log.dir=./logs/kafka-2/metadata

offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

~/kafka-training/lab2/config/kraft/server-3.properties

node.id=3
process.roles=controller,broker

listeners=PLAINTEXT://localhost:9094,CONTROLLER://localhost:19094
advertised.listeners=PLAINTEXT://localhost:9094
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT

controller.quorum.voters=1@localhost:19092,2@localhost:19093,3@localhost:19094

log.dirs=./logs/kafka-3
metadata.log.dir=./logs/kafka-3/metadata

offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
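
The three files differ only in node ID and ports, so if you prefer not to maintain them by hand, a small script can stamp them out from a shared template. This is just a sketch, not part of the lab: the server-template.properties file and its NODE_ID/PORT/CONTROLLER_PORT placeholders are hypothetical.

#!/usr/bin/env bash
# Sketch: generate server-1..3.properties from a hypothetical template
# containing NODE_ID, PORT, and CONTROLLER_PORT placeholders.
# Replace CONTROLLER_PORT before PORT so the longer token is not mangled.
cd ~/kafka-training/lab2/config/kraft

for i in 1 2 3; do
  sed -e "s/CONTROLLER_PORT/$((19091 + i))/g" \
      -e "s/PORT/$((9091 + i))/g" \
      -e "s/NODE_ID/$i/g" \
      server-template.properties > "server-$i.properties"
done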

Initialize and Start the Cluster

~/kafka-training/lab2/setup-cluster.sh

#!/usr/bin/env bash
cd ~/kafka-training

# Generate cluster ID
KAFKA_CLUSTER_ID="$(kafka/bin/kafka-storage.sh random-uuid)"
echo "Cluster ID: $KAFKA_CLUSTER_ID"

# Format storage for each node
for i in 1 2 3; do
  echo "Formatting storage for node $i"
  kafka/bin/kafka-storage.sh format \
    -t $KAFKA_CLUSTER_ID \
    -c lab2/config/kraft/server-$i.properties
done

echo "Cluster initialized with ID: $KAFKA_CLUSTER_ID"

~/kafka-training/lab2/start-node-1.sh

#!/usr/bin/env bash
cd ~/kafka-training

kafka/bin/kafka-server-start.sh \
    lab2/config/kraft/server-1.properties

~/kafka-training/lab2/start-node-2.sh

#!/usr/bin/env bash
cd ~/kafka-training

kafka/bin/kafka-server-start.sh \
    lab2/config/kraft/server-2.properties

~/kafka-training/lab2/start-node-3.sh

#!/usr/bin/env bash
cd ~/kafka-training

kafka/bin/kafka-server-start.sh \
    lab2/config/kraft/server-3.properties

Start the Cluster

# Make scripts executable
chmod +x setup-cluster.sh start-node-*.sh

# Initialize the cluster
./setup-cluster.sh

# Start each node in a separate terminal
./start-node-1.sh
./start-node-2.sh
./start-node-3.sh
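
Before creating topics, it is worth confirming that the controller quorum actually formed. Kafka ships the kafka-metadata-quorum.sh tool (3.3+) for exactly this; you should see a single leader and all three voters in the output:

# Confirm the KRaft controller quorum is healthy
kafka/bin/kafka-metadata-quorum.sh \
    --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
    describe --status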

Create a Replicated Topic

Topic Creation Script

~/kafka-training/lab2/create-replicated-topic.sh

#!/usr/bin/env bash
cd ~/kafka-training

# Create topic with replication factor 3
kafka/bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
    --replication-factor 3 \
    --partitions 13 \
    --topic my-failsafe-topic \
    --config min.insync.replicas=2 \
    --config unclean.leader.election.enable=false

# Verify topic creation
kafka/bin/kafka-topics.sh --describe \
    --bootstrap-server localhost:9092 \
    --topic my-failsafe-topic

Run the script:

./create-replicated-topic.sh
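
If you need to inspect or change the topic's durability settings later, kafka-configs.sh can do it without recreating the topic:

# Inspect the topic's config overrides
kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 \
    --entity-type topics --entity-name my-failsafe-topic --describe

# Adjust min.insync.replicas after the fact if needed
kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 \
    --entity-type topics --entity-name my-failsafe-topic \
    --alter --add-config min.insync.replicas=2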

Understanding Topic Layout

flowchart TB
  subgraph Topic["my-failsafe-topic (13 partitions)"]
    subgraph P0["Partition 0"]
      L0[Leader: Broker 1]
      F0[Followers: 2,3]
    end

    subgraph P1["Partition 1"]
      L1[Leader: Broker 2]
      F1[Followers: 3,1]
    end

    subgraph P2["Partition 2"]
      L2[Leader: Broker 3]
      F2[Followers: 1,2]
    end
    
    subgraph More["...10 more partitions"]
    end
  end
  
  style Topic fill:#e3f2fd,stroke:#1976d2,stroke-width:2px

Consumer Group Failover Demo

Start Multiple Consumers in Same Group

~/kafka-training/lab2/start-consumer-group.sh

#!/usr/bin/env bash
cd ~/kafka-training

# Consumer with new KIP-848 protocol
kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
    --topic my-failsafe-topic \
    --group failover-demo-group \
    --property print.key=true \
    --property print.partition=true \
    --property print.offset=true \
    --consumer-property group.protocol=consumer \
    --from-beginning

Start 3 consumers in separate terminals:

# Terminal 1
./start-consumer-group.sh

# Terminal 2
./start-consumer-group.sh

# Terminal 3
./start-consumer-group.sh
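
Once all three consumers have joined, you can check which consumer owns which partitions. This step is optional, but it makes the failover exercise easier to follow:

# Show group members and their partition assignments
kafka/bin/kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --describe --group failover-demo-group --members --verbose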

Start Producer

~/kafka-training/lab2/start-producer.sh

#!/usr/bin/env bash
cd ~/kafka-training

kafka/bin/kafka-console-producer.sh \
    --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
    --topic my-failsafe-topic \
    --property parse.key=true \
    --property key.separator=:
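
For the failover demo it can help to make the console producer itself durable, so messages sent while a broker is down are retried rather than dropped. A variant of the script above with producer-level overrides:

# Durable variant of the console producer
kafka/bin/kafka-console-producer.sh \
    --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
    --topic my-failsafe-topic \
    --property parse.key=true \
    --property key.separator=: \
    --producer-property acks=all \
    --producer-property enable.idempotence=true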

Demonstrate Consumer Load Balancing

Send messages from producer:

user1:message1
user2:message2
user3:message3
user4:message4
user5:message5
user6:message6
user7:message7

Observe how messages are distributed across the three consumers:

Consumer 1 might get:

Partition:3 Offset:0 user3 message3
Partition:6 Offset:0 user6 message6

Consumer 2 might get:

Partition:1 Offset:0 user1 message1
Partition:4 Offset:0 user4 message4
Partition:7 Offset:0 user7 message7

Consumer 3 might get:

Partition:2 Offset:0 user2 message2
Partition:5 Offset:0 user5 message5

Test Consumer Failover

  1. Kill Consumer 3 (Ctrl+C)
  2. Send more messages:
user8:message8
user9:message9
user10:message10
user11:message11
  3. Observe redistribution - The remaining two consumers take over the partitions from the failed consumer (the watch loop below makes this easy to see).
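
A simple loop in a spare terminal makes the rebalance easy to watch while you kill and restart consumers:

# Re-describe the group every couple of seconds to watch partitions move
while true; do
  kafka/bin/kafka-consumer-groups.sh \
      --bootstrap-server localhost:9092 \
      --describe --group failover-demo-group
  sleep 2
done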

Broker Failover Demo

Monitor Topic Status

~/kafka-training/lab2/monitor-topic.sh

#!/usr/bin/env bash
cd ~/kafka-training

while true; do
  clear
  echo "=== Topic Status at $(date) ==="
  kafka/bin/kafka-topics.sh --describe \
      --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
      --topic my-failsafe-topic | grep -E "Partition:|Leader:|Replicas:|Isr:"
  
  echo -e "\n=== Controller Quorum ==="
  kafka/bin/kafka-metadata-quorum.sh \
      --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
      describe --status 2>/dev/null || echo "Quorum status unavailable"
  
  sleep 5
done

Kill a Broker

Run the monitor in one terminal:

./monitor-topic.sh

Kill broker 1:

# Find and kill the process
kill $(ps aux | grep 'server-1.properties' | grep -v grep | awk '{print $2}')

Observe Failover

Watch the monitor output to see:

  1. ISR shrinks - Failed broker removed from In-Sync Replicas
  2. Leader election - New leaders elected for affected partitions
  3. Client failover - Producers and consumers refresh metadata and reconnect to the new leaders

Example output after killing broker 1:

=== Topic Status ===
Topic: my-failsafe-topic    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,3
Topic: my-failsafe-topic    Partition: 1    Leader: 3    Replicas: 3,1,2    Isr: 3,2
Topic: my-failsafe-topic    Partition: 2    Leader: 2    Replicas: 1,2,3    Isr: 2,3

Verify Message Delivery Continues

Send more messages:

user12:message12
user13:message13

Messages are still delivered to consumers despite broker failure!
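
When you bring broker 1 back, it rejoins the ISR but may not immediately regain leadership (Kafka's periodic preferred-leader rebalance handles this when auto.leader.rebalance.enable=true). You can also trigger the rebalance manually:

# Restart the failed broker in its own terminal
./start-node-1.sh

# Then move leadership back to the preferred replicas
kafka/bin/kafka-leader-election.sh \
    --bootstrap-server localhost:9092 \
    --election-type PREFERRED \
    --all-topic-partitions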

Advanced High Availability Patterns

Multi-Region Replication

flowchart LR
  subgraph DC1["Data Center 1"]
    K1[Kafka Cluster 1<br>Primary]
  end
  
  subgraph DC2["Data Center 2"]
    K2[Kafka Cluster 2<br>Secondary]
  end
  
  subgraph DC3["Data Center 3"]
    K3[Kafka Cluster 3<br>Secondary]
  end
  
  K1 -->|MirrorMaker 2.0| K2
  K1 -->|MirrorMaker 2.0| K3
  
  subgraph Producers["Producers"]
    P1[App 1]
    P2[App 2]
  end
  
  subgraph Consumers["Consumers"]
    C1[Analytics]
    C2[Monitoring]
  end
  
  P1 --> K1
  P2 --> K1
  
  K2 --> C1
  K3 --> C2
  
  style DC1 fill:#e8f5e9,stroke:#43a047,stroke-width:2px
  style DC2 fill:#e3f2fd,stroke:#1976d2,stroke-width:1px
  style DC3 fill:#fff9c4,stroke:#f9a825,stroke-width:1px

Rack-Aware Replication

# server-1.properties
broker.rack=rack1

# server-2.properties
broker.rack=rack2

# server-3.properties
broker.rack=rack3

This ensures replicas are distributed across failure domains.
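
Rack awareness also pairs well with follower fetching (KIP-392): a consumer that declares a client.rack matching a broker.rack can read from the nearest replica instead of the leader. The broker selector and the client.rack setting below are standard Kafka configs; the rack name is just this lab's example:

# Broker side (add to each server-N.properties):
#   replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

# Consumer side: declare the client's rack
kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic my-failsafe-topic \
    --group rack-aware-demo \
    --consumer-property client.rack=rack1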

Production Health Checks

~/kafka-training/lab2/health-check.sh

#!/usr/bin/env bash
cd ~/kafka-training

check_broker_health() {
  local port=$1
  local broker_id=$2
  
  if nc -zv localhost $port 2>/dev/null; then
    echo "✓ Broker $broker_id (port $port) is healthy"
  else
    echo "✗ Broker $broker_id (port $port) is DOWN"
  fi
}

check_topic_health() {
  local topic=$1

  # Report the topic's configured min.insync.replicas (blank means broker default)
  local min_isr=$(kafka/bin/kafka-configs.sh \
    --bootstrap-server localhost:9092 \
    --entity-type topics \
    --entity-name "$topic" \
    --describe | grep -o 'min.insync.replicas=[0-9]*' | head -1 | cut -d= -f2)
  echo "min.insync.replicas for $topic: ${min_isr:-broker default}"

  # Count under-replicated partitions for this topic
  local under_replicated=$(kafka/bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --describe --topic "$topic" --under-replicated-partitions | grep -c "Partition:")

  if [ "$under_replicated" -eq 0 ]; then
    echo "✓ Topic $topic is fully replicated"
  else
    echo "✗ Topic $topic has $under_replicated under-replicated partitions"
  fi
}

echo "=== Kafka Cluster Health Check ==="
check_broker_health 9092 1
check_broker_health 9093 2
check_broker_health 9094 3

echo -e "\n=== Topic Health ==="
check_topic_health my-failsafe-topic

echo -e "\n=== Consumer Group Lag ==="
kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group failover-demo-group \
  --describe 2>/dev/null | grep -E "TOPIC|my-failsafe-topic"
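
Make the script executable and run it on an interval while you work through the failover exercises:

chmod +x health-check.sh

# One-off run
./health-check.sh

# Or poll every 30 seconds while you kill brokers and consumers
while true; do ./health-check.sh; sleep 30; done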

Monitoring Failover Metrics

Key Metrics to Track

// Example: Monitoring failover-critical gauges with JMX
import java.lang.management.ManagementFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class FailoverMonitor {

    private static final Logger logger = Logger.getLogger(FailoverMonitor.class.getName());

    private static final String[] CRITICAL_METRICS = {
        "kafka.controller:type=KafkaController,name=OfflinePartitionsCount",
        "kafka.controller:type=KafkaController,name=ActiveControllerCount",
        "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions",
        "kafka.server:type=ReplicaManager,name=LeaderCount"
    };
    // Request-latency metrics (kafka.network:type=RequestMetrics,name=TotalTimeMs,...)
    // are histograms: read attributes such as "Mean" and "Count", not "Value".

    public void monitorFailover() {
        // Reads the local MBeanServer; use a JMXConnector for a remote broker.
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();

        for (String metric : CRITICAL_METRICS) {
            try {
                ObjectName name = new ObjectName(metric);
                long value = ((Number) mbs.getAttribute(name, "Value")).longValue();

                // Alert if critical thresholds are exceeded
                if (metric.contains("OfflinePartitions") && value > 0) {
                    alert("CRITICAL: Offline partitions detected: " + value);
                }

                if (metric.contains("UnderReplicated") && value > 0) {
                    alert("WARNING: Under-replicated partitions: " + value);
                }

            } catch (Exception e) {
                logger.log(Level.SEVERE, "Error reading metric: " + metric, e);
            }
        }
    }

    private void alert(String message) {
        // Hook this into your paging/alerting system; here we just log it.
        logger.warning(message);
    }
}
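
This monitor reads the platform MBeanServer, so it only sees Kafka metrics when it runs inside the broker JVM or attaches over a remote JMX connection. To read a broker's metrics remotely, expose JMX when starting the node (the port below is an arbitrary example), then connect with a JMXConnector or a tool such as jconsole. When all three brokers run on one machine, give each its own JMX port.

# Expose JMX on node 1 (kafka-server-start.sh honors JMX_PORT)
JMX_PORT=9999 kafka/bin/kafka-server-start.sh \
    lab2/config/kraft/server-1.properties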

Best Practices for High Availability

1. Cluster Sizing

Production Cluster:
  Controllers: 3 or 5 (odd number for quorum)
  Brokers: 3+ (depends on load)
  Replication Factor: 3
  Min ISR: 2
  Racks/AZs: 3+

2. Topic Configuration

# High availability topic
kafka/bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic critical-events \
  --partitions 50 \
  --replication-factor 3 \
  --config min.insync.replicas=2 \
  --config unclean.leader.election.enable=false \
  --config retention.ms=604800000

3. Producer Configuration

# Ensure durability
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
enable.idempotence=true

4. Consumer Configuration

# Handle rebalances gracefully
session.timeout.ms=45000
heartbeat.interval.ms=3000
max.poll.interval.ms=300000
# New consumer rebalance protocol (KIP-848)
group.protocol=consumer
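
With the console tools used throughout this tutorial, one convenient way to apply these settings is to save them to files and pass the files in (the file names here are just examples):

# producer.properties holds the producer settings above,
# consumer.properties holds the consumer settings above
kafka/bin/kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --topic critical-events \
    --producer.config producer.properties

kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic critical-events \
    --group critical-consumers \
    --consumer.config consumer.properties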

Troubleshooting Failover Issues

Common Issues and Solutions

| Issue                       | Symptoms                        | Solution                            |
|-----------------------------|---------------------------------|-------------------------------------|
| Slow failover               | Leader election > 30s           | Tune replica.lag.time.max.ms        |
| Frequent rebalances         | Consumers constantly rejoining  | Increase session timeout            |
| Under-replicated partitions | ISR < replication factor        | Check broker health and network     |
| Split brain                 | Multiple active controllers     | Ensure proper quorum configuration  |
| Message loss                | Missing messages after failover | Set min.insync.replicas=2, acks=all |

Debug Commands

# Check controller quorum status
kafka/bin/kafka-metadata-quorum.sh \
  --bootstrap-server localhost:9092 describe --status

# Check replication lag of the controller voters
kafka/bin/kafka-metadata-quorum.sh \
  --bootstrap-server localhost:9092 describe --replication

# View under-replicated partitions
kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --describe --under-replicated-partitions

# Check consumer group state
kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group failover-demo-group \
  --describe --state

Review Questions

Why did messages get redistributed when we killed a consumer?

The consumer group protocol automatically rebalances partitions among remaining consumers to ensure all partitions are consumed.

How fast is broker failover with KRaft?

KRaft enables sub-second failover, typically completing leader election in 100-500ms vs several seconds with ZooKeeper.

What happens to in-flight messages during broker failure?

With acks=all and min.insync.replicas=2, messages are only acknowledged after replication, preventing loss during failover.

How does rack awareness improve reliability?

Rack awareness ensures replicas are distributed across failure domains (racks/AZs), surviving infrastructure failures.

What’s the minimum cluster size for high availability?

Three nodes is the minimum: a 3-node controller quorum tolerates one failure (a majority of 2 out of 3), and three brokers support replication factor 3 with min.insync.replicas=2.

Summary

Kafka’s high availability features ensure:

  • Zero message loss with proper replication settings
  • Continuous availability during node failures
  • Automatic failover for both consumers and brokers
  • Fast recovery with KRaft consensus
  • Flexible deployment across regions and clouds

The combination of KRaft mode, improved rebalancing protocols, and battle-tested replication makes Kafka 4.0 the most reliable version yet!

About Cloudurable

We hope you enjoyed this tutorial. Please provide feedback. Cloudurable provides Kafka training, Kafka consulting, Kafka support, and helps set up Kafka clusters in AWS.

Check out our new Go (Golang) course. We provide instructor-led, onsite Go training.
