May 13, 2017
ROUGH DRAFT
If you are not sure what Kafka is, start here “What is Kafka?”.
Getting started with Kafka cluster tutorial
Understanding Kafka Failover
This Kafka tutorial picks up right where the first Kafka tutorial from the command line left off. The first tutorial has instructions on how to run ZooKeeper and use Kafka utils.
In this tutorial, we are going to run many Kafka Nodes on our development laptop so that you will need at least 16 GB of RAM for local dev machine. You can run just two servers if you have less memory than 16 GB. We are going to create a replicated topic. We then demonstrate consumer failover and broker failover. We also demonstrate load balancing Kafka consumers. We show how, with many groups, Kafka acts like a Publish/Subscribe. But, when we put all of our consumers in the same group, Kafka will load share the messages to the consumers in the same group (more like a queue than a topic in a traditional MOM sense).
If not already running, then start up ZooKeeper (./run-zookeeper.sh
from the first tutorial). Also, shut down Kafka from the first tutorial.
Next, you need to copy server properties for three brokers (detailed instructions to follow). Then we will modify these Kafka server properties to add unique Kafka ports, Kafka log locations, and unique Broker ids. Then we will create three scripts to start these servers up using these properties, and then start the servers. Lastly, we create replicated topic and use it to demonstrate Kafka consumer failover, and Kafka broker failover.
Create three new Kafka server-n.properties files
In this section, we will copy the existing Kafka server.properties to server-0.properties, server-1.properties, server-2.properties.
Then we change server-0.properties
to set log.dirs
to “/tmp/kafka-logs-1”
.
Then we modify server-1.properties
to set port
to 9093
, broker id
to 1
, and log.dirs
to “/tmp/kafka-logs-1”
.
Lastly modify server-2.properties
to use port
9094, broker id
2, and log.dirs
“/tmp/kafka-logs-2”
.
Copy server properties file
$ ~/kafka-training
$ mkdir -p lab2/config
$ cp kafka/config/server.properties kafka/lab2/config/server-0.properties
$ cp kafka/config/server.properties kafka/lab2/config/server-1.properties
$ cp kafka/config/server.properties kafka/lab2/config/server-2.properties
With your favorite text editor change server-0.properties so that log.dirs
is set to ./logs/kafka-0
.
Leave the rest of the file the same. Make sure log.dirs
is only defined once.
~/kafka-training/lab2/config/server-0.properties
broker.id=0
port=9092
log.dirs=./logs/kafka-0
...
With your favorite text editor change log.dirs
, broker.id
and and log.dirs
of server-1.properties
as follows.
~/kafka-training/lab2/config/server-1.properties
broker.id=1
port=9093
log.dirs=./logs/kafka-1
...
With your favorite text editor change log.dirs
, broker.id
and and log.dirs
of server-2.properties
as follows.
~/kafka-training/lab2/config/server-2.properties
broker.id=2
port=9094
log.dirs=./logs/kafka-2
...
Create Startup scripts for three Kafka servers
~/kafka-training/lab2/start-1st-server.sh
#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
## Run Kafka
kafka/bin/kafka-server-start.sh \
"$CONFIG/server-0.properties"
~/kafka-training/lab2/start-2nd-server.sh
#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
## Run Kafka
kafka/bin/kafka-server-start.sh \
"$CONFIG/server-1.properties"
~/kafka-training/lab2/start-3rd-server.sh
#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
## Run Kafka
kafka/bin/kafka-server-start.sh \
"$CONFIG/server-2.properties"
Notice we are Passing the Kafka server properties files that we created in the last step.
Now run all three in separate terminals/shells.
Run Kafka servers each in own terminal from ~/kafka-training/lab2
~/kafka-training/lab2
$ ./start-1st-server.sh
...
$ ./start-2nd-server.sh
...
$ ./start-3rd-server.sh
Give the servers a minute to startup and connect to ZooKeeper.
Create Kafka replicated topic my-failsafe-topic
Now we will create a replicated topic that the producers and consumers can use.
~/kafka-training/lab2/create-replicated-topic.sh
#!/usr/bin/env bash
cd ~/kafka-training
kafka/bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 3 \
--partitions 13 \
--topic my-failsafe-topic
Notice that the replication factor gets set to 3, and the topic name is my-failsafe-topic
, and
like before it has 13 partitions.
Then we just have to run the script to create the topic.
Run create-replicated-topic.sh
~/kafka-training/lab2
$ ./create-replicated-topic.sh
Start Kafka Consumer that uses Replicated Topic
Next, create a script that starts the consumer and then start the consumer with the script.
~/kafka-training/lab2/start-consumer-console-replicated.sh
#!/usr/bin/env bash
cd ~/kafka-training
kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9094,localhost:9092 \
--topic my-failsafe-topic \
--from-beginning
Notice that we pass a list of Kafka servers to --bootstrap-server
. We pass two of the three servers that we ran earlier.
Only one broker is needed, and the consumer client will learn about the other brokers from this one server. Usually, you list multiple brokers in case there is an outage so that the client can connect.
Now we just run this script to start the consumer.
Run start-consumer-console-replicated.sh
~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
Start Kafka Producer that uses Replicated Topic
Next, we create a script that starts the producer and then start the producer with the script.
~/kafka-training/lab2/start-consumer-producer-replicated.sh
#!/usr/bin/env bash
cd ~/kafka-training
kafka/bin/kafka-console-producer.sh \
--broker-list localhost:9092,localhost:9093 \
--topic my-failsafe-topic
Notice we start Kafka producer and pass it a list of Kafka Brokers to use.
Now use the script to launch the producer as follows.
Run start-producer-console-replicated.sh
~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
Now send messages
Now send some message from the producer to Kafka and see those messages consumed by the consumer.
Producer Console
~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
Hi Mom
How are you?
How are things going?
Good!
Consumer Console
~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
Hi Mom
How are you?
How are things going?
Good!
Now Start two more consumers and send more messages
Now Start two more consumers in their own terminal window and send more messages from the producer.
Producer Console
~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
Hi Mom
How are you?
How are things going?
Good!
message 1
message 2
message 3
Consumer Console 1st
~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
Hi Mom
How are you?
How are things going?
Good!
message 1
message 2
message 3
Consumer Console 2nd in new Terminal
~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
Hi Mom
How are you?
How are things going?
Good!
message 1
message 2
message 3
Consumer Console 2nd in new Terminal
~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
Hi Mom
How are you?
How are things going?
Good!
message 1
message 2
message 3
Notice that the messages are sent to all of the consumers because each consumer is in a different consumer group.
Change consumer to be in their own consumer group
Stop the producers and the consumers from before, but leave Kafka and ZooKeeper running.
Now let’s modify the start-consumer-console-replicated.sh
script.
We want to put all of the consumers in same consumer group.
This way they will share the messages as each consumer group will get its share of partitions.
~/kafka-training/lab2/start-consumer-console-replicated.sh
#!/usr/bin/env bash
cd ~/kafka-training
kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9094,localhost:9092 \
--topic my-failsafe-topic \
--consumer-property group.id=mygroup
Notice that the script is the same as before except we added --consumer-property group.id=mygroup
which will put every consumer that runs with this script into the mygroup
consumer group.
Now we just run the producer and three consumers.
Run this three times - start-consumer-console-replicated.sh
~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
Run Producer Console
~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
Now send seven messages from the Kafka producer console.
Producer Console
~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
m1
m2
m3
m4
m5
m6
m7
Notice that the messages are spread evenly among the consumers.
1st Kafka Consumer gets m3, m5
~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m3
m5
2nd Kafka Consumer gets m2, m6
~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m2
m6
3rd Kafka Consumer gets m1, m4, m7
~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m1
m4
m7
Consumer Failover
Next, let’s demonstrate consumer failover by killing one of the consumers and sending seven more messages.
First, kill the third consumer (CTRL-C in the consumer terminal does the trick).
Now send seven more messages with the Kafka console producer.
Producer Console
~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
m1
...
m8
m9
m10
m11
m12
m13
m14
Notice that the messages are spread evenly among the remaining consumers.
1st Kafka Consumer gets m8, m9, m11, m14
~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m3
m5
m8
m9
m11
m14
2nd Kafka Consumer gets m10, m12, m13
~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m2
m6
m10
m12
m13
We killed one consumer. Sent seven more messages, and saw the load spread to remaining consumers. Consumer failover Works!
Create Kafka Describe Topic Script
-—describe
will show list partitions, ISRs, and partition leadership
~/kafka-training/lab2/describe-topics.sh
#!/usr/bin/env bash
cd ~/kafka-training
# List existing topics
kafka/bin/kafka-topics.sh --describe \
--topic my-failsafe-topic \
--zookeeper localhost:2181
Run describe-topics
Lists which broker owns (leader of) which partition Lists Replicas and ISR (replicas that are up to date) Notice there are 13 topics
2nd Kafka Consumer gets m10, m12, m13
~/kafka-training/lab2
$ ./describe-topics.sh
Topic:my-failsafe-topic PartitionCount:13 ReplicationFactor:3 Configs:
Topic: my-failsafe-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: my-failsafe-topic Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: my-failsafe-topic Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: my-failsafe-topic Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: my-failsafe-topic Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: my-failsafe-topic Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: my-failsafe-topic Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: my-failsafe-topic Partition: 7 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: my-failsafe-topic Partition: 8 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: my-failsafe-topic Partition: 9 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: my-failsafe-topic Partition: 10 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: my-failsafe-topic Partition: 11 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: my-failsafe-topic Partition: 12 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Test Failover by killing 1st server
Kill the first server
Kill the first broker
$ kill `ps aux | grep java | grep server-0.properties | tr -s " " | cut -d " " -f2`
Use Kafka topic describe to see that a new leader was elected!
Run describe-topics again
~/kafka-training/lab2/solution
$ ./describe-topics.sh
Topic:my-failsafe-topic PartitionCount:13 ReplicationFactor:3 Configs:
Topic: my-failsafe-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,1
Topic: my-failsafe-topic Partition: 1 Leader: 1 Replicas: 0,1,2 Isr: 1,2
Topic: my-failsafe-topic Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2
Topic: my-failsafe-topic Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1
Topic: my-failsafe-topic Partition: 4 Leader: 2 Replicas: 0,2,1 Isr: 2,1
Topic: my-failsafe-topic Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: 1,2
Topic: my-failsafe-topic Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,1
Topic: my-failsafe-topic Partition: 7 Leader: 1 Replicas: 0,1,2 Isr: 1,2
Topic: my-failsafe-topic Partition: 8 Leader: 1 Replicas: 1,2,0 Isr: 1,2
Topic: my-failsafe-topic Partition: 9 Leader: 2 Replicas: 2,1,0 Isr: 2,1
Topic: my-failsafe-topic Partition: 10 Leader: 2 Replicas: 0,2,1 Isr: 2,1
Topic: my-failsafe-topic Partition: 11 Leader: 1 Replicas: 1,0,2 Isr: 1,2
Topic: my-failsafe-topic Partition: 12 Leader: 2 Replicas: 2,0,1 Isr: 2,1
Show Broker Failover Worked
Send two more messages from the producer Notice that the consumer gets the messages Broker Failover WORKS!
Producer Console
~/kafka-training/lab2
$ ./start-consumer-producer-replicated.sh
m1
...
m15
m16
Notice that the messages are spread evenly among the remaining consumers.
1st Kafka Consumer gets m16
~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m3
m5
m8
m9
m11
m14
...
m16
2nd Kafka Consumer gets m15
~/kafka-training/lab2
$ ./start-consumer-console-replicated.sh
m2
m6
m10
m12
m13
...
m15
Kafka Cluster Failover Review
Why did the three consumers not load share the messages at first?
How did we demonstrate failover for consumers?
How did we demonstrate failover for producers?
What tool and option did we use to show ownership of partitions and the ISRs?
More about Kafka
To learn about Kafka see Kafka architecture, Kafka topic architecture and Kafka producer architecture.
Related content
About Cloudurable
We hope you enjoyed this article. Please provide feedback. Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps setting up Kafka clusters in AWS.
Akka Consulting
Cassandra Training
AWS Cassandra Database Support
Kafka Support Pricing
Cassandra Database Support Pricing
Non-stop Cassandra
Watchdog
Advantages of using Cloudurable™
Cassandra Consulting
Cloudurable™| Guide to AWS Cassandra Deploy
Cloudurable™| AWS Cassandra Guidelines and Notes
Free guide to deploying Cassandra on AWS
Kafka Training
Kafka Consulting
DynamoDB Training
DynamoDB Consulting
Kinesis Training
Kinesis Consulting
Kafka Tutorial