Kafka Broker Startup Scripts and Configuration - 2025 Edition

January 9, 2025

                                                                           

🚀 What’s New in This 2025 Update

Major Updates and Changes

  • KRaft Mode by Default - No ZooKeeper required
  • Container-First Approach - Docker and Kubernetes native
  • Dynamic Configuration - Minimal restarts needed
  • Automated Management - Operators handle lifecycle
  • Enhanced Monitoring - Built-in observability
  • Cloud-Native Patterns - Auto-scaling and self-healing

Broker Startup Evolution Since 2017

  • ✅ No ZooKeeper - KRaft provides native consensus
  • ✅ Declarative Config - GitOps and Infrastructure as Code
  • ✅ Container Ready - Optimized for Kubernetes
  • ✅ Zero-Downtime - Rolling updates standard

Running a Kafka Broker in 2025

Starting Kafka brokers has evolved significantly with KRaft mode and cloud-native practices. This guide covers modern approaches from simple local development to production Kubernetes deployments.

Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps with setting up Kafka clusters in AWS.

KRaft Mode Startup (No ZooKeeper!)

flowchart LR
  subgraph KRaftCluster["Kafka Cluster with KRaft"]
    subgraph Controllers["Controllers (Quorum)"]
      C1[Controller 1<br>node.id=1]
      C2[Controller 2<br>node.id=2]
      C3[Controller 3<br>node.id=3]
    end
    
    subgraph Brokers["Brokers"]
      B1[Broker 1<br>node.id=4]
      B2[Broker 2<br>node.id=5]
      B3[Broker 3<br>node.id=6]
    end
  end
  
  C1 <--> C2
  C2 <--> C3
  C3 <--> C1
  
  Controllers --> Brokers
  
  style Controllers fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
  style Brokers fill:#e8f5e9,stroke:#43a047,stroke-width:1px

Basic KRaft Configuration

Create a KRaft configuration file:

config/kraft/server.properties

# Node identity - unique across cluster
node.id=1

# Roles: controller, broker, or both
process.roles=controller,broker

# Controller quorum voters
controller.quorum.voters=1@localhost:9093,2@localhost:9094,3@localhost:9095

# Listeners
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
advertised.listeners=PLAINTEXT://localhost:9092
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT

# Log directories
log.dirs=/var/kafka/logs
metadata.log.dir=/var/kafka/metadata

# Performance settings
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Replication
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

# Log retention
log.retention.hours=168
log.segment.bytes=1073741824
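
The retention and segment numbers above are easier to audit when derived rather than memorized. A quick shell check (standalone, no Kafka tooling assumed) confirms what they encode:

```shell
# log.retention.hours=168 is exactly one week
retention_days=$(( 168 / 24 ))
echo "retention: ${retention_days} days"

# log.segment.bytes=1073741824 is exactly 1 GiB
segment_gib=$(( 1073741824 / 1024 / 1024 / 1024 ))
echo "segment size: ${segment_gib} GiB"
```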

Initialize and Start KRaft Cluster

#!/bin/bash
# generate-cluster-id.sh

# Generate a unique cluster ID
KAFKA_CLUSTER_ID="$(kafka-storage.sh random-uuid)"
echo "Generated Cluster ID: $KAFKA_CLUSTER_ID"

# Store for later use (ensure the directory exists first)
mkdir -p /var/kafka
echo "$KAFKA_CLUSTER_ID" > /var/kafka/cluster.id

#!/bin/bash
# format-storage.sh

KAFKA_CLUSTER_ID=$(cat /var/kafka/cluster.id)

# Format storage for each node
kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c /etc/kafka/server.properties

#!/bin/bash
# start-kafka.sh

# Start Kafka with KRaft
kafka-server-start.sh /etc/kafka/server.properties

Advanced Startup Script with Health Checks

#!/bin/bash
# advanced-kafka-start.sh

set -e

# Configuration
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"
CONFIG_FILE="${1:-/etc/kafka/server.properties}"
LOG_DIR="${LOG_DIR:-/var/log/kafka}"
HEALTH_CHECK_RETRIES=30
HEALTH_CHECK_INTERVAL=2

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'

log() {
    echo -e "${GREEN}[$(date +'%Y-%m-%d %H:%M:%S')]${NC} $1"
}

error() {
    echo -e "${RED}[ERROR]${NC} $1" >&2
}

warn() {
    echo -e "${YELLOW}[WARN]${NC} $1"
}

# Pre-flight checks
pre_flight_checks() {
    log "Running pre-flight checks..."
    
    # Check Java version (Kafka 4.0+ requires Java 17 or newer)
    local java_major
    java_major=$(java -version 2>&1 | awk -F'"' '/version/ {split($2, v, "."); print v[1]}')
    if [ -z "$java_major" ] || [ "$java_major" -lt 17 ]; then
        error "Java 17 or newer is required for Kafka 4.0+"
        exit 1
    fi
    
    # Check config file
    if [[ ! -f "$CONFIG_FILE" ]]; then
        error "Config file not found: $CONFIG_FILE"
        exit 1
    fi
    
    # Check if already running
    if pgrep -f "kafka.Kafka" > /dev/null; then
        warn "Kafka appears to be already running"
        exit 0
    fi
    
    # Create directories
    mkdir -p "$LOG_DIR"
    
    log "Pre-flight checks passed"
}

# Extract configuration values
get_config_value() {
    grep "^$1=" "$CONFIG_FILE" | cut -d'=' -f2
}

# Health check function
health_check() {
    local listeners=$(get_config_value "listeners")
    local port=$(echo "$listeners" | grep -oP 'PLAINTEXT://[^:]+:\K\d+' | head -1)
    
    log "Checking health on port $port..."
    
    for i in $(seq 1 $HEALTH_CHECK_RETRIES); do
        if nc -zv localhost "$port" 2>/dev/null; then
            log "Kafka broker is healthy"
            return 0
        fi
        
        warn "Health check attempt $i/$HEALTH_CHECK_RETRIES failed"
        sleep $HEALTH_CHECK_INTERVAL
    done
    
    error "Kafka failed to start after $HEALTH_CHECK_RETRIES attempts"
    return 1
}

# Main startup
main() {
    pre_flight_checks
    
    # Set JMX options
    export JMX_PORT="${JMX_PORT:-9999}"
    export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
        -Dcom.sun.management.jmxremote.authenticate=false \
        -Dcom.sun.management.jmxremote.ssl=false \
        -Dcom.sun.management.jmxremote.port=$JMX_PORT"
    
    # Set JVM options
    export KAFKA_HEAP_OPTS="${KAFKA_HEAP_OPTS:--Xmx2G -Xms2G}"
    export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC \
        -XX:MaxGCPauseMillis=20 \
        -XX:InitiatingHeapOccupancyPercent=35 \
        -XX:+ExplicitGCInvokesConcurrent"
    
    log "Starting Kafka with config: $CONFIG_FILE"
    log "JMX Port: $JMX_PORT"
    
    # Start Kafka in background
    nohup "$KAFKA_HOME/bin/kafka-server-start.sh" "$CONFIG_FILE" \
        > "$LOG_DIR/kafka-startup.log" 2>&1 &
    
    local kafka_pid=$!
    echo $kafka_pid > /var/run/kafka.pid
    
    log "Kafka started with PID: $kafka_pid"
    
    # Wait for startup and check health
    if health_check; then
        log "Kafka broker started successfully"
        
        # Log broker metadata
        local node_id=$(get_config_value "node.id")
        local roles=$(get_config_value "process.roles")
        log "Node ID: $node_id, Roles: $roles"
    else
        error "Kafka startup failed"
        kill $kafka_pid 2>/dev/null
        exit 1
    fi
}

# Run main function
main "$@"
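
The `get_config_value` helper above uses a plain `grep`, which breaks on commented-out keys and surrounding whitespace. A slightly hardened variant (a sketch; the function name and default-value argument are illustrative) tolerates both and falls back to a default when the key is absent:

```shell
# Read key=value from a properties file, skipping comment lines and
# surrounding whitespace; if the key appears multiple times, the last
# occurrence wins; print a fallback when the key is absent.
get_config_value_safe() {
    local file=$1 key=$2 default=$3
    local value
    value=$(sed -n "s/^[[:space:]]*${key}[[:space:]]*=[[:space:]]*//p" "$file" | tail -1)
    echo "${value:-$default}"
}
```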

Containerized Kafka Deployment

Dockerfile for Kafka

FROM eclipse-temurin:17-jre-jammy

# Kafka version
ENV KAFKA_VERSION=3.6.0
ENV SCALA_VERSION=2.13
ENV KAFKA_HOME=/opt/kafka

# Install dependencies
RUN apt-get update && \
    apt-get install -y curl netcat-openbsd && \
    rm -rf /var/lib/apt/lists/*

# Download and install Kafka (archive.apache.org retains all releases;
# downloads.apache.org hosts only the most recent ones)
RUN curl -L https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
    | tar -xzC /opt && \
    mv /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_HOME}

# Create kafka user
RUN useradd -r -s /bin/bash kafka && \
    chown -R kafka:kafka ${KAFKA_HOME}

# Volume for logs and data
VOLUME ["/var/kafka/logs", "/var/kafka/metadata"]

# Copy startup script
COPY --chown=kafka:kafka docker-entrypoint.sh /
RUN chmod +x /docker-entrypoint.sh

USER kafka
WORKDIR ${KAFKA_HOME}

EXPOSE 9092 9093 9999

ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["kafka"]

Docker Entrypoint Script

#!/bin/bash
# docker-entrypoint.sh

set -e

# Generate node.id if not set
if [ -z "$KAFKA_NODE_ID" ]; then
    KAFKA_NODE_ID="${HOSTNAME##*-}"
    echo "Generated node.id: $KAFKA_NODE_ID"
fi

# Config file path shared by the functions below (the original used a
# function-local variable here, which left it undefined everywhere else)
CONFIG_FILE="/opt/kafka/config/kraft/server.properties"

# Configure based on environment variables
configure_kafka() {
    # Create config from template
    cat > "$CONFIG_FILE" << EOF
node.id=${KAFKA_NODE_ID}
process.roles=${KAFKA_PROCESS_ROLES:-controller,broker}

# Listeners
listeners=${KAFKA_LISTENERS:-PLAINTEXT://:9092,CONTROLLER://:9093}
advertised.listeners=${KAFKA_ADVERTISED_LISTENERS:-PLAINTEXT://localhost:9092}
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT

# Controller configuration
controller.quorum.voters=${KAFKA_CONTROLLER_QUORUM_VOTERS}

# Storage
log.dirs=${KAFKA_LOG_DIRS:-/var/kafka/logs}
metadata.log.dir=${KAFKA_METADATA_LOG_DIR:-/var/kafka/metadata}

# Replication
offsets.topic.replication.factor=${KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:-3}
transaction.state.log.replication.factor=${KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR:-3}
transaction.state.log.min.isr=${KAFKA_TRANSACTION_STATE_LOG_MIN_ISR:-2}

# Performance
num.network.threads=${KAFKA_NUM_NETWORK_THREADS:-8}
num.io.threads=${KAFKA_NUM_IO_THREADS:-8}
EOF
    
    # Append any additional configs
    if [ -n "$KAFKA_ADDITIONAL_CONFIG" ]; then
        echo "$KAFKA_ADDITIONAL_CONFIG" >> "$CONFIG_FILE"
    fi
}

# Format storage on first boot only (meta.properties marks a formatted dir)
initialize_cluster() {
    local metadata_dir="${KAFKA_METADATA_LOG_DIR:-/var/kafka/metadata}"
    if [ ! -f "${metadata_dir}/meta.properties" ]; then
        if [ -z "$KAFKA_CLUSTER_ID" ]; then
            KAFKA_CLUSTER_ID=$(kafka-storage.sh random-uuid)
            echo "Generated Cluster ID: $KAFKA_CLUSTER_ID"
        fi
        
        kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c "$CONFIG_FILE"
    fi
}

# Main execution
if [ "$1" = "kafka" ]; then
    configure_kafka
    initialize_cluster
    
    exec kafka-server-start.sh "$CONFIG_FILE"
else
    exec "$@"
fi
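
The entrypoint derives `node.id` from the StatefulSet-style hostname suffix (`kafka-0`, `kafka-1`, …). A defensive version of that derivation (a sketch; the function name is illustrative) validates that the suffix is actually numeric instead of silently producing an invalid node ID:

```shell
# Extract the ordinal from a StatefulSet-style pod name, e.g. kafka-2 -> 2.
# Fails loudly when the suffix is not a number.
node_id_from_hostname() {
    local host=$1
    local ordinal=${host##*-}
    case $ordinal in
        ''|*[!0-9]*)
            echo "cannot derive node.id from hostname: $host" >&2
            return 1
            ;;
    esac
    echo "$ordinal"
}
```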

Kubernetes Deployment with Operators

Using Strimzi Operator

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: production-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.6.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      - name: external
        port: 9094
        type: loadbalancer
        tls: true
    config:
      # node.id and process.roles are managed by Strimzi itself (KRaft is
      # enabled through node pools/annotations), so they are not set here
      # Performance
      num.network.threads: 8
      num.io.threads: 8
      # Replication
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      # Retention
      log.retention.hours: 168
      log.segment.bytes: 1073741824
    storage:
      type: persistent-claim
      size: 100Gi
      class: fast-ssd
    resources:
      requests:
        memory: 8Gi
        cpu: "2"
      limits:
        memory: 16Gi
        cpu: "4"
    jvmOptions:
      -Xms: 6g
      -Xmx: 6g
      gcLoggingEnabled: true
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yaml
  entityOperator:
    topicOperator:
      resources:
        requests:
          memory: 512Mi
          cpu: "0.5"
    userOperator:
      resources:
        requests:
          memory: 512Mi
          cpu: "0.5"

ConfigMap for JMX Metrics

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-metrics
  namespace: kafka
data:
  kafka-metrics-config.yaml: |
    lowercaseOutputName: true
    lowercaseOutputLabelNames: true
    rules:
    # JVM Metrics
    - pattern: java.lang<type=(.*)>
    # Kafka Broker Metrics
    - pattern: kafka.server<type=(.+), name=(.+), topic=(.+)><>Count
      name: kafka_server_$1_$2_total
      type: COUNTER
      labels:
        topic: "$3"
    # Controller Metrics
    - pattern: kafka.controller<type=KafkaController, name=(.+)><>Value
      name: kafka_controller_$1
      type: GAUGE
    # Network Metrics
    - pattern: kafka.network<type=RequestMetrics, name=RequestsPerSec, request=(.+)><>Count
      name: kafka_network_requests_total
      type: COUNTER
      labels:
        request: "$1"    

Dynamic Configuration Management

#!/bin/bash
# dynamic-config.sh

# Set dynamic broker configs
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type brokers \
  --entity-name 1 \
  --alter \
  --add-config 'log.retention.ms=604800000,compression.type=lz4'

# Set cluster-wide dynamic configs
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type brokers \
  --entity-default \
  --alter \
  --add-config 'log.segment.bytes=1073741824'

# View current dynamic configs
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type brokers \
  --entity-name 1 \
  --describe
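
The `log.retention.ms=604800000` above is simply 7 days expressed in milliseconds. A tiny helper (the function name is illustrative) avoids hard-coding such magic numbers when scripting `kafka-configs.sh` calls:

```shell
# Convert a day count to milliseconds for use in retention configs.
days_to_ms() {
    echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

# Build the --add-config argument from a readable value
retention_arg="log.retention.ms=$(days_to_ms 7)"
echo "$retention_arg"
```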

Monitoring and Observability Setup

Prometheus JMX Exporter Configuration

# jmx-exporter-config.yaml
startDelaySeconds: 0
ssl: false
global:
  scrapeInterval: 15s
  evaluationInterval: 15s

rules:
  # Kafka Broker Rules
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count'
    name: kafka_broker_messages_in_total
    labels:
      topic: "$1"
    type: COUNTER
    
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count'
    name: kafka_broker_bytes_in_total
    labels:
      topic: "$1"
    type: COUNTER
    
  # Controller Metrics
  - pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
    name: kafka_controller_offline_partitions
    type: GAUGE
    
  # Log Metrics
  - pattern: 'kafka.log<type=LogManager, name=LogDirectoryOffline, logDirectory=(.+)><>Value'
    name: kafka_log_directory_offline
    labels:
      directory: "$1"
    type: GAUGE

Start Kafka with JMX Exporter

#!/bin/bash
# start-with-monitoring.sh

# Download JMX exporter if not present
if [ ! -f "/opt/jmx_exporter/jmx_prometheus_javaagent.jar" ]; then
    mkdir -p /opt/jmx_exporter
    curl -L https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.19.0/jmx_prometheus_javaagent-0.19.0.jar \
        -o /opt/jmx_exporter/jmx_prometheus_javaagent.jar
fi

# Set JMX exporter as Java agent
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/etc/kafka/jmx-exporter-config.yaml"

# Start Kafka
kafka-server-start.sh /etc/kafka/server.properties
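
Once the exporter is serving on port 7071, scripts frequently need a single value out of the Prometheus text format. A minimal parser (a sketch; the metric names match the rules defined above) extracts one metric while skipping `#` comment lines:

```shell
# Print the value of a named, unlabeled metric from Prometheus text output
# read on stdin.
metric_value() {
    local name=$1
    awk -v m="$name" '$1 == m { print $2 }'
}

# Example with canned exporter output
# (real use: curl -s localhost:7071/metrics | metric_value <name>)
sample='# HELP kafka_controller_offline_partitions ...
kafka_controller_offline_partitions 0'
echo "$sample" | metric_value kafka_controller_offline_partitions
```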

Production Best Practices

1. Multi-Node Script

#!/bin/bash
# start-multi-node.sh

KAFKA_HOME="/opt/kafka"
BASE_PORT=9092
JMX_BASE_PORT=9999

# All nodes must be formatted with the same cluster ID
KAFKA_CLUSTER_ID="${KAFKA_CLUSTER_ID:-$(kafka-storage.sh random-uuid)}"

start_node() {
    local node_id=$1
    local port=$((BASE_PORT + node_id))
    local controller_port=$((19092 + node_id))
    local jmx_port=$((JMX_BASE_PORT + node_id))
    
    cat > "/tmp/server-${node_id}.properties" << EOF
node.id=${node_id}
process.roles=controller,broker
listeners=PLAINTEXT://localhost:${port},CONTROLLER://localhost:${controller_port}
advertised.listeners=PLAINTEXT://localhost:${port}
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT
controller.quorum.voters=0@localhost:19092,1@localhost:19093,2@localhost:19094
log.dirs=/var/kafka/logs-${node_id}
metadata.log.dir=/var/kafka/metadata-${node_id}
EOF
    
    # Format storage on first boot; KRaft refuses to start on unformatted dirs
    if [ ! -f "/var/kafka/metadata-${node_id}/meta.properties" ]; then
        kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" \
            -c "/tmp/server-${node_id}.properties"
    fi
    
    JMX_PORT=$jmx_port $KAFKA_HOME/bin/kafka-server-start.sh \
        "/tmp/server-${node_id}.properties" \
        > "/var/log/kafka/broker-${node_id}.log" 2>&1 &
    
    echo "Started broker ${node_id} on port ${port}, JMX: ${jmx_port}"
}

# Start 3 nodes
for i in {0..2}; do
    start_node $i
done
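
The port scheme above is deterministic per node. Factoring it into a helper (names illustrative) makes the layout testable and keeps the broker, controller, and JMX ports from colliding:

```shell
BASE_PORT=9092
CONTROLLER_BASE_PORT=19092
JMX_BASE_PORT=9999

# Print "broker controller jmx" ports for a given node id.
node_ports() {
    local node_id=$1
    echo "$((BASE_PORT + node_id)) $((CONTROLLER_BASE_PORT + node_id)) $((JMX_BASE_PORT + node_id))"
}

node_ports 0
```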

2. Health Check and Auto-Recovery

#!/bin/bash
# kafka-health-monitor.sh

check_broker_health() {
    local broker_id=$1
    local port=$((9092 + broker_id))
    
    # Check if port is listening
    if ! nc -zv localhost $port 2>/dev/null; then
        echo "Broker $broker_id is down, attempting restart..."
        restart_broker $broker_id
    fi
    
    # Check JMX metrics
    local offline_partitions=$(curl -s localhost:$((7071 + broker_id))/metrics | \
        grep kafka_controller_offline_partitions | \
        awk '{print $2}')
    
    # Guard against an empty value when the metrics endpoint is unreachable
    if [ -n "$offline_partitions" ] && [ "$offline_partitions" -gt 0 ]; then
        echo "WARNING: Broker $broker_id has $offline_partitions offline partitions"
    fi
}

restart_broker() {
    local broker_id=$1
    # Restart logic here
    ./start-node.sh $broker_id
}

# Continuous monitoring
while true; do
    for i in {0..2}; do
        check_broker_health $i
    done
    sleep 30
done
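
The monitor loop above restarts a broker as soon as its port stops answering; in practice you usually cap restart attempts so a crash-looping broker is not restarted forever. A bounded-retry helper (a sketch; the helper name and arguments are illustrative) captures the pattern:

```shell
# Run a command until it succeeds, up to max_attempts times with a fixed
# delay between attempts. Returns non-zero if every attempt fails.
retry() {
    local max_attempts=$1 delay=$2
    shift 2
    local attempt
    for attempt in $(seq 1 "$max_attempts"); do
        if "$@"; then
            return 0
        fi
        echo "attempt $attempt/$max_attempts failed" >&2
        sleep "$delay"
    done
    return 1
}
```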

Cloud-Native Configuration Patterns

Environment-Based Configuration

#!/bin/bash
# cloud-config.sh

# Detect cloud environment
if [ -n "$KUBERNETES_SERVICE_HOST" ]; then
    echo "Running in Kubernetes"
    export KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://${POD_NAME}.${SERVICE_NAME}.${NAMESPACE}.svc.cluster.local:9092"
elif [ -n "$ECS_CONTAINER_METADATA_URI" ]; then
    echo "Running in AWS ECS"
    PRIVATE_IP=$(curl -s $ECS_CONTAINER_METADATA_URI/task | jq -r '.Containers[0].Networks[0].IPv4Addresses[0]')
    export KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://${PRIVATE_IP}:9092"
# IMDSv2 token probe: the /sys/hypervisor/uuid check fails on Nitro-based
# instances, so ask the metadata service for a session token instead
elif TOKEN=$(curl -s -m 2 -X PUT "http://169.254.169.254/latest/api/token" \
        -H "X-aws-ec2-metadata-token-ttl-seconds: 60") && [ -n "$TOKEN" ]; then
    echo "Running in AWS EC2"
    PRIVATE_IP=$(curl -s -H "X-aws-ec2-metadata-token: $TOKEN" \
        http://169.254.169.254/latest/meta-data/local-ipv4)
    export KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://${PRIVATE_IP}:9092"
fi
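
The detection logic above can be factored into a pure function so it is testable without real cloud metadata. This sketch (function and variable names illustrative) decides the advertised listener from environment variables alone and leaves the metadata-service lookups to the caller:

```shell
# Decide the advertised listener from the environment.
# $1 is the discovered/fallback IP for non-Kubernetes environments.
advertised_listener() {
    local ip=${1:-}
    if [ -n "${KUBERNETES_SERVICE_HOST:-}" ]; then
        # Stable per-pod DNS name inside the cluster
        echo "PLAINTEXT://${POD_NAME}.${SERVICE_NAME}.${NAMESPACE}.svc.cluster.local:9092"
    else
        echo "PLAINTEXT://${ip}:9092"
    fi
}
```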

Review Questions

Why use KRaft mode?

KRaft eliminates ZooKeeper dependency, simplifies operations, provides faster failover, and reduces operational complexity.

When to use dynamic configuration?

Use dynamic configs for runtime changes without restart, such as adjusting retention, compression, or rate limits.

How does containerization help?

Containers provide consistent environments, easier scaling, simplified deployment, and better resource isolation.

What’s the benefit of Kubernetes operators?

Operators automate lifecycle management, handle rolling updates, manage storage, and provide self-healing capabilities.

How to monitor broker health?

Use JMX metrics, health check endpoints, log monitoring, and automated alerting for comprehensive broker monitoring.

Summary

Modern Kafka broker startup in 2025 emphasizes:

  • KRaft Mode for simplified consensus without ZooKeeper
  • Container-First deployment strategies
  • Dynamic Configuration for operational flexibility
  • Automated Management through operators and scripts
  • Comprehensive Monitoring with built-in observability

These patterns ensure reliable, scalable, and maintainable Kafka deployments!

About Cloudurable

We hope you enjoyed this article. Please provide feedback.


                                                                           