Kafka Tutorial with Examples

April 17, 2017


Kafka Tutorial

This Kafka tutorial introduces the Kafka streaming platform. It covers Kafka architecture with some small examples from the command line, then expands on this with a multi-server example. It also includes simple Java client examples for a Kafka producer and a Kafka consumer. We have started to expand the Java examples to correlate with the design discussion of Kafka, and we have expanded the Kafka design section and added references.

Cloudurable provides Kafka training, Kafka consulting, and Kafka support, and helps set up Kafka clusters in AWS.

Keep in mind that quite a few of the recent Java books on Kafka use the older clients.

Slideshare Kafka Tutorial. PDF: Kafka Tutorial.

Simple Java Kafka example

package com.cloudurable.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.*;

import java.util.Collections;
import java.util.Properties;

public class KafkaExample {

    private final static String TOPIC = "my-example-topic";
    private final static String BOOTSTRAP_SERVERS =
            "localhost:9092,localhost:9093,localhost:9094";

    public static void main(String... args) throws Exception {
        runProducer(5);
        runConsumer();
    }

    private static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    private static Consumer<Long, String> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
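        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        // A consumer group with no committed offsets defaults to
        // auto.offset.reset=latest and would skip the records that
        // runProducer() already sent, so start from the earliest offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");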
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        Consumer<Long, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }


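// Async variant of runProducer. To use it, uncomment it, remove the blocking
// runProducer below, and import java.util.concurrent.CountDownLatch and
// java.util.concurrent.TimeUnit.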
//    static void runProducer(final int sendMessageCount) throws InterruptedException {
//        final Producer<Long, String> producer = createProducer();
//        long time = System.currentTimeMillis();
//        final CountDownLatch countDownLatch = new CountDownLatch(sendMessageCount);
//
//        try {
//            for (long index = time; index < time + sendMessageCount; index++) {
//                final ProducerRecord<Long, String> record =
//                        new ProducerRecord<>(TOPIC, index, "Hello Mom " + index);
//                producer.send(record, (metadata, exception) -> {
//                    long elapsedTime = System.currentTimeMillis() - time;
//                    if (metadata != null) {
//                        System.out.printf("sent record(key=%s value=%s) " +
//                                        "meta(partition=%d, offset=%d) time=%d\n",
//                                record.key(), record.value(), metadata.partition(),
//                                metadata.offset(), elapsedTime);
//                    } else {
//                        exception.printStackTrace();
//                    }
//                    countDownLatch.countDown();
//                });
//            }
//            countDownLatch.await(25, TimeUnit.SECONDS);
//        }finally {
//            producer.flush();
//            producer.close();
//        }
//    }

    static void runProducer(final int sendMessageCount) throws Exception {
        final Producer<Long, String> producer = createProducer();
        long time = System.currentTimeMillis();

        try {
            for (long index = time; index < time + sendMessageCount; index++) {
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(TOPIC, index, "Hello Mom " + index);

                RecordMetadata metadata = producer.send(record).get();

                long elapsedTime = System.currentTimeMillis() - time;
                System.out.printf("sent record(key=%s value=%s) " +
                                "meta(partition=%d, offset=%d) time=%d\n",
                        record.key(), record.value(), metadata.partition(),
                        metadata.offset(), elapsedTime);

            }
        } finally {
            producer.flush();
            producer.close();
        }
    }


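    static void runConsumer() throws InterruptedException {
        final Consumer<Long, String> consumer = createConsumer();

        // The first polls often return nothing while the consumer joins its
        // group, so give up only after many consecutive empty polls.
        final int giveUp = 100;
        int noRecordsCount = 0;

        while (true) {
            // poll(long) matches the clients current when this was written;
            // newer clients (2.0 and later) also accept a java.time.Duration.
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(100);

            if (consumerRecords.count() == 0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) {
                    break;
                }
                continue;
            }
            noRecordsCount = 0;

            consumerRecords.forEach(record -> {
                System.out.println("Got Record: (" + record.key() + ", " + record.value()
                        + ") at offset " + record.offset());
            });
            // Commit the offsets returned by the last poll without blocking.
            consumer.commitAsync();
        }
        consumer.close();
        System.out.println("DONE");
    }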

}
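
The listing above contains two producer styles: the active runProducer blocks on send(record).get() for every record, while the commented-out variant passes a callback to send and waits on a CountDownLatch. The sketch below contrasts the two patterns using only the JDK, so it runs without a broker. Note that fakeSend is a hypothetical stand-in for producer.send (it is not a Kafka API), and the "offsets" it returns are just a counter, assumed here for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class AsyncSendSketch {

    /** Hypothetical stand-in for producer.send(record); not a Kafka API. */
    static CompletableFuture<Long> fakeSend(AtomicLong nextOffset) {
        // Hand out an "offset" on a background thread, the way a broker
        // acknowledgement arrives some time after the send call returns.
        return CompletableFuture.supplyAsync(nextOffset::getAndIncrement);
    }

    /** Blocking style: wait for each acknowledgement before the next send. */
    static List<Long> sendBlocking(int count) {
        AtomicLong nextOffset = new AtomicLong();
        List<Long> offsets = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            offsets.add(fakeSend(nextOffset).join());
        }
        return offsets;
    }

    /** Callback style: issue every send, then wait once for all callbacks. */
    static List<Long> sendWithCallbacks(int count) {
        AtomicLong nextOffset = new AtomicLong();
        List<Long> offsets = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            fakeSend(nextOffset).whenComplete((offset, error) -> {
                if (error == null) {
                    offsets.add(offset);
                } else {
                    error.printStackTrace();
                }
                // Count down on success and failure so await() cannot hang.
                latch.countDown();
            });
        }
        try {
            latch.await(25, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return offsets;
    }

    public static void main(String[] args) {
        System.out.println("blocking offsets: " + sendBlocking(5));
        System.out.println("callbacks acknowledged: " + sendWithCallbacks(5).size());
    }
}
```

In the blocking style the acknowledgements arrive strictly in send order, at the cost of one round trip per record. In the callback style the sends overlap, so the order in which callbacks run is not guaranteed, and the latch is what tells the caller that every record has been acknowledged or has failed.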

Kafka Training

Training for DevOps, Architects and Developers

This Kafka training course teaches the basics of the Apache Kafka distributed streaming platform, one of the most powerful and widely used reliable streaming platforms. Kafka is fault tolerant and highly scalable, and it is used for log aggregation, stream processing, event sourcing and commit logs. Kafka is used by LinkedIn, Yahoo, Twitter, Square, Uber, Box, PayPal, Etsy and more to enable stream processing and online messaging, to facilitate in-memory computing by providing a distributed commit log, to collect data for big data, and much more.

This course provides a technical introduction to all the conceptual and practical areas needed to use the Kafka Streaming Platform successfully, and deploy to production.

It is written expressly for developers who must build on Kafka and DevOps engineers who must deploy it, with an emphasis on AWS deployments. The course provides a solid foundation in the architecture of Kafka and how to work with it.

After taking this course, you will be ready to work with Kafka in an informed and productive manner.

We provide more than just developer training. We provide the training to maximize your developer and DevOps expertise.

In this three- to four-day hands-on course, developers and DevOps engineers learn how to build applications that can publish data to, and subscribe to data from, a Kafka cluster. They also learn about the setup and maintenance of that cluster.

Kafka Consulting

We provide Kafka consulting to help you get started. The Apache Kafka distributed streaming platform is one of the most powerful and widely used reliable streaming platforms. Kafka is fault tolerant and highly scalable, and it is used for log aggregation, stream processing, event sourcing and commit logs.

Up to 1/3 of Kafka deployments are on AWS. We can help you set up AWS and Kafka. We can also do custom development with Kafka. We specialize in Kafka AWS deployments.

Cloudurable can help you with:

  • Creating your own Kafka AWS AMIs (EC2 images)
  • Automating deployment (CloudFormation, Lambda, Ansible, etc.)
  • Automating VPC, Subnet, Placement groups, infrastructure
  • Setting up monitoring and log aggregation for OS and Kafka to CloudWatch
  • Automating common tasks like backups to S3 and/or EBS snapshotting, rolling updates, etc., with Ansible or OpsWorks
  • At-rest Kafka encryption with AWS KMS
  • Cluster mirroring to another AWS region
  • Picking the right EC2 instances to maximize throughput and resiliency without over-provisioning
  • Deciding when to use EBS and which volume type
  • Setting up VPC and subnets to survive a single AZ failure
  • Setting up a Kafka Cluster that does mirroring to multiple regions to provide disaster recovery
  • Implementing high-speed, cloud-native, microservices that use Kafka
  • Using Kafka in your reactive microservice environment

We have a thorough understanding of Kafka and Amazon AWS. We have the background to help you use the Kafka Streaming Platform successfully in AWS and deploy it there to support production. If you need to spin up Kafka quickly and support it in production, then hire us. We can help you avoid costly mistakes. We also provide mentoring and Kafka training.

Kafka Microservices

Kafka is a key ingredient in microservices development. It helps move you away from slow and unresponsive shared-state architectures, with their abundance of cascading failures, to in-memory actor systems built with Kafka Streams, QBit and Akka. We can show you how to set up the Kafka streaming platform as part of your reactive microservices architecture. Technology stacks like Kafka, Akka and QBit are the backbone for event-driven microservices.

Let us help you set up a solid foundation in the architecture and data model of the Kafka Streaming Platform and how to deploy it correctly based on your use cases to AWS.

Why choose us and our Kafka Consulting

We have successfully deployed streaming solutions at large Fortune 100 companies and very high-traffic web properties. We have the experience to deploy and monitor Kafka clusters running on AWS EC2. We have been there and done that, and we understand when and where streaming platforms make sense and how to avoid common pitfalls.

AWS Kafka and Kafka as a Service

Our company is set up to support tools like Kafka running in AWS EC2. We have deployed 100-million-user microservices in AWS using NoSQL solutions. We provide Kafka support, AMI images for Kafka, CloudFormation templates, and tools for collecting metrics and logs to support Kafka in AWS via CloudWatch. Supporting Kafka in production in AWS, using EC2, CloudWatch and S3, is what we do.

Rest assured that our consulting teams are the battle-hardened experts that you need for AWS Kafka deploys. Contact us to book Kafka on AWS consulting today. Call to book 1-415-758-1113.

Check out all of our SMACK mentoring, training and consulting

Contact us

For more details on subscription support or pricing, please contact us, call (415) 758-1113, or write to info@cloudurable.com.


About Cloudurable

We hope you enjoyed this article. Please provide feedback. Cloudurable provides Kafka training, Kafka consulting, and Kafka support, and helps set up Kafka clusters in AWS.

Check out our new Golang course. We provide onsite, instructor-led Go training.

