January 9, 2025
🚀 What’s New in This 2025 Update
Major Updates and Changes
- KRaft Mode by Default - No ZooKeeper required
- Container-First Approach - Docker and Kubernetes native
- Dynamic Configuration - Minimal restarts needed
- Automated Management - Operators handle lifecycle
- Enhanced Monitoring - Built-in observability
- Cloud-Native Patterns - Auto-scaling and self-healing
Broker Startup Evolution Since 2017
- ✅ No ZooKeeper - KRaft provides native consensus
- ✅ Declarative Config - GitOps and Infrastructure as Code
- ✅ Container Ready - Optimized for Kubernetes
- ✅ Zero-Downtime - Rolling updates standard
Running a Kafka Broker in 2025
Starting Kafka brokers has evolved significantly with KRaft mode and cloud-native practices. This guide covers modern approaches from simple local development to production Kubernetes deployments.
Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps set up Kafka clusters in AWS.
KRaft Mode Startup (No ZooKeeper!)
flowchart LR
subgraph KRaftCluster["Kafka Cluster with KRaft"]
subgraph Controllers["Controllers (Quorum)"]
C1[Controller 1<br>node.id=1]
C2[Controller 2<br>node.id=2]
C3[Controller 3<br>node.id=3]
end
subgraph Brokers["Brokers"]
B1[Broker 1<br>node.id=4]
B2[Broker 2<br>node.id=5]
B3[Broker 3<br>node.id=6]
end
end
C1 <--> C2
C2 <--> C3
C3 <--> C1
Controllers --> Brokers
style Controllers fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style Brokers fill:#e8f5e9,stroke:#43a047,stroke-width:1px
Basic KRaft Configuration
Create a KRaft configuration file:
config/kraft/server.properties
# Node identity - unique across cluster
node.id=1
# Roles: controller, broker, or both
process.roles=controller,broker
# Controller quorum voters
controller.quorum.voters=1@localhost:9093,2@localhost:9094,3@localhost:9095
# Listeners
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
advertised.listeners=PLAINTEXT://localhost:9092
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT
# Log directories
log.dirs=/var/kafka/logs
metadata.log.dir=/var/kafka/metadata
# Performance settings
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Replication
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# Log retention
log.retention.hours=168
log.segment.bytes=1073741824
Initialize and Start KRaft Cluster
#!/bin/bash
# generate-cluster-id.sh
# Generate a unique cluster ID
KAFKA_CLUSTER_ID="$(kafka-storage.sh random-uuid)"
echo "Generated Cluster ID: $KAFKA_CLUSTER_ID"
# Store for later use
echo "$KAFKA_CLUSTER_ID" > /var/kafka/cluster.id
#!/bin/bash
# format-storage.sh
KAFKA_CLUSTER_ID=$(cat /var/kafka/cluster.id)
# Format storage for each node
kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /etc/kafka/server.properties
#!/bin/bash
# start-kafka.sh
# Start Kafka with KRaft
kafka-server-start.sh /etc/kafka/server.properties
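Once the process is up, a quick reachability probe confirms the broker listener is bound before moving on. A minimal sketch, assuming the default ports from the configuration above:

```shell
#!/bin/bash
# verify-listener.sh - probe the broker listener (sketch; default port
# matches the server.properties above)
BROKER_PORT="${BROKER_PORT:-9092}"

check_port() {
  # Succeeds if something is listening on localhost:$1 (uses bash /dev/tcp)
  (exec 3<>"/dev/tcp/localhost/$1") 2>/dev/null
}

if check_port "$BROKER_PORT"; then
  echo "broker listener up on port $BROKER_PORT"
else
  echo "broker listener not reachable on port $BROKER_PORT"
fi

# With the broker answering, the KRaft quorum state can be inspected:
#   kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
```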
Advanced Startup Script with Health Checks
#!/bin/bash
# advanced-kafka-start.sh
set -e
# Configuration
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"
CONFIG_FILE="${1:-/etc/kafka/server.properties}"
LOG_DIR="${LOG_DIR:-/var/log/kafka}"
HEALTH_CHECK_RETRIES=30
HEALTH_CHECK_INTERVAL=2
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
log() {
echo -e "${GREEN}[$(date +'%Y-%m-%d %H:%M:%S')]${NC} $1"
}
error() {
echo -e "${RED}[ERROR]${NC} $1" >&2
}
warn() {
echo -e "${YELLOW}[WARN]${NC} $1"
}
# Pre-flight checks
pre_flight_checks() {
log "Running pre-flight checks..."
# Check Java major version (17 or later; Kafka 4.0+ requires Java 17)
java_major=$(java -version 2>&1 | awk -F'"' '/version/ {split($2, v, "."); print v[1]}')
if [[ "${java_major:-0}" -lt 17 ]]; then
error "Java 17 or later required, found major version ${java_major:-unknown}"
exit 1
fi
# Check config file
if [[ ! -f "$CONFIG_FILE" ]]; then
error "Config file not found: $CONFIG_FILE"
exit 1
fi
# Check if already running
if pgrep -f "kafka.Kafka" > /dev/null; then
warn "Kafka appears to be already running"
exit 0
fi
# Create directories
mkdir -p "$LOG_DIR"
log "Pre-flight checks passed"
}
# Extract configuration values
get_config_value() {
# -f2- keeps any '=' characters that appear inside the value itself
grep "^$1=" "$CONFIG_FILE" | cut -d'=' -f2-
}
# Health check function
health_check() {
local node_id=$(get_config_value "node.id")
local listeners=$(get_config_value "listeners")
local port=$(echo "$listeners" | grep -oP 'PLAINTEXT://[^:]+:\K\d+' | head -1)
log "Checking health on port $port..."
for i in $(seq 1 $HEALTH_CHECK_RETRIES); do
if nc -zv localhost "$port" 2>/dev/null; then
log "Kafka broker is healthy"
return 0
fi
warn "Health check attempt $i/$HEALTH_CHECK_RETRIES failed"
sleep $HEALTH_CHECK_INTERVAL
done
error "Kafka failed to start after $HEALTH_CHECK_RETRIES attempts"
return 1
}
# Main startup
main() {
pre_flight_checks
# Set JMX options
export JMX_PORT="${JMX_PORT:-9999}"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Dcom.sun.management.jmxremote.port=$JMX_PORT"
# Set JVM options
export KAFKA_HEAP_OPTS="${KAFKA_HEAP_OPTS:--Xmx2G -Xms2G}"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC \
-XX:MaxGCPauseMillis=20 \
-XX:InitiatingHeapOccupancyPercent=35 \
-XX:+ExplicitGCInvokesConcurrent"
log "Starting Kafka with config: $CONFIG_FILE"
log "JMX Port: $JMX_PORT"
# Start Kafka in background
nohup "$KAFKA_HOME/bin/kafka-server-start.sh" "$CONFIG_FILE" \
> "$LOG_DIR/kafka-startup.log" 2>&1 &
local kafka_pid=$!
echo $kafka_pid > /var/run/kafka.pid
log "Kafka started with PID: $kafka_pid"
# Wait for startup and check health
if health_check; then
log "Kafka broker started successfully"
# Log broker metadata
local node_id=$(get_config_value "node.id")
local roles=$(get_config_value "process.roles")
log "Node ID: $node_id, Roles: $roles"
else
error "Kafka startup failed"
kill $kafka_pid 2>/dev/null
exit 1
fi
}
# Run main function
main "$@"
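A matching stop script rounds this out. SIGTERM triggers Kafka's controlled shutdown (leadership handoff before exit), so avoid `kill -9` unless the process hangs. A sketch, assuming the PID file written by the startup script above:

```shell
#!/bin/bash
# stop-kafka.sh - graceful counterpart to advanced-kafka-start.sh (sketch)
PID_FILE="${PID_FILE:-/var/run/kafka.pid}"
SHUTDOWN_TIMEOUT="${SHUTDOWN_TIMEOUT:-60}"

stop_kafka() {
  local pid_file=$1
  if [[ ! -f "$pid_file" ]]; then
    echo "no pid file at $pid_file"
    return 1
  fi
  local pid
  pid=$(cat "$pid_file")
  if ! kill -0 "$pid" 2>/dev/null; then
    echo "process $pid not running; removing stale pid file"
    rm -f "$pid_file"
    return 0
  fi
  # SIGTERM lets Kafka run its controlled shutdown
  kill "$pid"
  for _ in $(seq 1 "$SHUTDOWN_TIMEOUT"); do
    if ! kill -0 "$pid" 2>/dev/null; then
      rm -f "$pid_file"
      echo "stopped"
      return 0
    fi
    sleep 1
  done
  echo "still running after ${SHUTDOWN_TIMEOUT}s; consider kill -9 $pid" >&2
  return 1
}

if ! stop_kafka "$PID_FILE"; then
  echo "nothing stopped"
fi
```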
Containerized Kafka Deployment
Dockerfile for Kafka
FROM eclipse-temurin:17-jre-jammy
# Kafka version
ENV KAFKA_VERSION=3.6.0
ENV SCALA_VERSION=2.13
ENV KAFKA_HOME=/opt/kafka
# Install dependencies
RUN apt-get update && \
apt-get install -y curl netcat-openbsd && \
rm -rf /var/lib/apt/lists/*
# Download and install Kafka (archive.apache.org keeps every release;
# downloads.apache.org only hosts the most recent ones)
RUN curl -L https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
| tar -xzC /opt && \
mv /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_HOME}
# Create kafka user and writable data directories
RUN useradd -r -s /bin/bash kafka && \
mkdir -p /var/kafka/logs /var/kafka/metadata && \
chown -R kafka:kafka ${KAFKA_HOME} /var/kafka
# Volume for logs and data
VOLUME ["/var/kafka/logs", "/var/kafka/metadata"]
# Copy startup script
COPY --chown=kafka:kafka docker-entrypoint.sh /
RUN chmod +x /docker-entrypoint.sh
USER kafka
WORKDIR ${KAFKA_HOME}
# Make the Kafka CLI tools available to the entrypoint script
ENV PATH="${KAFKA_HOME}/bin:${PATH}"
EXPOSE 9092 9093 9999
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["kafka"]
Docker Entrypoint Script
#!/bin/bash
# docker-entrypoint.sh
set -e
# Derive node.id from the StatefulSet ordinal in the hostname if not set
# (e.g. hostname kafka-2 -> node.id 2)
if [ -z "$KAFKA_NODE_ID" ]; then
KAFKA_NODE_ID="${HOSTNAME##*-}"
echo "Derived node.id: $KAFKA_NODE_ID"
fi
# Configure based on environment variables
configure_kafka() {
# Deliberately not 'local': initialize_cluster and the final exec reuse it
config_file="/opt/kafka/config/kraft/server.properties"
# Create config from template
cat > "$config_file" << EOF
node.id=${KAFKA_NODE_ID}
process.roles=${KAFKA_PROCESS_ROLES:-controller,broker}
# Listeners
listeners=${KAFKA_LISTENERS:-PLAINTEXT://:9092,CONTROLLER://:9093}
advertised.listeners=${KAFKA_ADVERTISED_LISTENERS:-PLAINTEXT://localhost:9092}
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT
# Controller configuration
controller.quorum.voters=${KAFKA_CONTROLLER_QUORUM_VOTERS}
# Storage
log.dirs=${KAFKA_LOG_DIRS:-/var/kafka/logs}
metadata.log.dir=${KAFKA_METADATA_LOG_DIR:-/var/kafka/metadata}
# Replication
offsets.topic.replication.factor=${KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:-3}
transaction.state.log.replication.factor=${KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR:-3}
transaction.state.log.min.isr=${KAFKA_TRANSACTION_STATE_LOG_MIN_ISR:-2}
# Performance
num.network.threads=${KAFKA_NUM_NETWORK_THREADS:-8}
num.io.threads=${KAFKA_NUM_IO_THREADS:-8}
EOF
# Append any additional configs
if [ -n "$KAFKA_ADDITIONAL_CONFIG" ]; then
echo "$KAFKA_ADDITIONAL_CONFIG" >> "$config_file"
fi
}
# Initialize cluster if needed
initialize_cluster() {
if [ ! -f "/var/kafka/metadata/meta.properties" ]; then
if [ -z "$KAFKA_CLUSTER_ID" ]; then
KAFKA_CLUSTER_ID=$(kafka-storage.sh random-uuid)
echo "Generated Cluster ID: $KAFKA_CLUSTER_ID"
fi
kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c "$config_file"
fi
}
# Main execution
if [ "$1" = "kafka" ]; then
configure_kafka
initialize_cluster
exec kafka-server-start.sh "$config_file"
else
exec "$@"
fi
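The image above can then be run once per node. The snippet below prints the `docker run` commands for review rather than executing them; the image, network, and container names are illustrative:

```shell
#!/bin/bash
# run-nodes.sh - print docker run commands for a 3-node cluster built from
# the Dockerfile above (image and network names are illustrative)
IMAGE="${IMAGE:-my-kafka:kraft}"
NETWORK="${NETWORK:-kafka-net}"
VOTERS="0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093"

run_cmd() {
  local node_id=$1
  # Printed (not executed) so each command can be reviewed first
  echo docker run -d --name "kafka-${node_id}" --network "$NETWORK" \
    -e KAFKA_NODE_ID="$node_id" \
    -e KAFKA_CONTROLLER_QUORUM_VOTERS="$VOTERS" \
    -e KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://kafka-${node_id}:9092" \
    -p "$((9092 + node_id)):9092" \
    "$IMAGE"
}

for i in 0 1 2; do
  run_cmd "$i"
done
```

Each broker advertises its container DNS name on the shared Docker network, while the host port offset (9092 to 9094) keeps local port mappings from colliding.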
Kubernetes Deployment with Operators
Using Strimzi Operator
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: production-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.6.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      - name: external
        port: 9094
        type: loadbalancer
        tls: true
    config:
      # Note: node.id and process.roles are managed by Strimzi itself
      # (via KafkaNodePool resources) and must not be set here
      # Performance
      num.network.threads: 8
      num.io.threads: 8
      # Replication
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      # Retention
      log.retention.hours: 168
      log.segment.bytes: 1073741824
    storage:
      type: persistent-claim
      size: 100Gi
      class: fast-ssd
    resources:
      requests:
        memory: 8Gi
        cpu: "2"
      limits:
        memory: 16Gi
        cpu: "4"
    jvmOptions:
      "-Xms": "6g"
      "-Xmx": "6g"
      gcLoggingEnabled: true
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yaml
  entityOperator:
    topicOperator:
      resources:
        requests:
          memory: 512Mi
          cpu: "0.5"
    userOperator:
      resources:
        requests:
          memory: 512Mi
          cpu: "0.5"
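Recent Strimzi releases move per-node settings (replica counts, roles, storage) into KafkaNodePool resources, which is also how node IDs are assigned under KRaft. A companion pool for the cluster above might look like the sketch below; this is not a complete KRaft setup, since Strimzi also requires cluster annotations to enable node pools and KRaft mode:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dual-role
  namespace: kafka
  labels:
    strimzi.io/cluster: production-cluster
spec:
  replicas: 3
  roles:
    - controller
    - broker
  storage:
    type: persistent-claim
    size: 100Gi
    class: fast-ssd
```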
ConfigMap for JMX Metrics
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-metrics
  namespace: kafka
data:
  kafka-metrics-config.yaml: |
    lowercaseOutputName: true
    lowercaseOutputLabelNames: true
    rules:
      # JVM Metrics
      - pattern: java.lang<type=(.*)>
      # Kafka Broker Metrics
      - pattern: kafka.server<type=(.+), name=(.+), topic=(.+)><>Count
        name: kafka_server_$1_$2_total
        type: COUNTER
        labels:
          topic: "$3"
      # Controller Metrics
      - pattern: kafka.controller<type=KafkaController, name=(.+)><>Value
        name: kafka_controller_$1
        type: GAUGE
      # Network Metrics
      - pattern: kafka.network<type=RequestMetrics, name=RequestsPerSec, request=(.+)><>Count
        name: kafka_network_requests_total
        type: COUNTER
        labels:
          request: "$1"
Dynamic Configuration Management
#!/bin/bash
# dynamic-config.sh
# Set dynamic broker configs
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type brokers \
--entity-name 1 \
--alter \
--add-config 'log.retention.ms=604800000,compression.type=lz4'
# Set cluster-wide dynamic configs
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type brokers \
--entity-default \
--alter \
--add-config 'log.segment.bytes=1073741824'
# View current dynamic configs
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type brokers \
--entity-name 1 \
--describe
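Dynamic values take precedence over static `server.properties` values, and `kafka-configs.sh --describe` reports the sources as an ordered `synonyms` list with the first entry in effect. A small illustration of reading that ordering; the sample line below is abridged, illustrative output, not captured from a live cluster:

```shell
#!/bin/bash
# config-precedence.sh - show which config source wins (sketch)

# Abridged, illustrative --describe output for one property:
sample='compression.type=lz4 sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:compression.type=lz4, DEFAULT_CONFIG:compression.type=producer}'

effective_source() {
  # The first synonym listed is the one currently in effect
  echo "$1" | grep -oE 'DYNAMIC_BROKER_CONFIG|DYNAMIC_DEFAULT_BROKER_CONFIG|STATIC_BROKER_CONFIG|DEFAULT_CONFIG' | head -1
}

effective_source "$sample"
```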
Monitoring and Observability Setup
Prometheus JMX Exporter Configuration
# jmx-exporter-config.yaml
# Note: scrape and evaluation intervals belong in the Prometheus server
# config, not here; the JMX exporter agent only reads exporter options and rules
startDelaySeconds: 0
ssl: false
rules:
  # Kafka Broker Rules
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count'
    name: kafka_broker_messages_in_total
    labels:
      topic: "$1"
    type: COUNTER
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count'
    name: kafka_broker_bytes_in_total
    labels:
      topic: "$1"
    type: COUNTER
  # Controller Metrics
  - pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
    name: kafka_controller_offline_partitions
    type: GAUGE
  # Log Metrics
  - pattern: 'kafka.log<type=LogManager, name=LogDirectoryOffline, logDirectory=(.+)><>Value'
    name: kafka_log_directory_offline
    labels:
      directory: "$1"
    type: GAUGE
Start Kafka with JMX Exporter
#!/bin/bash
# start-with-monitoring.sh
# Download JMX exporter if not present
if [ ! -f "/opt/jmx_exporter/jmx_prometheus_javaagent.jar" ]; then
mkdir -p /opt/jmx_exporter
curl -L https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.19.0/jmx_prometheus_javaagent-0.19.0.jar \
-o /opt/jmx_exporter/jmx_prometheus_javaagent.jar
fi
# Set JMX exporter as Java agent
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/etc/kafka/jmx-exporter-config.yaml"
# Start Kafka
kafka-server-start.sh /etc/kafka/server.properties
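Prometheus can then scrape the agent's HTTP endpoint on port 7071; a minimal scrape job (the job name is illustrative):

```yaml
scrape_configs:
  - job_name: kafka
    static_configs:
      - targets: ['localhost:7071']
```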
Production Best Practices
1. Multi-Node Script
#!/bin/bash
# start-multi-node.sh
KAFKA_HOME="/opt/kafka"
BASE_PORT=9092
JMX_BASE_PORT=9999
# Ensure log and data directories exist before any node starts
mkdir -p /var/log/kafka /var/kafka
start_node() {
local node_id=$1
local port=$((BASE_PORT + node_id))
local controller_port=$((19092 + node_id))
local jmx_port=$((JMX_BASE_PORT + node_id))
cat > "/tmp/server-${node_id}.properties" << EOF
node.id=${node_id}
process.roles=controller,broker
listeners=PLAINTEXT://localhost:${port},CONTROLLER://localhost:${controller_port}
advertised.listeners=PLAINTEXT://localhost:${port}
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT
controller.quorum.voters=0@localhost:19092,1@localhost:19093,2@localhost:19094
log.dirs=/var/kafka/logs-${node_id}
metadata.log.dir=/var/kafka/metadata-${node_id}
EOF
JMX_PORT=$jmx_port $KAFKA_HOME/bin/kafka-server-start.sh \
"/tmp/server-${node_id}.properties" \
> "/var/log/kafka/broker-${node_id}.log" 2>&1 &
echo "Started broker ${node_id} on port ${port}, JMX: ${jmx_port}"
}
# Start 3 nodes
for i in {0..2}; do
start_node $i
done
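After the loop, a quick sweep confirms all three listeners came up. A sketch that reuses the same BASE_PORT scheme as the start script:

```shell
#!/bin/bash
# verify-nodes.sh - reachability sweep for the three local nodes (sketch)
BASE_PORT=9092

port_for() {
  # Same port scheme as start-multi-node.sh: BASE_PORT + node id
  echo $((BASE_PORT + $1))
}

up=0
for i in 0 1 2; do
  if (exec 3<>"/dev/tcp/localhost/$(port_for "$i")") 2>/dev/null; then
    up=$((up + 1))
  fi
done
echo "$up of 3 brokers reachable"
```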
2. Health Check and Auto-Recovery
#!/bin/bash
# kafka-health-monitor.sh
check_broker_health() {
local broker_id=$1
local port=$((9092 + broker_id))
# Check if port is listening
if ! nc -zv localhost $port 2>/dev/null; then
echo "Broker $broker_id is down, attempting restart..."
restart_broker $broker_id
fi
# Check JMX metrics
local offline_partitions=$(curl -s localhost:$((7071 + broker_id))/metrics | \
grep kafka_controller_offline_partitions | \
awk '{print $2}')
if [ -n "$offline_partitions" ] && [ "$offline_partitions" -gt 0 ]; then
echo "WARNING: Broker $broker_id has $offline_partitions offline partitions"
fi
}
restart_broker() {
local broker_id=$1
# Restart logic here
./start-node.sh $broker_id
}
# Continuous monitoring
while true; do
for i in {0..2}; do
check_broker_health $i
done
sleep 30
done
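On VMs or bare metal, restart-on-failure duties are usually better delegated to systemd than to a hand-rolled loop. A minimal unit sketch; the paths and user are illustrative:

```ini
# /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka (KRaft mode)
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
User=kafka
Environment="KAFKA_HEAP_OPTS=-Xmx2G -Xms2G"
ExecStart=/opt/kafka/bin/kafka-server-start.sh /etc/kafka/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure
RestartSec=5
TimeoutStopSec=90

[Install]
WantedBy=multi-user.target
```

A generous `TimeoutStopSec` matters here: it gives Kafka time to finish its controlled shutdown before systemd escalates to SIGKILL.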
Cloud-Native Configuration Patterns
Environment-Based Configuration
#!/bin/bash
# cloud-config.sh
# Detect cloud environment
if [ -n "$KUBERNETES_SERVICE_HOST" ]; then
echo "Running in Kubernetes"
export KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://${POD_NAME}.${SERVICE_NAME}.${NAMESPACE}.svc.cluster.local:9092"
elif [ -n "$ECS_CONTAINER_METADATA_URI" ]; then
echo "Running in AWS ECS"
PRIVATE_IP=$(curl -s $ECS_CONTAINER_METADATA_URI/task | jq -r '.Containers[0].Networks[0].IPv4Addresses[0]')
export KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://${PRIVATE_IP}:9092"
elif [ -f "/sys/hypervisor/uuid" ] && grep -i ec2 /sys/hypervisor/uuid > /dev/null; then
# Note: /sys/hypervisor/uuid exists only on Xen-based instances;
# Nitro instances should query IMDS (with an IMDSv2 token) instead
echo "Running in AWS EC2"
PRIVATE_IP=$(curl -s http://169.254.169.254/latest/meta-data/local-ipv4)
export KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://${PRIVATE_IP}:9092"
fi
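Note that `POD_NAME`, `SERVICE_NAME`, and `NAMESPACE` are not set by Kubernetes automatically; they are typically injected into the pod spec via the Downward API, as in this sketch:

```yaml
env:
  - name: POD_NAME
    valueFrom:
      fieldRef:
        fieldPath: metadata.name
  - name: NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace
  - name: SERVICE_NAME
    value: kafka-headless   # illustrative; set to the governing headless Service
```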
Review Questions
Why use KRaft mode?
KRaft eliminates ZooKeeper dependency, simplifies operations, provides faster failover, and reduces operational complexity.
When to use dynamic configuration?
Use dynamic configs for runtime changes without restart, such as adjusting retention, compression, or rate limits.
How does containerization help?
Containers provide consistent environments, easier scaling, simplified deployment, and better resource isolation.
What’s the benefit of Kubernetes operators?
Operators automate lifecycle management, handle rolling updates, manage storage, and provide self-healing capabilities.
How to monitor broker health?
Use JMX metrics, health check endpoints, log monitoring, and automated alerting for comprehensive broker monitoring.
Summary
Modern Kafka broker startup in 2025 emphasizes:
- KRaft Mode for simplified consensus without ZooKeeper
- Container-First deployment strategies
- Dynamic Configuration for operational flexibility
- Automated Management through operators and scripts
- Comprehensive Monitoring with built-in observability
These patterns ensure reliable, scalable, and maintainable Kafka deployments!
Related Content
- What is Kafka?
- Kafka Architecture
- Kafka Topic Architecture
- Kafka Consumer Architecture
- Kafka Producer Architecture
- Kafka Low-Level Design
- Kafka Command Line Tutorial
- Kafka Cluster Failover
About Cloudurable
We hope you enjoyed this article. Please provide feedback.