Kafka Tutorial: Creating Advanced Kafka Producers in Java

July 3, 2017

                                                                           

Kafka Tutorial 13: Creating Advanced Kafka Producers in Java

In this tutorial, you are going to create advanced Kafka Producers.

Before you start

The prerequisites to this tutorial are

This tutorial picks up right where Kafka Tutorial Part 11: Writing a Kafka Producer example in Java and Kafka Tutorial Part 12: Writing a Kafka Consumer example in Java left off. In the last two tutorial, we created simple Java example that creates a Kafka producer and a consumer.

Kafka Tutorial 13: Creating Advanced Kafka Producers in Java Slides

This tutorial covers advanced producer topics like custom serializers, ProducerInterceptors, custom Partitioners, timeout, record batching & linger, and compression.

This tutorial is under construction, but we have complete example code and slides explaining custom Serializers, ProducerInterceptors, custom Partitioners, timeout, record batching & linger, and compression.

Kafka Producers

A producer is a type of Kafka client that publishes records to Kafka cluster. The Kafka client API for Producers are thread safe. A Kafka Producer has a pool of buffer that holds to-be-sent records. The producer has background, I/O threads for turning records into request bytes and transmitting requests to Kafka cluster. The producer must be closed to not leak resources, i.e., connections, thread pools, buffers.

Kafka Producer Send, Acks and Buffers

The Kafka Producer has a send() method which is asynchronous. Calling the send method adds the record to the output buffer and return right away. The buffer is used to batch records for efficient IO and compression. The Kafka Producer configures acks to control record durability. The ”all” acks setting ensures full commit of record to all replicas and is most durable and least fast setting. The Kafka Producer can automatically retry failed requests. The Producer has buffers of unsent records per topic partition (sized at batch.size).

Kafka Producer: Buffering and batching

The Kafka Producer buffers are available to send immediately. The buffers are sent as fast as broker can keep up (limited by in-flight max.in.flight.requests.per.connection). To reduce requests count and increase throughput, set linger.ms > 0. This setting forces the Producer to wait up to linger.ms before sending contents of buffer or until batch fills up whichever comes first. Under heavy load linger.ms not met as the buffer fills up before the linger.ms duration completes. Under lighter load, the producer can use to linger to increase broker IO throughput and increase compression. The buffer.memory controls total memory available to a producer for buffering. If records get sent faster than they can be transmitted to Kafka then and this buffer will get exceeded then additional send calls block up to max.block.ms after then Producer throws a TimeoutException.

Producer Acks

When using a producer, you can configure its acks (Acknowledgments) which default to “all”. The acks config setting is the write-acknowledgment received count required from partition leader before the producer write request is deemed complete. This setting controls the producer’s durability which can be very strong (all) or none. Durability is a tradeoff between throughput and consistency. The acks setting is set to “all” (-1), “none” (0), or “leader” (1).

Acks 0 (NONE)

The acks=0 is none meaning the Producer does not wait for any ack from Kafka broker at all. The records added to the socket buffer are considered sent. There are no guarantees of durability. The record offset returned from the send method is set to -1 (unknown). There could be record loss if the leader is down. There could be use cases that need to maximize throughput over durability, for example, log aggregation.

Acks 1 (LEADER)

The acks=1 is leader acknowledgment. The means that the Kafka broker acknowledges that the partition leader wrote the record to its local log but responds without the partition followers confirming the write. If leader fails right after sending ack, the record could be lost as the followers might not have replicated the record yet. Record loss is rare but possible, and you might only see this used if a rarely missed record is not statistically significant, log aggregation, a collection of data for machine learning or dashboards, etc.

Acks -1 (ALL)

The acks=all or acks=-1 is all acknowledgment which means the leader gets write confirmation from the full set of ISRs before sending an ack back to the producer. This guarantees that a record is not lost as long as one ISR remains alive. This ack=all setting is the strongest available guarantee that Kafka provides for durability.
This setting is even stronger with broker setting min.insync.replicas which specifies the minimum number of ISRs that must acknowledge a write. Most use cases will use acks=all and set a min.insync.replicas > 1.

Setting acks config on Kafka Producer

package com.cloudurable.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class StockPriceKafkaProducer {

    private static Producer<String, StockPrice> createProducer() {
        final Properties props = new Properties();
        setupBootstrapAndSerializers(props);
        setupBatchingAndCompression(props);
        setupRetriesInFlightTimeout(props);

        //Set number of acknowledgments - acks - default is all
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        ...
        return new KafkaProducer<>(props);
    }
    ...
}

The “acks” (ProducerConfig.ACKS_CONFIG) config gets passed to the properties via the constructor of the KafkaProducer.

Kafka Producer: Producer Buffer Memory Size

You can also set the producer config property buffer.memory which default 32 MB of memory. This denotes the total memory (in bytes) that the producer can use to buffer records to be sent to the broker. The Producer blocks up to max.block.ms if buffer.memory is exceeded. If the Producer is sending records faster than the broker can receive records, an exception is thrown.

Kafka Producer: Batching by Size

The producer config property batch.size defaults to 16K bytes. This is used by the Producer to batch records. This setting enables fewer requests and allows multiple records to be sent to the same partition. Use this batch.size setting to improve IO throughput and performance on both producer and server (and consumer). Using a larger batch.size makes compression more efficient. If a record is larger than the batch size, it will not be batched. This setting allows the Producer to send requests containing multiple batches. The batches are per partition. The smaller the batch size the less the throughput and performance. If the batch size is too big and often gets sent before full, the memory allocated for the batch is wasted.

Kafka Producer: Batching by Time and Size

The producer config property linger.ms defaults to 0. You can set this so that the Producer will wait this long before sending if batch size not exceeded. This setting allows the Producer to group together any records that arrive before they can be sent into a batch. Setting this value to 5 ms is greater is good if records arrive faster than they can be sent out. The producer can reduce requests count even under moderate load using linger.ms.

The linger.ms setting adds a delay to wait for more records to build up, so larger batches get sent. Increase linger.ms to increase brokers throughput at the cost of producer latency. If the producer gets records whose size is batch.size or more for a broker’s leader partitions, then it is sent right away. If Producers gets less than batch.size but linger.ms interval has passed, then records for that partition are sent. Increase linger.ms to improve the throughput of Brokers and reduce broker load (common improvement).

Kafka Producer: Compressing Batches and End to End compression

The producer config property compression.type defaults to none. Setting this allows the producer to compresses request data. By default, the producer does not compress request data. This setting is set to none, gzip, snappy, or lz4. The compression is by batch and improves with larger batch sizes. End to end compression is possible if the Kafka Broker config “compression.type” set to “producer”. The compressed data can be sent from a producer, then written to the topic log and forwarded to a consumer by broker using the same compressed format. End to end compression is efficient as compression only happens once and is reused by the broker and consumer. End to end compression takes the load off of the broker.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: Batching and Compression Example

public class StockPriceKafkaProducer {

    private static Producer<String, StockPrice>
                                    createProducer() {
        final Properties props = new Properties();
        setupBootstrapAndSerializers(props);
        setupBatchingAndCompression(props);
        ...
        return new KafkaProducer<>(props);
    }

    private static void setupBatchingAndCompression(
            final Properties props) {
        //Linger up to 100 ms before sending batch if size not met
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);

        //Batch up to 64K buffer sizes.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,  16_384 * 4);

        //Use Snappy compression for batch compression.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    }
}

The above sets the Producer config ProducerConfig.LINGER_MS_CONFIG (“linger.ms”) config property to to 100 ms, ProducerConfig.BATCH_SIZE_CONFIG (“batch.size”) config property to 64K bytes, and the ProducerConfig.COMPRESSION_TYPE_CONFIG (“compression.type”) config property to snappy.

This means that after 100ms if 64K bytes of records (per partition) is not added to the output buffer then what is there will be compressed and sent to the Kafka broker.

Review: Buffering, batching and compression

  • How can you increase throughput of Kafka?
  • How can LINGER_MS_CONFIG to increase throughput?
  • What is end to end batching and compression and how do you enable it?

  • How can you increase throughput of Kafka? You can use batching to reduce the amount of IO.

  • How can LINGER_MS_CONFIG to increase throughput? The linger allows batching to grow larger and takes some load off the brokers.

  • What is end to end batching and compression and how do you enable it? End to end batching is possible if you use producer compression at the broker. The producer then becomes responsible for the compression taking the load of the broker. The records get written to the disk compressed and sent to the consumer compressed.

Custom Serializers

You don’t have to use built-in Kafka serializers. You can write your own. You just need to be able to convert your custom keys and values using the serializer convert to and convert from byte arrays (byte[]). Serializers work for keys and values, and you set them up with the Kafka Producer properties value.serializer, and key.serializer.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: Custom Serializers config

public class StockPriceKafkaProducer {

    private static Producer<String, StockPrice>
                                    createProducer() {
        final Properties props = new Properties();
        setupBootstrapAndSerializers(props);
        ...
        return new KafkaProducer<>(props);
    }

    private static void setupBootstrapAndSerializers(Properties props) {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                StockAppConstants.BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "StockPriceKafkaProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());


        //Custom Serializer - config "value.serializer"
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StockPriceSerializer.class.getName());

    }

}

The above sets the value.serializer producer config property.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceSerializer.java

Kafka Producer: Custom Serializer

package com.cloudurable.kafka.producer;
import com.cloudurable.kafka.producer.model.StockPrice;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class StockPriceSerializer implements Serializer<StockPrice> {

    @Override
    public byte[] serialize(String topic, StockPrice data) {
        return data.toJson().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public void close() {
    }
}

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/model/StockPrice.java

Kafka Producer: Custom Domain Object for Stocks

package com.cloudurable.kafka.producer.model;

import io.advantageous.boon.json.JsonFactory;

public class StockPrice {

    private final int dollars;
    private final int cents;
    private final String name;
    ...
    public String toJson() {
        return "{" +
                "\"dollars\": " + dollars +
                ", \"cents\": " + cents +
                ", \"name\": \"" + name + '\"' +
                '}';
    }
}

The StockPrice domain object has a toJson method that is used to convert a stock price to a JSON string.

Broker Follower Write Timeout

The Producer config property request.timeout.ms default 30 seconds (30,000 ms). This config property sets the maximum time that the broker waits for confirmation from followers to meet Producer acknowledgment requirements for ack=all. It is a measure of broker to broker latency of the request. Note that 30 seconds is high, and a long process time is indicative of problems with the Kafka cluster or broker nodes.

Producer Retries

The Producer config property retries defaults to 0 and is the retry count if Producer does not get an ack from Kafka Broker. The Producer will only retry if record send fail is deemed a transient error (API). The Producer will act as if your producer code resent the record on a failed attempt. Note that timeouts are re-tried, but retry.backoff.ms (default to 100 ms) is used to wait after failure before retrying the request again. If you set retry > 0, then you should also set max.in.flight.requests.per.connection to 1, or there is the possibility that a re-tried message could be delivered out of order. You have to decide if out of order message delivery matters for your application.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: Custom Serializers config

public class StockPriceKafkaProducer {

    private static Producer<String, StockPrice>
                                    createProducer() {
        final Properties props = new Properties();
        setupRetriesInFlightTimeout(props);
        ...
        return new KafkaProducer<>(props);
    }

    private static void setupRetriesInFlightTimeout(Properties props) {
        //Only one in-flight messages per Kafka broker connection
        // - max.in.flight.requests.per.connection (default 5)
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
                1);
        //Set the number of retries - retries
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

        //Request timeout - request.timeout.ms
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 15_000);

        //Only retry after one second.
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1_000);
    }

}

The above code sets the retires to 3, and the maximum in-flight requests to 1 to avoid out of order retries. It sets the request timeout to 15 seconds and the retry back-off to 1 second.

Review: Kafka Retries

  • How do you configure a custom producer serializer?
  • If you start getting request timeouts and you increase the request.timeout.ms why might this not be a good idea?
  • Why should you set max.in.flight.requests.per.connection if you use a retry count greater than 0?

  • How do you configure a custom producer serializer? You just need to be able to convert your custom keys and values using the serializer convert to and convert from byte arrays (byte[]). Serializers work for keys and values, and you set them up with the Kafka Producer properties value.serializer, and key.serializer.

  • If you start getting request timeouts and you increase the request.timeout.ms why might this not be a good idea? Getting timeouts is an indication that the broker is under too much load. Adding longer timeouts could mitigating things in the short term, but very likely it could put even more load on the brokers which could make things quite a bit worse.

  • Why should you set max.in.flight.requests.per.connection if you use a retry count greater than 0?

Recall, if you set retry > 0, then you should also set max.in.flight.requests.per.connection to 1, or there is the possibility that a re-tried message could be delivered out of order. You have to decide if out of order message delivery matters for your application.

Producer Partitioning

The Producer config property partitioner.class sets the partitioner. By default partitioner.class is set to org.apache.kafka.clients.producer.internals.DefaultPartitioner. The default partitioner partitions using the hash of the record key if the record has a key. The default partitioner partitions using round-robin if the record has no key. A Partitioner class implements Partitioner interface. You can define a custom partitioner. Recall that a topic is divided into partitions and each partition has a Kafka Broker that is the partition leader. Only the partition leader accepts send requests from a Producer.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: Configure Producer Partitioning

public class StockPriceKafkaProducer {

    private static Producer<String, StockPrice>
                                    createProducer() {
        final Properties props = new Properties();
        ...
        props.put("importantStocks", "IBM,UBER");

        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                StockPricePartitioner.class.getName());

        return new KafkaProducer<>(props);
    }

}

The above sets the partitioner.class to StockPricePartitioner which is a custom partitioner that we defined to send importantStocks to a special partition.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPricePartitioner.java

Kafka Producer: Custom Producer Partitioner

package com.cloudurable.kafka.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.*;

public class StockPricePartitioner implements Partitioner{

    private final Set<String> importantStocks;
    public StockPricePartitioner() {
        importantStocks = new HashSet<>();
    }

    @Override
    public int partition(final String topic,
                         final Object objectKey,
                         final byte[] keyBytes,
                         final Object value,
                         final byte[] valueBytes,
                         final Cluster cluster) {

        final List<PartitionInfo> partitionInfoList =
                cluster.availablePartitionsForTopic(topic);
        final int partitionCount = partitionInfoList.size();
        final int importantPartition = partitionCount -1;
        final int normalPartitionCount = partitionCount -1;

        final String key = ((String) objectKey);

        if (importantStocks.contains(key)) {
            return importantPartition;
        } else {
            return Math.abs(key.hashCode()) % normalPartitionCount;
        }

    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
        final String importantStocksStr = (String) configs.get("importantStocks");
        Arrays.stream(importantStocksStr.split(","))
                .forEach(importantStocks::add);
    }

}

The StockPricePartitioner parses importantStocks config setting and keeps a hash set of importantStocks. It checks the importantStocks hash set to determine if the stock price is important and if so sends the stock price to the importantPartition otherwise sends the stock price to one of the normal partitions (abs(key.hashCode()) % normalPartitionCount) based on the hash of the record key.

Producer Interceptor Config

The Producer config property interceptor.classes is empty but you can pass an comma delimited list of interceptors. The interceptors implement the ProducerInterceptor interface. A producer interceptor intercepts records a producer sends to broker and intercepts after acks are sent. You could mutate records with an interceptor.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: Configure Producer Partitioning

public class StockPriceKafkaProducer {

    private static Producer<String, StockPrice>
                                    createProducer() {
        final Properties props = new Properties();
        ...

        //Install interceptor list - config "interceptor.classes"
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                        StockProducerInterceptor.class.getName());
        ...
        return new KafkaProducer<>(props);
    }

}

The above sets the interceptor.classes to StockProducerInterceptor which is an example interceptor that we defined.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockProducerInterceptor.java

Kafka Producer: Producer Interceptor

package com.cloudurable.kafka.producer;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class StockProducerInterceptor implements ProducerInterceptor {

    private final Logger logger = LoggerFactory
            .getLogger(StockProducerInterceptor.class);
    private int onSendCount;
    private int onAckCount;

    @Override
    public ProducerRecord onSend(final ProducerRecord record) {
        onSendCount++;
        if (logger.isDebugEnabled()) {
            logger.debug(String.format("onSend topic=%s key=%s value=%s %d \n",
                    record.topic(), record.key(), record.value().toString(),
                    record.partition()
            ));
        } else {
            if (onSendCount % 100 == 0) {
                logger.info(String.format("onSend topic=%s key=%s value=%s %d \n",
                        record.topic(), record.key(), record.value().toString(),
                        record.partition()
                ));
            }
        }
        return record;
    }

    @Override
    public void onAcknowledgement(final RecordMetadata metadata,
                                  final Exception exception) {
        onAckCount++;

        if (logger.isDebugEnabled()) {
            logger.debug(String.format("onAck topic=%s, part=%d, offset=%d\n",
                    metadata.topic(), metadata.partition(), metadata.offset()
            ));
        } else {
            if (onAckCount % 100 == 0) {
                logger.info(String.format("onAck topic=%s, part=%d, offset=%d\n",
                        metadata.topic(), metadata.partition(), metadata.offset()
                ));
            }
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

The StockProducerInterceptor is a simple example that overrides onSend and onAcknowledgement to output some logging information.

KafkaProducer send() Method

There are two forms of send method. There is the send method with a callback and one without a callback. Both forms of the send method return a Future. They both asynchronously sends a record to a topic. The callback gets invoked when the broker has acknowledged the send. The send method is asynchronous and returns right away as soon as the record gets added to the send buffer. The send method allows sending many records at once without blocking for a response from Kafka broker. The result of send method is a RecordMetadata which contains a record’s partition, offset, and timestamp. Callbacks for records sent to the same partition are executed in the order sent.

KafkaProducer send() Exceptions

The Kafka Producer send method can emit the following exceptions:

  • InterruptException - If the thread is interrupted while blocked (API).
  • SerializationException - If key or value are not valid objects given configured serializers (API).
  • TimeoutException - If the time taken for fetching metadata or allocating memory exceeds max.block.ms, or getting acks from Broker exceed timeout.ms, etc. (API)
  • KafkaException - If Kafka error occurs not in public API. (API)

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockSender.java

Kafka Producer: Producer StockSender

public class StockSender implements Runnable{
  ...


  try {
    final Future<RecordMetadata> future = producer.send(record);
    if (sentCount % 100 == 0) {displayRecordMetaData(record, future);}
  } catch (InterruptedException e) {
    if (Thread.interrupted()) {
        break;
    }
  } catch (ExecutionException e) {
    logger.error("problem sending record to producer", e);
  }


  ...
  ...
  private void displayRecordMetaData(final ProducerRecord<String, StockPrice> record,
                                   final Future<RecordMetadata> future)
                            throws InterruptedException, ExecutionException {
    final RecordMetadata recordMetadata = future.get();
    logger.info(String.format("\n\t\t\tkey=%s, value=%s " +
                    "\n\t\t\tsent to topic=%s part=%d off=%d at time=%s",
            record.key(),
            record.value().toJson(),
            recordMetadata.topic(),
            recordMetadata.partition(),
            recordMetadata.offset(),
            new Date(recordMetadata.timestamp())
            ));
  }

}

The above shows using the producer send method to send a record.

KafkaProducer flush() method

The Producer flush() method sends all buffered records now (even if linger.ms > 0). It blocks until all requests sent are complete. The flush method is useful when consuming from some input system and pushing data into Kafka as flush() ensures all previously sent messages get sent. This method is useful for marking a completion point after a flush.

KafkaProducer close()

The close() closes producer and frees resources such as connections, threads, and buffers associated with the producer. There are two forms of the close method both block until all previously sent requests complete or a duration passed in as a parameter is exceeded. The close method with no parameters is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS). If a producer is unable to complete all requests before the timeout expires, all unsent requests fail, and this method fails.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: Close Producer

package com.cloudurable.kafka.producer;
...
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class StockPriceKafkaProducer {
   ...



    public static void main(String... args) throws Exception {
        //Create Kafka Producer
        final Producer<String, StockPrice> producer = createProducer();
        //Create StockSender list
        final List<StockSender> stockSenders = getStockSenderList(producer);

        //Create a thread pool so every stock sender gets it own.
        // Increase by 1 to fit metrics.
        final ExecutorService executorService = ...;

        //Run each stock sender in its own thread.
        stockSenders.forEach(executorService::submit);


        //Register nice shutdown of thread pool, then flush and close producer.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            executorService.shutdown();
            try {
                executorService.awaitTermination(200, TimeUnit.MILLISECONDS);
                logger.info("Flushing and closing producer");
                producer.flush();
                producer.close(10_000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.warn("shutting down", e);
            }
        }));
    }

}

Notice the nice orderly shutdown in the runtime addShutdownHook call.

KafkaProducer partitionsFor() method

The partitionsFor(topic) method returns meta data for partitions public List<PartitionInfo> partitionsFor(String topic). This method is used to get partition metadata for a given topic. This method gets used by producers that do their own partitioning - used for custom partitioning. The PartitionInfo consist of a topic, partition,  leader node (Node), replicas nodes (Node[]) and inSyncReplicas nodes. The node consists of id,  host,  port, and rack.

KafkaProducer metrics() method

The metrics() method is used to get a map of metrics: public Map<MetricName,? extends Metric> metrics(). This method is used to get a full set of producer metrics. The MetricName consists of name, group, description, and tags (Map). A metric consist of a Metric name and a value (double).

We created a MetricsProducerReporter which periodically prints out metrics for a Producer. It calls producer.metrics().

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/MetricsProducerReporter.java

Kafka Producer: Producer Metrics

package com.cloudurable.kafka.producer;

import com.cloudurable.kafka.producer.model.StockPrice;
import io.advantageous.boon.core.Sets;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class MetricsProducerReporter implements Runnable {
    private final Producer<String, StockPrice> producer;
    private final Logger logger =
            LoggerFactory.getLogger(MetricsProducerReporter.class);

    //Used to Filter just the metrics we want
    private final Set<String> metricsNameFilter = Sets.set(
            "record-queue-time-avg", "record-send-rate", "records-per-request-avg",
            "request-size-max", "network-io-rate", "record-queue-time-avg",
            "incoming-byte-rate", "batch-size-avg", "response-rate", "requests-in-flight"
    );

    public MetricsProducerReporter(
            final Producer<String, StockPrice> producer) {
        this.producer = producer;
    }

    @Override
    public void run() {
        while (true) {
            final Map<MetricName, ? extends Metric> metrics
                    = producer.metrics();

            displayMetrics(metrics);
            try {
                Thread.sleep(3_000);
            } catch (InterruptedException e) {
                logger.warn("metrics interrupted");
                Thread.interrupted();
                break;
            }
        }
    }


    static class MetricPair {
        private final MetricName metricName;
        private final Metric metric;
        MetricPair(MetricName metricName, Metric metric) {
            this.metricName = metricName;
            this.metric = metric;
        }
        public String toString() {
            return metricName.group() + "." + metricName.name();
        }
    }

    private void displayMetrics(Map<MetricName, ? extends Metric> metrics) {
        final Map<String, MetricPair> metricsDisplayMap = metrics.entrySet().stream()
                //Filter out metrics not in metricsNameFilter
                .filter(metricNameEntry ->
                        metricsNameFilter.contains(metricNameEntry.getKey().name()))
                //Filter out metrics not in metricsNameFilter
                .filter(metricNameEntry ->
                        !Double.isInfinite(metricNameEntry.getValue().value()) &&
                                !Double.isNaN(metricNameEntry.getValue().value()) &&
                                metricNameEntry.getValue().value() != 0
                )
                //Turn Map<MetricName,Metric> into TreeMap<String, MetricPair>
                .map(entry -> new MetricPair(entry.getKey(), entry.getValue()))
                .collect(Collectors.toMap(
                        MetricPair::toString, it -> it, (a, b) -> a, TreeMap::new
                ));


        //Output metrics
        final StringBuilder builder = new StringBuilder(255);
        builder.append("\n---------------------------------------\n");
        metricsDisplayMap.entrySet().forEach(entry -> {
            MetricPair metricPair = entry.getValue();
            String name = entry.getKey();
            builder.append(String.format(Locale.US, "%50s%25s\t\t%,-10.2f\t\t%s\n",
                    name,
                    metricPair.metricName.name(),
                    metricPair.metric.value(),
                    metricPair.metricName.description()
            ));
        });
        builder.append("\n---------------------------------------\n");
        logger.info(builder.toString());
    }


}

The MetricsProducerReporter would output the following metrics.

Output for MetricsProducerReporter

Metric	 producer-metrics,	 record-queue-time-max,	 508.0,
		The maximum time in ms record batches spent in the record accumulator.

17:09:22.721 [pool-1-thread-9] INFO  c.c.k.p.MetricsProducerReporter -
Metric	 producer-node-metrics,	 request-rate,	 0.025031289111389236,
		The average number of requests sent per second.

17:09:22.721 [pool-1-thread-9] INFO  c.c.k.p.MetricsProducerReporter -
Metric	 producer-metrics,	 records-per-request-avg,	 205.55263157894737,
		The average number of records per request.		

17:09:22.722 [pool-1-thread-9] INFO  c.c.k.p.MetricsProducerReporter -
Metric	 producer-metrics,	 record-size-avg,	 71.02631578947368,
		The average record size

17:09:22.722 [pool-1-thread-9] INFO  c.c.k.p.MetricsProducerReporter -
Metric	 producer-node-metrics,	 request-size-max,	 56.0,
		The maximum size of any request sent in the window.

17:09:22.723 [pool-1-thread-9] INFO  c.c.k.p.MetricsProducerReporter -
Metric	 producer-metrics,	 request-size-max,	 12058.0,
		The maximum size of any request sent in the window.

17:09:22.723 [pool-1-thread-9] INFO  c.c.k.p.MetricsProducerReporter -
Metric	 producer-metrics,	 compression-rate-avg,	 0.41441360272859273,
		The average compression rate of record batches.

Review: Metrics, orderly shutdown, producer partitioner

  • How can you access Metrics for the Kafka Producer?
  • How can you shut down a producer orderly?
  • When might you use an producer interceptor?
  • When might you use a producer partitioner?

  • How can you access Metrics for the Kafka Producer? Call metrics() on the producer.

  • How can you shut down a producer orderly? Call close.

  • When might you use an producer interceptor? If you need to modify the record before sending it. (Add extra headers, etc.)

  • When might you use a producer partitioner? If you want to implement some sort of custom partitioning like a priority queue.

Lab Creating an advanced Kafka Producer

Stock Price Producer

The Stock Price Producer example has the following classes:

  • StockPrice - holds a stock price has a name, dollar, and cents
  • StockPriceKafkaProducer - Configures and creates KafkaProducer, StockSender list, ThreadPool (ExecutorService), starts StockSender runnable into thread pool
  • StockAppConstants - holds topic and broker list
  • StockPriceSerializer - can serialize a StockPrice into byte[]
  • StockSender - generates somewhat random stock prices for a given StockPrice name, Runnable, 1 thread per StockSender and shows using KafkaProducer from many threads

StockPrice

The StockPrice is a simple domain object that holds a stock price has a name, dollar, and cents. The StockPrice knows how to convert itself into a JSON string.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/model/StockPrice.java

Kafka Producer: StockPrice

package com.cloudurable.kafka.producer.model;

import io.advantageous.boon.json.JsonFactory;

public class StockPrice {

    private final int dollars;
    private final int cents;
    private final String name;

    public StockPrice(final String json) {
        this(JsonFactory.fromJson(json, StockPrice.class));
    }


    public StockPrice() {
        dollars = 0;
        cents = 0;
        name = "";
    }

    public StockPrice(final String name, final int dollars, final int cents) {
        this.dollars = dollars;
        this.cents = cents;
        this.name = name;
    }


    public StockPrice(final StockPrice stockPrice) {
        this.cents = stockPrice.cents;
        this.dollars = stockPrice.dollars;
        this.name = stockPrice.name;
    }


    public int getDollars() {
        return dollars;
    }


    public int getCents() {
        return cents;
    }


    public String getName() {
        return name;
    }


    @Override
    public String toString() {
        return "StockPrice{" +
                "dollars=" + dollars +
                ", cents=" + cents +
                ", name='" + name + '\'' +
                '}';
    }


    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        StockPrice that = (StockPrice) o;

        if (dollars != that.dollars) return false;
        if (cents != that.cents) return false;
        return name != null ? name.equals(that.name) : that.name == null;
    }

    @Override
    public int hashCode() {
        int result = dollars;
        result = 31 * result + cents;
        result = 31 * result + (name != null ? name.hashCode() : 0);
        return result;
    }


    public String toJson() {
        return "{" +
                "\"dollars\": " + dollars +
                ", \"cents\": " + cents +
                ", \"name\": \"" + name + '\"' +
                '}';
    }
}

StockPrice is just a POJO.

StockPriceKafkaProducer

StockPriceKafkaProducer import classes and sets up a logger. It has a createProducer method to create a KafkaProducer instance. It has a setupBootstrapAndSerializers to initialize bootstrap servers, client id, key serializer and custom serializer (StockPriceSerializer). It has a main() method that creates the producer, creates a StockSender list passing each instance the producer, and it creates a thread pool, so every stock sender gets it own thread, and then it runs each stockSender in its own thread using the thread pool.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: StockPriceKafkaProducer imports, createProducer

package com.cloudurable.kafka.producer;

import com.cloudurable.kafka.StockAppConstants;
import com.cloudurable.kafka.producer.model.StockPrice;
import io.advantageous.boon.core.Lists;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StockPriceKafkaProducer {

    private static Producer<String, StockPrice>
                                    createProducer() {
        final Properties props = new Properties();
        setupBootstrapAndSerializers(props);
        return new KafkaProducer<>(props);
    }
    ...
}

The above code imports Kafka classes and sets up the logger and calls createProducer to create a KafkaProducer. The createProducer() calls setupBoostrapAndSerializers().

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: StockPriceKafkaProducer imports, createProducer


public class StockPriceKafkaProducer {
  private static void setupBootstrapAndSerializers(Properties props) {
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            StockAppConstants.BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "StockPriceKafkaProducer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());


    //Custom Serializer - config "value.serializer"
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StockPriceSerializer.class.getName());

  }
}

The setupBootstrapAndSerializers method initializes bootstrap servers, client id, key serializer and custom serializer (StockPriceSerializer). The StockPriceSerializer will serialize StockPrice into bytes.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: StockPriceKafkaProducer.main - start thread pool


public class StockPriceKafkaProducer {
  ...

  public static void main(String... args) throws Exception {
    //Create Kafka Producer
    final Producer<String, StockPrice> producer = createProducer();
    //Create StockSender list
    final List<StockSender> stockSenders = getStockSenderList(producer);

    //Create a thread pool so every stock sender gets it own.
    // Increase by 1 to fit metrics.
    final ExecutorService executorService =
            Executors.newFixedThreadPool(stockSenders.size() + 1);

    //Run Metrics Producer Reporter which is runnable passing it the producer.
    executorService.submit(new MetricsProducerReporter(producer));

    //Run each stock sender in its own thread.
    stockSenders.forEach(executorService::submit);

  }
  ...
}

The StockPriceKafkaProducer main method creates a Kafka producer, then creates StockSender list passing each instance the producer. It then creates a thread pool (executorService) and runs each StockSender, which is runnable, in its own thread from the thread pool.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: StockPriceKafkaProducer.getStockSenderList - create list of StockSenders


    private static List<StockSender> getStockSenderList(
            final Producer<String, StockPrice> producer) {
        return Lists.list(
                new StockSender(StockAppConstants.TOPIC,
                        new StockPrice("IBM", 100, 99),
                        new StockPrice("IBM", 50, 10),
                        producer,
                        1, 10
                ),
                new StockSender(
                        StockAppConstants.TOPIC,
                        new StockPrice("SUN", 100, 99),
                        new StockPrice("SUN", 50, 10),
                        producer,
                        1, 10
                ),
                ...,
                new StockSender(
                        StockAppConstants.TOPIC,
                        new StockPrice("FFF", 100, 99),
                        new StockPrice("FFF", 50, 10),
                        producer,
                        1, 10
                )
        );

    }

The getStockSenderList of StockPriceKafkaProducer just creates a list of StockSenders.

StockPriceSerializer

The StockPriceSerializer converts a StockPrice into a byte array.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceSerializer.java

Kafka Producer: StockPriceSerializer - convert StockPrice into a byte array

package com.cloudurable.kafka.producer;
import com.cloudurable.kafka.producer.model.StockPrice;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class StockPriceSerializer implements Serializer<StockPrice> {

    @Override
    public byte[] serialize(String topic, StockPrice data) {
        return data.toJson().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public void close() {
    }
}

Notice the StockPriceSerializer converts a StockPrice into a byte array by calling StockPrice.toJson.

StockAppConstants

The StockAppConstants defines a few constants, namely, topic name and a comma delimited list of bootstrap Kafka brokers.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/StockAppConstants.java

Kafka Producer: StockAppConstants defines constants

package com.cloudurable.kafka;

public class StockAppConstants {
    public final static String TOPIC = "stock-prices";
    public final static String BOOTSTRAP_SERVERS =
            "localhost:9092,localhost:9093,localhost:9094";

}

StockSender

The StockSender uses the Kafka Producer we created earlier. The StockSender generates random stock prices for a given StockPrice name. The StockSender is Runnable and runs in its own thread. There is one thread per StockSender. The StockSender is used to show using KafkaProducer from many threads. The StockSender Delays random time between delayMin and delayMax, then sends a random StockPrice between stockPriceHigh and stockPriceLow.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockSender.java

Kafka Producer: StockSender imports, Runnable

package com.cloudurable.kafka.producer;

import com.cloudurable.kafka.producer.model.StockPrice;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class StockSender implements Runnable{
  ...
}

The StockSender imports Kafka Producer, ProducerRecord, RecordMetadata, and StockPrice. It implements Runnable, and can be submitted to ExecutionService (thread pool).

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockSender.java

Kafka Producer: StockSender fields

public class StockSender implements Runnable{

      private final StockPrice stockPriceHigh;
      private final StockPrice stockPriceLow;
      private final Producer<String, StockPrice> producer;
      private final int delayMinMs;
      private final int delayMaxMs;
      private final Logger logger = LoggerFactory.getLogger(StockSender.class);
      private final String topic;

      public StockSender(final String topic, final StockPrice stockPriceHigh,
                         final StockPrice stockPriceLow,
                         final Producer<String, StockPrice> producer,
                         final int delayMinMs,
                         final int delayMaxMs) {
          this.stockPriceHigh = stockPriceHigh;
          this.stockPriceLow = stockPriceLow;
          this.producer = producer;
          this.delayMinMs = delayMinMs;
          this.delayMaxMs = delayMaxMs;
          this.topic = topic;
      }
}

The StockSender takes a topic, high & low stockPrice, producer, and delay min & max.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockSender.java

Kafka Producer: StockSender run method

public class StockSender implements Runnable{

  ...
  public void run() {
    final Random random = new Random(System.currentTimeMillis());
    int sentCount = 0;

    while (true) {
        sentCount++;
        final ProducerRecord <String, StockPrice> record =
                                    createRandomRecord(random);
        final int delay = randomIntBetween(random, delayMaxMs, delayMinMs);

        try {
            final Future<RecordMetadata> future = producer.send(record);
            if (sentCount % 100 == 0) {displayRecordMetaData(record, future);}
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            if (Thread.interrupted()) {
                break;
            }
        } catch (ExecutionException e) {
            logger.error("problem sending record to producer", e);
        }
    }
  }
}

The StockSender run methods in a forever loop creates random record, sends the record, waits random time, and then repeats.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockSender.java

Kafka Producer: StockSender createRandomRecord

public class StockSender implements Runnable{

  ...
  private final int randomIntBetween(final Random random,
                                   final int max,
                                   final int min) {
    return random.nextInt(max - min + 1) + min;
  }

  private ProducerRecord<String, StockPrice> createRandomRecord(
            final Random random) {

    final int dollarAmount = randomIntBetween(random,
            stockPriceHigh.getDollars(), stockPriceLow.getDollars());

    final int centAmount = randomIntBetween(random,
            stockPriceHigh.getCents(), stockPriceLow.getCents());

    final StockPrice stockPrice = new StockPrice(
            stockPriceHigh.getName(), dollarAmount, centAmount);

    return new ProducerRecord<>(topic, stockPrice.getName(),
            stockPrice);
  }
}

The StockSender createRandomRecord method uses randomIntBetween. The createRandomRecord creates StockPrice and then wraps StockPrice in ProducerRecord.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockSender.java

Kafka Producer: StockSender displayRecordMetaData

public class StockSender implements Runnable{

  ...
  private void displayRecordMetaData(final ProducerRecord<String, StockPrice> record,
                                   final Future<RecordMetadata> future)
                            throws InterruptedException, ExecutionException {
    final RecordMetadata recordMetadata = future.get();
    logger.info(String.format("\n\t\t\tkey=%s, value=%s " +
                    "\n\t\t\tsent to topic=%s part=%d off=%d at time=%s",
            record.key(),
            record.value().toJson(),
            recordMetadata.topic(),
            recordMetadata.partition(),
            recordMetadata.offset(),
            new Date(recordMetadata.timestamp())
            ));
  }
  ...
}

Every 100 records StockSender displayRecordMetaData method gets called, which prints out record info, and recordMetadata info: key, JSON value, topic, partition, offset, time. The displayRecordMetaData uses the Future from the call to producer.send().

Running the example

To run the example, you need to run ZooKeeper, then run the three Kafka Brokers. Once that is running, you will need to run create-topic.sh. And lastly run the StockPriceKafkaProducer from the IDE.

First run ZooKeeper.

Running ZooKeeper with run-zookeeper.sh (Run in a new terminal)

~/kafka-training

$ cat run-zookeeper.sh
#!/usr/bin/env bash
cd ~/kafka-training

kafka/bin/zookeeper-server-start.sh \
   kafka/config/zookeeper.properties

$ ./run-zookeeper.sh   

Now run the first Kafka Broker.

Running the 1st Kafka Broker (Run in a new terminal)

~/kafka-training/lab5

$ cat bin/start-1st-server.sh
#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
## Run Kafka
kafka/bin/kafka-server-start.sh \
    "$CONFIG/server-0.properties"

$ bin/start-1st-server.sh

Now run the second Kafka Broker.

Running the 2nd Kafka Broker (Run in a new terminal)

~/kafka-training/lab5

$ cat bin/start-2nd-server.sh
#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
## Run Kafka
kafka/bin/kafka-server-start.sh \
    "$CONFIG/server-1.properties"

$ bin/start-2nd-server.sh

Now run the third Kafka Broker.

Running the 3rd Kafka Broker (Run in a new terminal)

~/kafka-training/lab5

$ cat bin/start-3rd-server.sh
#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
## Run Kafka
kafka/bin/kafka-server-start.sh \
    "$CONFIG/server-2.properties"

$ bin/start-3rd-server.sh

Once all brokers are running, run create-topic.sh as follows.

Running create topic

~/kafka-training/lab5

$ cat bin/create-topic.sh
#!/usr/bin/env bash

cd ~/kafka-training

kafka/bin/kafka-topics.sh \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 3 \
    --partitions 3 \
    --topic stock-prices \
    --config min.insync.replicas=2

$ bin/create-topic.sh
    Created topic "stock-prices".

The create-topics script creates a topic. The name of the topic is stock-prices. The topic has three partitions. The created topic has a replication factor of three.

For the config only the broker id and log directory changes.

config/server-0.properties

broker.id=0
port=9092
log.dirs=./logs/kafka-0
...

Run the StockPriceKafkaProducer from your IDE. You should see log messages from StockSender(s) with StockPrice name, JSON value, partition, offset, and time.

Lab Adding an orderly shutdown flush and close

Shutdown Producer Nicely

Let’s write some code to shut the Producer down nicely.

The shutdown code will happen if you are running the producer example from a terminal and type ctrl-C so shutdown from Java occurs. We will write some code to shutdown thread pool and wait. Then we will flush the producer to send any outstanding batches if using batches (producer.flush()). Lastly, we will close the producer using producer.close and wait five seconds for the producer to shutdown. (Note that closing the producer also flushes it.)

To this we will add shutdown hook to Java runtime. Then to test we will start the StockPriceKafkaProducer, and then you can stop it using CTRL-C or by pressing the stop button in your IDE.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: StockPriceKafkaProducer shutdown hook for clean shutdown

public class StockPriceKafkaProducer {

    ...

    private static final Logger logger = LoggerFactory.getLogger(StockPriceKafkaProducer.class);



    public static void main(String... args) throws Exception {
        //Create Kafka Producer
        final Producer<String, StockPrice> producer = createProducer();
        //Create StockSender list
        final List<StockSender> stockSenders = getStockSenderList(producer);

        //Create a thread pool so every stock sender gets it own.
        // Increase by 1 to fit metrics.
        final ExecutorService executorService =
                Executors.newFixedThreadPool(stockSenders.size() );

        //Run each stock sender in its own thread.
        stockSenders.forEach(executorService::submit);


        //Register nice shutdown of thread pool, then flush and close producer.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            executorService.shutdown();
            try {
                executorService.awaitTermination(200, TimeUnit.MILLISECONDS);
                logger.info("Flushing and closing producer");
                producer.flush();
                producer.close(10_000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.warn("shutting down", e);
            }
        }));
    }

...
}

Notice we add a shutdown hook using Runtime.getRuntime().addShutdownHook and this shutdown hook that shuts down the thread pool, then calls flush on the producer and then closes the producer whilst waiting 10 seconds for the close to happen.

Lab Configuring Producer Durability

In this lab we will configure producer durability.

Set default acks to all

Change StockPriceKafkaProducer and set Producer config acks to all (this is the default). This means that all ISRs in-sync replicas have to respond for producer write to go through.

public class StockPriceKafkaProducer {

    private static Producer<String, StockPrice>
                                    createProducer() {
        final Properties props = new Properties();
        ...

        //Set number of acknowledgments - acks - default is all
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return new KafkaProducer<>(props);
    }
}

Notice we set the Producer config property “acks” (ProducerConfig.ACKS_CONFIG) to all.

Kafka Broker Config for Min.Insync.Replicas

Next let’s set the the min.insync.replicas to three. This will force at least three in-sync replicas (ISRs) have to respond for our producer to get an ack from the Kafka Broker. NOTE: We have three brokers in this lab, thus all three have to be up for the Producer to work.

Ensure min.insync.replicas is set to three in all of the broker config files (server-0.properties, server-1.properties and server-2.properties).

Run this lab. Run it. Run Servers. Run Producer. Kill 1 Broker.

If not already, startup ZooKeeper. Now startup three Kafka brokers (or ensure they are running) using scripts described earlier. From the IDE run StockPriceKafkaProducer class. From the terminal kill one of the Kafka Brokers. Now look at the logs for the StockPriceKafkaProducer, you should see Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException. Note that the Messages are rejected since there are fewer in-sync replicas than required. Repeat this with only 2 min.insync.replicas set. (Change config and restart brokers and restart producer). Observe the behavior of using 2 for min.insync.replicas vs. three.

Why did the send fail?

The producer config ProducerConfig.ACKS_CONFIG (acks config for producer) was set to “all”. This settings expects leader to only give successful ack after all followers ack the send was written to their log. The Broker config min.insync.replicas set to 3. At least three in-sync replicas must respond before send is considered successful. Since we took one broker out and only had three to begin with, it forces the send to fail since the send can not get three acks from ISRs.

Modify durability to leader only

Change StockPriceKafkaProducer acks config to 1 props.put(ProducerConfig.ACKS_CONFIG, “1", i.e., leader sends ack after write to log. From the IDE run StockPriceKafkaProducer again. From the terminal kill one of the Kafka Brokers. Notice that the StockPriceKafkaProducer now runs normally.

Why did the send not fail for acks 1?

Setting the Producer config ProducerConfig.ACKS_CONFIG (acks config for producer) to “1”. This setting expects leader to only give successful ack after it writes to its log. Replicas still get replication but leader does not wait for replication to send ack. Broker Config min.insync.replicas is still set to 3, but this config only gets looked at if acks=“all”.

Running describe topics before and after stoping broker

Try the last steps again. Stop a server while producer is running. Then run describe-topics.sh. Then Rerun server you stopped and run describe-topics.sh again. Observe the changes to ISRs and partition to broker ownership.

bin/describe-topics.sh

#!/usr/bin/env bash

cd ~/kafka-training

# List existing topics
kafka/bin/kafka-topics.sh --describe \
    --topic stock-prices \
    --zookeeper localhost:2181

The script bin/describe-topics.sh calls kafka-topics.sh to describe the topic layout with regards to brokers and partitions.

Retry with acks = 0

Run the last example again (servers, and producer), but this time set acks to 0. Run all three brokers then take one away. Then take another broker away. Try Run describe-topics. Take all of the brokers down and continue to run the producer. What do you think happens? When you are done, change acks back to acks=all.

Lab Review

Look at the following listing.

Describe topic listing

$ bin/describe-topics.sh
Topic:stock-prices	PartitionCount:3	ReplicationFactor:3	Configs:min.insync.replicas=2
	Topic: stock-prices	Partition: 0	Leader: 2	Replicas: 1,2,0	Isr: 2
	Topic: stock-prices	Partition: 1	Leader: 2	Replicas: 2,0,1	Isr: 2
	Topic: stock-prices	Partition: 2	Leader: 2	Replicas: 0,1,2	Isr: 2
  • How would you describe the above?
  • How many servers are likely running out of the three?
  • Would the producer still run with acks=all? Why or Why not?
  • Would the producer still run with acks=1? Why or Why not?
  • Would the producer still run with acks=0? Why or Why not?
  • Which broker is the leader of partition 1?

  • How would you describe the above? Two servers are down. Broker 0 and Broker 1.

  • How many servers are likely running out of the three? Just the third broker.

  • Would the producer still run with acks=all? Why or Why not? No. Only one server is running.

  • Would the producer still run with acks=1? Why or Why not? No. Yes. The 3rd server owns the partitions.

  • Would the producer still run with acks=0? Why or Why not? Yes.

  • Which broker is the leader of partition 1? The third server one with broker id 2.

Lab Adding Producer Metrics and Replication Verification

The objectives of this lab is to setup Kafka producer metrics and use the replication verification command line tool.

To do this you will change the min.insync.replicas for broker and observer metrics and replication verification and then change min.insync.replicas for topic and observer metrics and replication verification.

Create Producer Metrics Monitor

As part of this lab we will create a class called MetricsProducerReporter that is Runnable. The MetricsProducerReporter gets passed a Kafka Producer, and the MetricsProducerReporter calls the producer.metrics() method every 10 seconds in a while loop from run method, and prints out the MetricName and Metric value. The StockPriceKafkaProducer main method submits MetricsProducerReporter to the ExecutorService (thread pool).

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/MetricsProducerReporter.java

Kafka Producer: MetricsProducerReporter for reporting metrics is Runnable

package com.cloudurable.kafka.producer;

import com.cloudurable.kafka.producer.model.StockPrice;
import io.advantageous.boon.core.Sets;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class MetricsProducerReporter implements Runnable {
    private final Producer<String, StockPrice> producer;
    private final Logger logger =
            LoggerFactory.getLogger(MetricsProducerReporter.class);

    //Used to Filter just the metrics we want
    private final Set<String> metricsNameFilter = Sets.set(
            "record-queue-time-avg", "record-send-rate", "records-per-request-avg",
            "request-size-max", "network-io-rate", "record-queue-time-avg",
            "incoming-byte-rate", "batch-size-avg", "response-rate", "requests-in-flight"
    );

    public MetricsProducerReporter(
            final Producer<String, StockPrice> producer) {
        this.producer = producer;
    }
}

Note that the MetricsProducerReporter is Runnable and it will get submitted to the ExecutorService (thread pool). Note that the MetricsProducerReporter gets passed a Kafka Producer, which it will call to report metrics.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/MetricsProducerReporter.java

Kafka Producer: MetricsProducerReporter calls producer.metrics()

package com.cloudurable.kafka.producer;
...

public class MetricsProducerReporter implements Runnable {
  ...
@Override
public void run() {
    while (true) {
        final Map<MetricName, ? extends Metric> metrics
                = producer.metrics();

        displayMetrics(metrics);
        try {
            Thread.sleep(3_000);
        } catch (InterruptedException e) {
            logger.warn("metrics interrupted");
            Thread.interrupted();
            break;
        }
    }
}

private void displayMetrics(Map<MetricName, ? extends Metric> metrics) {
    final Map<String, MetricPair> metricsDisplayMap = metrics.entrySet().stream()
            //Filter out metrics not in metricsNameFilter
            .filter(metricNameEntry ->
                    metricsNameFilter.contains(metricNameEntry.getKey().name()))
            //Filter out metrics not in metricsNameFilter
            .filter(metricNameEntry ->
                    !Double.isInfinite(metricNameEntry.getValue().value()) &&
                            !Double.isNaN(metricNameEntry.getValue().value()) &&
                            metricNameEntry.getValue().value() != 0
            )
            //Turn Map<MetricName,Metric> into TreeMap<String, MetricPair>
            .map(entry -> new MetricPair(entry.getKey(), entry.getValue()))
            .collect(Collectors.toMap(
                    MetricPair::toString, it -> it, (a, b) -> a, TreeMap::new
            ));


    //Output metrics
    final StringBuilder builder = new StringBuilder(255);
    builder.append("\n---------------------------------------\n");
    metricsDisplayMap.entrySet().forEach(entry -> {
        MetricPair metricPair = entry.getValue();
        String name = entry.getKey();
        builder.append(String.format(Locale.US, "%50s%25s\t\t%,-10.2f\t\t%s\n",
                name,
                metricPair.metricName.name(),
                metricPair.metric.value(),
                metricPair.metricName.description()
        ));
    });
    builder.append("\n---------------------------------------\n");
    logger.info(builder.toString());
}
...
}

The run method calls producer.metrics() every 10 seconds in a while loop, and print out MetricName and Metric value. It only prints out the names that are in metricsNameFilter.

Notice we are using Java 8 Stream to filter and sort metrics. We get rid of metric values that are NaN, infinite numbers and 0s. Then we sort map by converting it to TreeMap. The MetricPair is helper class that has a Metric and a MetricName. Then we give it a nice format so we can read metrics easily and use so some space and some easy indicators to find the metrics in the log.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: StockPriceKafkaProducer adds MetricsProducerReporter to executorService

public class StockPriceKafkaProducer {
...

  public static void main(String... args) throws Exception {
      //Create Kafka Producer
      final Producer<String, StockPrice> producer = createProducer();
      //Create StockSender list
      final List<StockSender> stockSenders = getStockSenderList(producer);

      //Create a thread pool so every stock sender gets it own.
      // Increase by 1 to fit metrics.
      final ExecutorService executorService =
              Executors.newFixedThreadPool(stockSenders.size() + 1);

      //Run Metrics Producer Reporter which is runnable passing it the producer.
      executorService.submit(new MetricsProducerReporter(producer));

      //Run each stock sender in its own thread.
      stockSenders.forEach(executorService::submit);
      ...
  }
...
}

The main method of StockPriceKafkaProducer increases thread pool size by 1 to fit metrics reporting. Then submits a new instance of MetricsProducerReporter to the ExecutorService. The new MetricsProducerReporter is passed the producer from createProducer.

Run it. Run Servers. Run Producer.

If not already, startup ZooKeeper as before and then startup the three Kafka brokers using scripts described earlier. Then from the IDE run StockPriceKafkaProducer (ensure acks are set to all first). Observe metrics which print out every ten seconds.

Expected output from MetricsProducerReporter

---------------------------------------
    producer-metrics.batch-size-avg                batch-size-avg		          9,030.78  		The average number of bytes sent per partition per-request.
    producer-metrics.incoming-byte-rate            incoming-byte-rate		      63.28     		Bytes/second read off all sockets
    producer-metrics.network-io-rate               network-io-rate		        1.32      		The average number of network operations (reads or writes) on .
    producer-metrics.record-queue-time-avg         record-queue-time-avg	    3,008.97  		The average time in ms record batches spent in the..
    producer-metrics.record-send-rate              record-send-rate		        485.76    		The average number of records sent per second.
    producer-metrics.records-per-request-avg       records-per-request-avg		737.03    		The average number of records per request.
    producer-metrics.request-size-max              request-size-max		        46,064.00 		The maximum size of any request sent in the window.
    producer-metrics.response-rate                 response-rate		          0.66      		Responses received sent per second.
    producer-node-metrics.incoming-byte-rate       incoming-byte-rate		      127.41    		
    producer-node-metrics.request-size-max         request-size-max		        46,673.00 		The maximum size of any request sent in the window.
    producer-node-metrics.response-rate            response-rate		          1.32      		The average number of responses received per second.
    producer-topic-metrics.record-send-rate        record-send-rate		        960.73    		

---------------------------------------

Use replication verification to observe replicas getting behind

Next we will use replica-verification.sh to show max lag between replicas increase after we stop one of the brokers.

Running replica-verification.sh while killing one broker while running StockPriceKafkaProducer


## In ~/kafka-training/lab5

$ cat bin/replica-verification.sh
#!/usr/bin/env bash

cd ~/kafka-training

# List existing topics
kafka/bin/kafka-replica-verification.sh  \
    --report-interval-ms 5000 \
    --topic-white-list  "stock-prices.*" \
    --broker-list localhost:9092,localhost:9093,localhost:9094

## Run replica-verification to show max lag  
$ bin/replica-verification.sh
2017-07-05 10:50:15,319: verification process is started.
2017-07-05 10:50:21,062: max lag is 0 for partition [stock-prices,0] at offset 29559 among 3 partitions
2017-07-05 10:50:27,479: max lag is 0 for partition [stock-prices,0] at offset 29559 among 3 partitions
2017-07-05 10:50:32,974: max lag is 0 for partition [stock-prices,0] at offset 29559 among 3 partitions
2017-07-05 10:50:39,672: max lag is 1327 for partition [stock-prices,0] at offset 29559 among 3 partitions
2017-07-05 10:50:45,520: max lag is 7358 for partition [stock-prices,0] at offset 29559 among 3 partitions
2017-07-05 10:50:50,871: max lag is 13724 for partition [stock-prices,0] at offset 29559 among 3 partitions
2017-07-05 10:50:57,449: max lag is 19122 for partition [stock-prices,0] at offset 29559 among 3 partitions

While running StockPriceKafkaProducer from the command line, we kill one of the Kafka brokers and watch the max lag increase.

Running replica-verification.sh after killing one broker while running StockPriceKafkaProducer

# In ~/kafka-training/lab5
$ cat bin/describe-topics.sh
#!/usr/bin/env bash

cd ~/kafka-training

# List existing topics
kafka/bin/kafka-topics.sh --describe \
    --topic stock-prices \
    --zookeeper localhost:2181


$ bin/describe-topics.sh
Topic:stock-prices	PartitionCount:3	ReplicationFactor:3	Configs:min.insync.replicas=2
 Topic: stock-prices	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 2,1
 Topic: stock-prices	Partition: 1	Leader: 2	Replicas: 2,0,1	Isr: 2,1
 Topic: stock-prices	Partition: 2	Leader: 1	Replicas: 0,1,2	Isr: 2,1

Notice that one of the servers are down and one brokers owns two partitions.

Change min.insync.replicas

Now stop all Kafka Brokers (Kafka servers). Then change min.insync.replicas=3 to min.insync.replicas=2. Make this change in all of the configuration files for the brokers under the config directory of the lab (config/server-0.properties, config/server-1.properties, and config/server-2.properties). The config files for the brokers are in lab directory under config. After you are done, restart Zookeeper if needed and then restart the servers. Try starting and stopping different Kafka Brokers while StockPriceKafkaProducer is running. Be sure to observe metrics, and observe any changes. Also run replication verification utility in one terminal while checking topics stats in another with describe-topics.sh in another terminal.

Expected output from changing min.insync.replicas

The Producer will work even if one broker goes down. The Producer will not work if two brokers go down because min.insync.replicas=2, thus two replicas have to be up besides leader. Since the Producer can run with 1 down broker, notice that the replication lag can get really far behind.

Once you are done, change it back

Shutdown all brokers, change the all the broker config back to min.insync.replicas=3 (broker config for servers). Restart the brokers.

Change min.insync.replicas at the topic level

Modify bin/create-topic.sh and add add –config min.insync.replicas=2 Add this as param to kafka-topics.sh. Now run bin/delete-topic.sh and then run bin/create-topic.sh.

bin/create-topic.sh after modification

$ cat bin/create-topic.sh
#!/usr/bin/env bash

cd ~/kafka-training

kafka/bin/kafka-topics.sh \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 3 \
    --partitions 3 \
    --topic stock-prices \
    --config min.insync.replicas=2

Run delete topic and then run create topic as follows.

running delete topic and then create topic.

$ bin/delete-topic.sh
Topic stock-prices is marked for deletion.

$ bin/create-topic.sh
Created topic "stock-prices".

Run it with the new topics.

Now stop all Kafka Brokers (Kafka servers) and startup ZooKeeper if needed and the three Kafka brokers, Run StockPriceKafkaProducer (ensure acks are set to all first). Start and stop different Kafka Brokers while StockPriceKafkaProducer runs as before. Observe metrics, and observe any changes, Also Run replication verification in one terminal and check topics stats in another with describe-topics.sh in another terminal.

Expected results of changing topic to use min.insync.replicas

The min.insync.replicas on the Topic config overrides the min.insync.replicas on the Broker config. In this setup, you can survive a single node failure but not two (output below is recovery).

Lab Batching Records

Objectives is to understand Kafka Batching. You will disable batching and observer metrics, then you will reenable batching and observe metrics. Next you increase batch size and linger and observe metrics. In this lab, we will run a consumer to see batch sizes change from a consumer perspective as we change batching on the producer side. Lastly you will enable compression, and then observe results.

SimpleStockPriceConsumer

We added a SimpleStockPriceConsumer to consume StockPrices and display batch lengths for poll(). We won’t cover the consumer in detail just quickly, since this is a Producer lab not a Consumer lab. You will run this consumer while you are running the StockPriceKafkaProducer. While you are running SimpleStockPriceConsumer with various batch and linger config, observe output of Producer metrics and StockPriceKafkaProducer output.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/consumer/SimpleStockPriceConsumer.java

Kafka Producer: SimpleStockPriceConsumer to consumer records

package com.cloudurable.kafka.consumer;
import com.cloudurable.kafka.StockAppConstants;
import com.cloudurable.kafka.producer.model.StockPrice;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
...

public class SimpleStockPriceConsumer {

    private static Consumer<String, StockPrice> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                StockAppConstants.BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        //Custom Deserializer
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StockDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        // Create the consumer using props.
        final Consumer<String, StockPrice> consumer =
                new KafkaConsumer<>(props);
        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(
                StockAppConstants.TOPIC));
        return consumer;
    }
    ...
}

The SimpleStockPriceConsumer is similar to other Consumer examples we have covered so far. SimpleStockPriceConsumer subscribes to stock-prices topic and uses a custom serializer (StockDeserializer).

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/consumer/SimpleStockPriceConsumer.java

Kafka Producer: SimpleStockPriceConsumer runConsumer

package com.cloudurable.kafka.consumer;
...

public class SimpleStockPriceConsumer {
    ...

    static void runConsumer() throws InterruptedException {
        final Consumer<String, StockPrice> consumer = createConsumer();
        final Map<String, StockPrice> map = new HashMap<>();
        try {
            final int giveUp = 1000; int noRecordsCount = 0;
            int readCount = 0;
            while (true) {
                final ConsumerRecords<String, StockPrice> consumerRecords =
                        consumer.poll(1000);
                if (consumerRecords.count() == 0) {
                    noRecordsCount++;
                    if (noRecordsCount > giveUp) break;
                    else continue;
                }
                readCount++;
                consumerRecords.forEach(record -> {
                    map.put(record.key(), record.value());
                });
                if (readCount % 100 == 0) {
                    displayRecordsStatsAndStocks(map, consumerRecords);
                }
                consumer.commitAsync();
            }
        }
        finally {
            consumer.close();
        }
        System.out.println("DONE");
    }
    ...
    public static void main(String... args) throws Exception {
      runConsumer();
    }
    ...
}

The run method drains Kafka topic. It creates map of current stocks prices, and Calls displayRecordsStatsAndStocks().

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/consumer/SimpleStockPriceConsumer.java

Kafka Producer: SimpleStockPriceConsumer displayRecordsStatsAndStocks

package com.cloudurable.kafka.consumer;
...

public class SimpleStockPriceConsumer {
    ...
    private static void displayRecordsStatsAndStocks(
        final Map<String, StockPrice> stockPriceMap,
        final ConsumerRecords<String, StockPrice> consumerRecords) {
    System.out.printf("New ConsumerRecords par count %d count %d\n",
            consumerRecords.partitions().size(),
            consumerRecords.count());
    stockPriceMap.forEach((s, stockPrice) ->
            System.out.printf("ticker %s price %d.%d \n",
                stockPrice.getName(),
                stockPrice.getDollars(),
                stockPrice.getCents()));
    System.out.println();
  }

  ...
}

The displayRecordsStatsAndStocks method prints out size of each partition read and total record count. Then it prints out each stock at its current price.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/consumer/StockDeserializer.java

Kafka Producer: StockDeserializer

package com.cloudurable.kafka.consumer;

import com.cloudurable.kafka.producer.model.StockPrice;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class StockDeserializer implements Deserializer<StockPrice> {

    @Override
    public StockPrice deserialize(final String topic, final byte[] data) {
        return new StockPrice(new String(data, StandardCharsets.UTF_8));
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public void close() {
    }
}

The StockDeserializer is used to deserialize StockPrice objects from the Kafka topic.

Disable batching for the Producer

Let’s start by disabling batching in the StockPriceKafkaProducer. Setting props.put(ProducerConfig.BATCH_SIZE_CONFIG, 0) turns batching off. After you do this rerun StockPriceKafkaProducer and check Consumer stats and Producer stats.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: StockPriceKafkaProducer disable batching


public class StockPriceKafkaProducer {

    private static Producer<String, StockPrice>
                                    createProducer() {
        final Properties props = new Properties();
        setupBootstrapAndSerializers(props);
        setupBatchingAndCompression(props);

        return new KafkaProducer<>(props);
    }
    ...
    private static void setupBatchingAndCompression(
            final Properties props) {
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,  0);
    }
    ...
}

Set batching to 16K and retest

Now let’s enable batching in the StockPriceKafkaProducer by setting batch size to 16K. Setting props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16_384) turns batching on and allows us to batch 16K of stock price records per partition. After you do this rerun StockPriceKafkaProducer and check Consumer stats and Producer stats.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: StockPriceKafkaProducer set batch size to 16K


public class StockPriceKafkaProducer {
    ...
    private static void setupBatchingAndCompression(
            final Properties props) {
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,  16_384);
    }
    ...
}

Results Set batching to 16K

We saw the consumer records per poll averages around 7.5 and saw the batch size increase to 136.02 - 59% more batching. Look how much the request queue time shrunk! The record-send-rate is 200% faster! You can see record-send-rate in the metrics of the producer.

Set batching to 16K and linger to 10ms

Now let’s enable linger in the StockPriceKafkaProducer by setting the linger 10 ms. Setting props.put(ProducerConfig.LINGER_MS_CONFIG, 10) turns linger on and allows us to batch for 10 ms or 16K bytes of a stock price records per partition whichever comes first. After you do this rerun StockPriceKafkaProducer and check Consumer stats and Producer stats.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: StockPriceKafkaProducer set linger to 10 ms


public class StockPriceKafkaProducer {
    ...
    private static void setupBatchingAndCompression(
            final Properties props) {

              //Linger up to 10 ms before sending batch if size not met
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,  16_384);
    }
    ...
}

Results Set batching to 16K and linger to 10 ms

We saw the consumer records per poll averages around 17 and saw the batch size increase to 796 - 585% more batching. The record-send-rate went down, but higher than without batching.

Try different sizes and times

Try 16K, 32K and 64K batch sizes and then try 10 ms, 100 ms, and 1 second linger. Which is the best for which type of use case?

Set compression to snappy, then batching to 64K and linger to 50ms

Now let’s enable compression in the StockPriceKafkaProducer by setting the compression to linger. Setting props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy) turns compression. After you do this rerun StockPriceKafkaProducer and check Consumer stats and Producer stats as before.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: StockPriceKafkaProducer enable compression


public class StockPriceKafkaProducer {
    ...
    private static void setupBatchingAndCompression(
            final Properties props) {

              //Linger up to 50 ms before sending batch if size not met
              props.put(ProducerConfig.LINGER_MS_CONFIG, 50);

              //Batch up to 64K buffer sizes.
              props.put(ProducerConfig.BATCH_SIZE_CONFIG,  16_384 * 4);

              //Use Snappy compression for batch compression.
              props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    }
    ...
}

Results for turning on compression

The Snappy compression 64K/50ms should have the highest record-send-rate and 12 the queue time.

Lab Adding Retries and Timeouts

In this lab we setup timeouts, setup retries, setup retry back off and change inflight messages to 1 so retries don’t store records out of order.

Change it and then run it

As before startup ZooKeeper if needed and three Kafka brokers. Then we will modify StockPriceKafkaProducer to configure retry, timeouts, in-flight message count and retry back off. We will run the StockPriceKafkaProducer. While we start and stop any two different Kafka Brokers while StockPriceKafkaProducer runs. Please notice retry messages in log of StockPriceKafkaProducer.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: StockPriceKafkaProducer disable batching


public class StockPriceKafkaProducer {

    private static Producer<String, StockPrice>
                                    createProducer() {
        final Properties props = new Properties();
        setupBootstrapAndSerializers(props);
        setupBatchingAndCompression(props);
        setupRetriesInFlightTimeout(props);

        return new KafkaProducer<>(props);
    }
    ...
    private static void setupRetriesInFlightTimeout(Properties props) {
        //Only two in-flight messages per Kafka broker connection
        // - max.in.flight.requests.per.connection (default 5)
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
                1);
        //Set the number of retries - retries
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

        //Request timeout - request.timeout.ms
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 15_000);

        //Only retry after one second.
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1_000);
    }
    ...
}

The above configures Producer retry, timeouts, in-flight message count and retry back off.

Expected output after 2 broker shutdown

Run all brokers and then kill any two servers. Look for retry messages in the Producer log. Restart brokers and see the Producer recover. Also use replica verification to see when the broker catches up.

WARN Inflight Message Count

The MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION is the max number of unacknowledged requests that a client can send on a single connection before blocking If >1 and failed sends, then there is a risk of message re-ordering on partition during retry attempt (they could be written out of order of the send). If this is bad, depends on use but for our StockPrices this is not good, you should pick retries > 1 or inflight > 1 but not both. Avoid duplicates. The June 2017 release might fix this with sequence from producer.

Lab Write ProducerInterceptor

Let’s setup an interceptor for request sends. To do this we will create a ProducerInterceptor and implement the onSend method and the onAcknowledge method.

Producer Interception

We will configure our Producer config and set the config property: interceptor.classes to our ProducerInterceptor which we will define shortly. The ProducerInterceptor will print out debug information when we send a message and when the broker acknowledges a message. The interceptors we pass must implement ProducerInterceptor interface so we will define a StockProducerInterceptor that implements ProducerInterceptor. The StockProducerInterceptor will intercept records that the producer sends to broker and after intercept acks from the broker.

Let’s define the StockProducerInterceptor as follows.

KafkaProducer ProducerInterceptor

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockProducerInterceptor.java

Kafka Producer: StockProducerInterceptor

package com.cloudurable.kafka.producer;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class StockProducerInterceptor implements ProducerInterceptor {
   ...
}

Notice that the StockProducerInterceptor implements ProducerInterceptor.

ProducerInterceptor onSend

The onSend method gets called before the record is sent to the broker.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockProducerInterceptor.java

Kafka Producer: StockProducerInterceptor onSend

package com.cloudurable.kafka.producer;
...

public class StockProducerInterceptor implements ProducerInterceptor {

   private final Logger logger = LoggerFactory
           .getLogger(StockProducerInterceptor.class);
   private int onSendCount;
   private int onAckCount;


   @Override
   public ProducerRecord onSend(final ProducerRecord record) {
       onSendCount++;
       if (logger.isDebugEnabled()) {
           logger.debug(String.format("onSend topic=%s key=%s value=%s %d \n",
                   record.topic(), record.key(), record.value().toString(),
                   record.partition()
           ));
       } else {
           if (onSendCount % 100 == 0) {
               logger.info(String.format("onSend topic=%s key=%s value=%s %d \n",
                       record.topic(), record.key(), record.value().toString(),
                       record.partition()
               ));
           }
       }
       return record;
   }
}

The StockProducerInterceptor overrides the onSend method and increments onSendCount. Every 100 onSendCount, we print out record data.

ProducerInterceptor onAck

The onAck method gets called after the broker acknowledges the record.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockProducerInterceptor.java

Kafka Producer: StockProducerInterceptor onAck

package com.cloudurable.kafka.producer;
...

public class StockProducerInterceptor implements ProducerInterceptor {

   private final Logger logger = LoggerFactory
           .getLogger(StockProducerInterceptor.class);
   ...
   private int onAckCount;


   @Override
   public void onAcknowledgement(final RecordMetadata metadata,
                                 final Exception exception) {
       onAckCount++;

       if (logger.isDebugEnabled()) {
           logger.debug(String.format("onAck topic=%s, part=%d, offset=%d\n",
                   metadata.topic(), metadata.partition(), metadata.offset()
           ));
       } else {
           if (onAckCount % 100 == 0) {
               logger.info(String.format("onAck topic=%s, part=%d, offset=%d\n",
                       metadata.topic(), metadata.partition(), metadata.offset()
               ));
           }
       }
   }

}

The StockProducerInterceptor overrides the onAck method and increments onAckCount. Every 100 onAckCount, we print out record data.

ProducerInterceptor the rest

There are other methods to override.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockProducerInterceptor.java

Kafka Producer: StockProducerInterceptor the rest

package com.cloudurable.kafka.producer;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class StockProducerInterceptor implements ProducerInterceptor {

    private final Logger logger = LoggerFactory
            .getLogger(StockProducerInterceptor.class);
    private int onSendCount;
    private int onAckCount;


    @Override
    public ProducerRecord onSend(final ProducerRecord record) {
        onSendCount++;
        if (logger.isDebugEnabled()) {
            logger.debug(String.format("onSend topic=%s key=%s value=%s %d \n",
                    record.topic(), record.key(), record.value().toString(),
                    record.partition()
            ));
        } else {
            if (onSendCount % 100 == 0) {
                logger.info(String.format("onSend topic=%s key=%s value=%s %d \n",
                        record.topic(), record.key(), record.value().toString(),
                        record.partition()
                ));
            }
        }
        return record;
    }

    @Override
    public void onAcknowledgement(final RecordMetadata metadata,
                                  final Exception exception) {
        onAckCount++;

        if (logger.isDebugEnabled()) {
            logger.debug(String.format("onAck topic=%s, part=%d, offset=%d\n",
                    metadata.topic(), metadata.partition(), metadata.offset()
            ));
        } else {
            if (onAckCount % 100 == 0) {
                logger.info(String.format("onAck topic=%s, part=%d, offset=%d\n",
                        metadata.topic(), metadata.partition(), metadata.offset()
                ));
            }
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}


We have to override close and configure.

Next we need to configure the StockProducerInterceptor in the StockPriceKafkaProducer producer config.

KafkaProducer - Interceptor Config

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: StockPriceKafkaProducer

public class StockPriceKafkaProducer {

    private static Producer<String, StockPrice>
                                    createProducer() {
        final Properties props = new Properties();
        setupBootstrapAndSerializers(props);
        setupBatchingAndCompression(props);
        setupRetriesInFlightTimeout(props);

        //Install interceptor list - config "interceptor.classes"
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                StockProducerInterceptor.class.getName());

        return new KafkaProducer<>(props);
    }

The above sets the StockProducerInterceptor.class.getName() in the config property ProducerConfig.INTERCEPTOR_CLASSES_CONFIG.

Run it. Run Servers. Run Producer. Results.

Next we startup ZooKeeper if needed, and start or restart Kafka brokers as before. Then run the StockPriceKafkaProducer and look for log message from ProducerInterceptor in output.

Results ProducerInterceptor Output

You should see oAck and onSend messages in the log from the interceptor.

Lab Write Custom Partitioner

Next let’s create a StockPricePartitioner. The StockPricePartitioner will implement a priority queue. It will treat certain stocks as important and send those stocks to the last partition. The StockPricePartitioner implements the Kafka interface Partitioner. The Partitioner interface is used to pick which partition a record lands. We will need to implement the partition() method to choose the partition. And we will need to implement the configure() method so we can read the importantStocks config property to setup importantStocks set which we use to determine if a stock is important and needs to be sent to the important partition. To do this we need to configure new Partitioner in Producer config with property ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, and pass config property importantStocks.

Producer Partitioning

To set a custom partitioner set the Producer config property partitioner.class. The default partitioner.class is org.apache.kafka.clients.producer.internals.DefaultPartitioner. All Partitioner class implements the Kafka Partitioner interface and have to override the partition() method which takes topic, key, value, and cluster and then returns partition number for record.

StockPricePartitioner configure

StockPricePartitioner implements the configure() method with importantStocks config property. The importantStocks gets parsed and added to importantStocks HashSet which is used to filter the stocks.

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

Kafka Producer: StockPriceKafkaProducer configure partitioner

package com.cloudurable.kafka.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.*;

public class StockPricePartitioner implements Partitioner{

    private final Set<String> importantStocks;
    public StockPricePartitioner() {
        importantStocks = new HashSet<>();
    }
    ...
    @Override
    public void configure(Map<String, ?> configs) {
        final String importantStocksStr = (String) configs.get("importantStocks");
        Arrays.stream(importantStocksStr.split(","))
                .forEach(importantStocks::add);
    }

}

StockPricePartitioner partition()

IMPORTANT STOCK: If stockName is in the importantStocks HashSet then put it in partitionNum = (partitionCount -1) (last partition). REGULAR STOCK: Otherwise if not in importantStocks set then not important use the the absolute value of the hash of the stockName modulus partitionCount -1 as the partition to send the record partitionNum = abs(stockName.hashCode()) % (partitionCount - 1).

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPricePartitioner.java

Kafka Producer: StockPricePartitioner partition

package com.cloudurable.kafka.producer;

public class StockPricePartitioner implements Partitioner{

    private final Set<String> importantStocks;
    public StockPricePartitioner() {
        importantStocks = new HashSet<>();
    }

    @Override
    public int partition(final String topic,
                         final Object objectKey,
                         final byte[] keyBytes,
                         final Object value,
                         final byte[] valueBytes,
                         final Cluster cluster) {

        final List<PartitionInfo> partitionInfoList =
                cluster.availablePartitionsForTopic(topic);
        final int partitionCount = partitionInfoList.size();
        final int importantPartition = partitionCount -1;
        final int normalPartitionCount = partitionCount -1;

        final String key = ((String) objectKey);

        if (importantStocks.contains(key)) {
            return importantPartition;
        } else {
            return Math.abs(key.hashCode()) % normalPartitionCount;
        }

    }
    ...
}

Producer Config: Configuring Partitioner

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPricePartitioner.java

Kafka Producer: StockPricePartitioner configure()


public class StockPriceKafkaProducer {

    private static Producer<String, StockPrice>
                                    createProducer() {
        final Properties props = new Properties();
        setupBootstrapAndSerializers(props);
        setupBatchingAndCompression(props);
        setupRetriesInFlightTimeout(props);

        //Set number of acknowledgments - acks - default is all
        props.put(ProducerConfig.ACKS_CONFIG, "all");


        //Install interceptor list - config "interceptor.classes"
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                StockProducerInterceptor.class.getName());

        props.put("importantStocks", "IBM,UBER");

        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                StockPricePartitioner.class.getName());

        return new KafkaProducer<>(props);
    }

Configure the new Partitioner in Producer config with property ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. Pass config property to importantStocks. The importantStock are the ones that go into priority queue. Run it as before. The important stocks are IBM and UBER in this example and are the only ones that will go into the last partition.

Review of lab work

You implemented custom ProducerSerializer. You tested failover configuring broker/topic min.insync.replicas, and acks. You implemented batching and compression and used metrics to see how it was or was not working. You implemented retires and timeouts, and tested that it worked. You setup max inflight messages and retry back off. You implemented a ProducerInterceptor. You implemented a custom partitioner to implement a priority queue for important stocks.

Slides

Please enjoy these slides which is the outline for this tutorial, and the complete code listing from all of the labs for this tutorial, which is below.


Full source code for the labs

Gradle build - Build file for Advanced Kafka Producers tutorial

~/kafka-training/lab5/solution/build.gradle


group 'cloudurable-kafka'
version '1.0-SNAPSHOT'

apply plugin: 'java'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.apache.kafka:kafka-clients:0.10.2.0'
    compile 'ch.qos.logback:logback-classic:1.2.2'
    compile 'io.advantageous.boon:boon-json:0.6.6'
    testCompile 'junit:junit:4.12'
}

SimpleStockPriceConsumer - Consumer shows batches sizes seen as batch size params vary on producer

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/consumer/SimpleStockPriceConsumer.java

package com.cloudurable.kafka.consumer;
import com.cloudurable.kafka.StockAppConstants;
import com.cloudurable.kafka.producer.model.StockPrice;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SimpleStockPriceConsumer {

    private static Consumer<String, StockPrice> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                StockAppConstants.BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        //Custom Deserializer
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StockDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        // Create the consumer using props.
        final Consumer<String, StockPrice> consumer =
                new KafkaConsumer<>(props);
        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(
                StockAppConstants.TOPIC));
        return consumer;
    }


    static void runConsumer() throws InterruptedException {
        final Consumer<String, StockPrice> consumer = createConsumer();
        final Map<String, StockPrice> map = new HashMap<>();
        try {
            final int giveUp = 1000; int noRecordsCount = 0;
            int readCount = 0;
            while (true) {
                final ConsumerRecords<String, StockPrice> consumerRecords =
                        consumer.poll(1000);
                if (consumerRecords.count() == 0) {
                    noRecordsCount++;
                    if (noRecordsCount > giveUp) break;
                    else continue;
                }
                readCount++;
                consumerRecords.forEach(record -> {
                    map.put(record.key(), record.value());
                });
                if (readCount % 100 == 0) {
                    displayRecordsStatsAndStocks(map, consumerRecords);
                }
                consumer.commitAsync();
            }
        }
        finally {
            consumer.close();
        }
        System.out.println("DONE");
    }

    private static void displayRecordsStatsAndStocks(
            final Map<String, StockPrice> stockPriceMap,
            final ConsumerRecords<String, StockPrice> consumerRecords) {
        System.out.printf("New ConsumerRecords par count %d count %d\n",
                consumerRecords.partitions().size(),
                consumerRecords.count());
        stockPriceMap.forEach((s, stockPrice) ->
                System.out.printf("ticker %s price %d.%d \n",
                    stockPrice.getName(),
                    stockPrice.getDollars(),
                    stockPrice.getCents()));
        System.out.println();
    }


    public static void main(String... args) throws Exception {
        runConsumer();
    }


}

StockDeserializer - Java example for Kafka Deserializer

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/consumer/StockDeserializer.java

package com.cloudurable.kafka.consumer;

import com.cloudurable.kafka.producer.model.StockPrice;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class StockDeserializer implements Deserializer<StockPrice> {

    @Override
    public StockPrice deserialize(final String topic, final byte[] data) {
        return new StockPrice(new String(data, StandardCharsets.UTF_8));
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public void close() {
    }
}

StockPrice - Java example showing custom Kafka serialization

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/model/StockPrice.java

package com.cloudurable.kafka.producer.model;

import io.advantageous.boon.json.JsonFactory;

public class StockPrice {

    private final int dollars;
    private final int cents;
    private final String name;

    public StockPrice(final String json) {
        this(JsonFactory.fromJson(json, StockPrice.class));
    }


    public StockPrice() {
        dollars = 0;
        cents = 0;
        name ="";
    }

    public StockPrice(final String name, final int dollars, final int cents) {
        this.dollars = dollars;
        this.cents = cents;
        this.name = name;
    }



    public StockPrice(final StockPrice stockPrice) {
        this.cents = stockPrice.cents;
        this.dollars = stockPrice.dollars;
        this.name = stockPrice.name;
    }


    public int getDollars() {
        return dollars;
    }


    public int getCents() {
        return cents;
    }


    public String getName() {
        return name;
    }


    @Override
    public String toString() {
        return "StockPrice{" +
                "dollars=" + dollars +
                ", cents=" + cents +
                ", name='" + name + '\'' +
                '}';
    }


    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        StockPrice that = (StockPrice) o;

        if (dollars != that.dollars) return false;
        if (cents != that.cents) return false;
        return name != null ? name.equals(that.name) : that.name == null;
    }

    @Override
    public int hashCode() {
        int result = dollars;
        result = 31 * result + cents;
        result = 31 * result + (name != null ? name.hashCode() : 0);
        return result;
    }


    public String toJson() {
        return "{" +
                "\"dollars\": " + dollars +
                ", \"cents\": " + cents +
                ", \"name\": \"" + name + '\"' +
                '}';
    }
}

MetricsProducerReporter - Java example showing using Kafka Producer Metrics

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/MetricsProducerReporter.java

package com.cloudurable.kafka.producer;

import com.cloudurable.kafka.producer.model.StockPrice;
import io.advantageous.boon.core.Sets;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class MetricsProducerReporter implements Runnable {
    private final Producer<String, StockPrice> producer;
    private final Logger logger =
            LoggerFactory.getLogger(MetricsProducerReporter.class);

    //Used to Filter just the metrics we want
    private final Set<String> metricsNameFilter = Sets.set(
            "record-queue-time-avg", "record-send-rate", "records-per-request-avg",
            "request-size-max", "network-io-rate", "record-queue-time-avg",
            "incoming-byte-rate", "batch-size-avg", "response-rate", "requests-in-flight"
    );

    public MetricsProducerReporter(
            final Producer<String, StockPrice> producer) {
        this.producer = producer;
    }

    @Override
    public void run() {
        while (true) {
            final Map<MetricName, ? extends Metric> metrics
                    = producer.metrics();

            displayMetrics(metrics);
            try {
                Thread.sleep(3_000);
            } catch (InterruptedException e) {
                logger.warn("metrics interrupted");
                Thread.interrupted();
                break;
            }
        }
    }


    static class MetricPair {
        private final MetricName metricName;
        private final Metric metric;
        MetricPair(MetricName metricName, Metric metric) {
            this.metricName = metricName;
            this.metric = metric;
        }
        public String toString() {
            return metricName.group() + "." + metricName.name();
        }
    }

    private void displayMetrics(Map<MetricName, ? extends Metric> metrics) {
        final Map<String, MetricPair> metricsDisplayMap = metrics.entrySet().stream()
                //Filter out metrics not in metricsNameFilter
                .filter(metricNameEntry ->
                        metricsNameFilter.contains(metricNameEntry.getKey().name()))
                //Filter out metrics not in metricsNameFilter
                .filter(metricNameEntry ->
                        !Double.isInfinite(metricNameEntry.getValue().value()) &&
                                !Double.isNaN(metricNameEntry.getValue().value()) &&
                                metricNameEntry.getValue().value() != 0
                )
                //Turn Map<MetricName,Metric> into TreeMap<String, MetricPair>
                .map(entry -> new MetricPair(entry.getKey(), entry.getValue()))
                .collect(Collectors.toMap(
                        MetricPair::toString, it -> it, (a, b) -> a, TreeMap::new
                ));


        //Output metrics
        final StringBuilder builder = new StringBuilder(255);
        builder.append("\n---------------------------------------\n");
        metricsDisplayMap.entrySet().forEach(entry -> {
            MetricPair metricPair = entry.getValue();
            String name = entry.getKey();
            builder.append(String.format(Locale.US, "%50s%25s\t\t%,-10.2f\t\t%s\n",
                    name,
                    metricPair.metricName.name(),
                    metricPair.metric.value(),
                    metricPair.metricName.description()
            ));
        });
        builder.append("\n---------------------------------------\n");
        logger.info(builder.toString());
    }


}

StockPriceKafkaProducer - Java Example that shows using Kafka producer multi-threaded

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceKafkaProducer.java

package com.cloudurable.kafka.producer;

import com.cloudurable.kafka.StockAppConstants;
import com.cloudurable.kafka.producer.model.StockPrice;
import io.advantageous.boon.core.Lists;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StockPriceKafkaProducer {

    private static Producer<String, StockPrice>
                                    createProducer() {
        final Properties props = new Properties();
        setupBootstrapAndSerializers(props);
        setupBatchingAndCompression(props);
        setupRetriesInFlightTimeout(props);

        //Install interceptor list - config "interceptor.classes"
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                StockProducerInterceptor.class.getName());

        props.put("importantStocks", "IBM,UBER");

        return new KafkaProducer<>(props);
    }

    private static void setupRetriesInFlightTimeout(Properties props) {
        //Only two in-flight messages per Kafka broker connection
        // - max.in.flight.requests.per.connection (default 5)
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
                1);
        //Set the number of retries - retries
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

        //Request timeout - request.timeout.ms
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 15_000);

        //Only retry after one second.
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1_000);
    }



    private static void setupBootstrapAndSerializers(Properties props) {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                StockAppConstants.BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "StockPriceKafkaProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());


        //Custom Serializer - config "value.serializer"
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StockPriceSerializer.class.getName());

        //Set number of acknowledgments - acks - default is all
        props.put(ProducerConfig.ACKS_CONFIG, "all");

    }

    private static void setupBatchingAndCompression(
            final Properties props) {
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,  16_384 * 4);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    }


    private static final Logger logger =
            LoggerFactory.getLogger(StockPriceKafkaProducer.class);



    public static void main(String... args)
            throws Exception {
        //Create Kafka Producer
        final Producer<String, StockPrice> producer = createProducer();
        //Create StockSender list
        final List<StockSender> stockSenders = getStockSenderList(producer);

        //Create a thread pool so every stock sender gets it own.
        // Increase by 1 to fit metrics.
        final ExecutorService executorService =
                Executors.newFixedThreadPool(stockSenders.size() + 1);

        //Run Metrics Producer Reporter which is runnable passing it the producer.
        executorService.submit(new MetricsProducerReporter(producer));

        //Run each stock sender in its own thread.
        stockSenders.forEach(executorService::submit);


        //Register nice shutdown of thread pool, then flush and close producer.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            executorService.shutdown();
            try {
                executorService.awaitTermination(200, TimeUnit.MILLISECONDS);
                logger.info("Flushing and closing producer");
                producer.flush();
                producer.close(10_000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.warn("shutting down", e);
            }
        }));
    }

    private static List<StockSender> getStockSenderList(
            final Producer<String, StockPrice> producer) {
        return Lists.list(
                new StockSender(StockAppConstants.TOPIC,
                        new StockPrice("IBM", 100, 99),
                        new StockPrice("IBM", 50, 10),
                        producer,
                        1, 10
                ),
                new StockSender(
                        StockAppConstants.TOPIC,
                        new StockPrice("SUN", 100, 99),
                        new StockPrice("SUN", 50, 10),
                        producer,
                        1, 10
                ),
                new StockSender(
                        StockAppConstants.TOPIC,
                        new StockPrice("GOOG", 500, 99),
                        new StockPrice("GOOG", 400, 10),
                        producer,
                        1, 10
                ),
                new StockSender(
                        StockAppConstants.TOPIC,
                        new StockPrice("INEL", 100, 99),
                        new StockPrice("INEL", 50, 10),
                        producer,
                        1, 10
                ),
                new StockSender(
                        StockAppConstants.TOPIC,
                        new StockPrice("UBER", 1000, 99),
                        new StockPrice("UBER", 50, 0),
                        producer,
                        1, 10
                ),
                new StockSender(
                        StockAppConstants.TOPIC,
                        new StockPrice("ABC", 100, 99),
                        new StockPrice("ABC", 50, 10),
                        producer,
                        1, 10
                ),
                new StockSender(
                        StockAppConstants.TOPIC,
                        new StockPrice("XYZ", 100, 99),
                        new StockPrice("XYZ", 50, 10),
                        producer,
                        1, 10
                ),
                new StockSender(
                        StockAppConstants.TOPIC,
                        new StockPrice("DEF", 100, 99),
                        new StockPrice("DEF", 50, 10),
                        producer,
                        1, 10
                ),
                new StockSender(
                        StockAppConstants.TOPIC,
                        new StockPrice("DEF", 100, 99),
                        new StockPrice("DEF", 50, 10),
                        producer,
                        1, 10
                ),
                new StockSender(
                        StockAppConstants.TOPIC,
                        new StockPrice("AAA", 100, 99),
                        new StockPrice("AAA", 50, 10),
                        producer,
                        1, 10
                ),
                new StockSender(
                        StockAppConstants.TOPIC,
                        new StockPrice("BBB", 100, 99),
                        new StockPrice("BBB", 50, 10),
                        producer,
                        1, 10
                ),
                new StockSender(
                        StockAppConstants.TOPIC,
                        new StockPrice("CCC", 100, 99),
                        new StockPrice("CCC", 50, 10),
                        producer,
                        1, 10
                ),
                new StockSender(
                        StockAppConstants.TOPIC,
                        new StockPrice("DDD", 100, 99),
                        new StockPrice("DDD", 50, 10),
                        producer,
                        1, 10
                ),
                new StockSender(
                        StockAppConstants.TOPIC,
                        new StockPrice("EEE", 100, 99),
                        new StockPrice("EEE", 50, 10),
                        producer,
                        1, 10
                ),
                new StockSender(
                        StockAppConstants.TOPIC,
                        new StockPrice("FFF", 100, 99),
                        new StockPrice("FFF", 50, 10),
                        producer,
                        1, 10
                )
        );

    }

}

StockPricePartitioner - Java example showing implementation of custom Kafka Partitioner

/Users/jean-paul/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPricePartitioner.java

package com.cloudurable.kafka.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.*;

public class StockPricePartitioner implements Partitioner{

    private final Set<String> importantStocks;
    public StockPricePartitioner() {
        importantStocks = new HashSet<>();
    }

    @Override
    public int partition(final String topic,
                         final Object objectKey,
                         final byte[] keyBytes,
                         final Object value,
                         final byte[] valueBytes,
                         final Cluster cluster) {

        final List<PartitionInfo> partitionInfoList =
                cluster.availablePartitionsForTopic(topic);
        final int partitionCount = partitionInfoList.size();
        final int importantPartition = partitionCount -1;
        final int normalPartitionCount = partitionCount -1;

        final String key = ((String) objectKey);

        if (importantStocks.contains(key)) {
            return importantPartition;
        } else {
            return Math.abs(key.hashCode()) % normalPartitionCount;
        }

    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
        final String importantStocksStr = (String) configs.get("importantStocks");
        Arrays.stream(importantStocksStr.split(","))
                .forEach(importantStocks::add);
    }

}

StockPriceSerializer - Java example showing implementation of custom Kafka Producer Serializer

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockPriceSerializer.java

package com.cloudurable.kafka.producer;
import com.cloudurable.kafka.producer.model.StockPrice;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class StockPriceSerializer implements Serializer<StockPrice> {

    @Override
    public byte[] serialize(String topic, StockPrice data) {
        return data.toJson().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public void close() {
    }
}


StockProducerInterceptor - Java example showing implementation of custom Kafka ProducerInterceptor

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockProducerInterceptor.java

package com.cloudurable.kafka.producer;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class StockProducerInterceptor implements ProducerInterceptor {

    private final Logger logger = LoggerFactory
            .getLogger(StockProducerInterceptor.class);
    private int onSendCount;
    private int onAckCount;

    @Override
    public ProducerRecord onSend(final ProducerRecord record) {
        onSendCount++;
        if (logger.isDebugEnabled()) {
            logger.debug(String.format("onSend topic=%s key=%s value=%s %d \n",
                    record.topic(), record.key(), record.value().toString(),
                    record.partition()
            ));
        } else {
            if (onSendCount % 100 == 0) {
                logger.info(String.format("onSend topic=%s key=%s value=%s %d \n",
                        record.topic(), record.key(), record.value().toString(),
                        record.partition()
                ));
            }
        }
        return record;
    }

    @Override
    public void onAcknowledgement(final RecordMetadata metadata,
                                  final Exception exception) {
        onAckCount++;

        if (logger.isDebugEnabled()) {
            logger.debug(String.format("onAck topic=%s, part=%d, offset=%d\n",
                    metadata.topic(), metadata.partition(), metadata.offset()
            ));
        } else {
            if (onAckCount % 100 == 0) {
                logger.info(String.format("onAck topic=%s, part=%d, offset=%d\n",
                        metadata.topic(), metadata.partition(), metadata.offset()
                ));
            }
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}


StockSender- Java Example that shows using Kafka producer from many threads (multi-threaded producer)

/Users/jean-paul/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/producer/StockSender.java

package com.cloudurable.kafka.producer;

import com.cloudurable.kafka.producer.model.StockPrice;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class StockSender implements Runnable{

    private final StockPrice stockPriceHigh;
    private final StockPrice stockPriceLow;
    private final Producer<String, StockPrice> producer;
    private final int delayMinMs;
    private final int delayMaxMs;
    private final Logger logger = LoggerFactory.getLogger(StockSender.class);
    private final String topic;

    public StockSender(final String topic, final StockPrice stockPriceHigh,
                       final StockPrice stockPriceLow,
                       final Producer<String, StockPrice> producer,
                       final int delayMinMs,
                       final int delayMaxMs) {
        this.stockPriceHigh = stockPriceHigh;
        this.stockPriceLow = stockPriceLow;
        this.producer = producer;
        this.delayMinMs = delayMinMs;
        this.delayMaxMs = delayMaxMs;
        this.topic = topic;
    }


    public void run() {
        final Random random = new Random(System.currentTimeMillis());
        int sentCount = 0;

        while (true) {
            sentCount++;
            final ProducerRecord <String, StockPrice> record =
                                        createRandomRecord(random);
            final int delay = randomIntBetween(random, delayMaxMs, delayMinMs);

            try {
                final Future<RecordMetadata> future = producer.send(record);
                if (sentCount % 100 == 0) {displayRecordMetaData(record, future);}
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                if (Thread.interrupted()) {
                    break;
                }
            } catch (ExecutionException e) {
                logger.error("problem sending record to producer", e);
            }
        }
    }

    private void displayRecordMetaData(final ProducerRecord<String, StockPrice> record,
                                       final Future<RecordMetadata> future)
                                throws InterruptedException, ExecutionException {
        final RecordMetadata recordMetadata = future.get();
        logger.info(String.format("\n\t\t\tkey=%s, value=%s " +
                        "\n\t\t\tsent to topic=%s part=%d off=%d at time=%s",
                record.key(),
                record.value().toJson(),
                recordMetadata.topic(),
                recordMetadata.partition(),
                recordMetadata.offset(),
                new Date(recordMetadata.timestamp())
                ));
    }

    private final int randomIntBetween(final Random random,
                                       final int max,
                                       final int min) {
        return random.nextInt(max - min + 1) + min;
    }

    private ProducerRecord<String, StockPrice> createRandomRecord(
                final Random random) {

        final int dollarAmount = randomIntBetween(random,
                stockPriceHigh.getDollars(), stockPriceLow.getDollars());

        final int centAmount = randomIntBetween(random,
                stockPriceHigh.getCents(), stockPriceLow.getCents());

        final StockPrice stockPrice = new StockPrice(
                stockPriceHigh.getName(), dollarAmount, centAmount);

        return new ProducerRecord<>(topic, stockPrice.getName(),
                stockPrice);
    }
}


StockAppConstants - Constants

~/kafka-training/lab5/solution/src/main/java/com/cloudurable/kafka/StockAppConstants.java

package com.cloudurable.kafka;

public class StockAppConstants {
    public final static String TOPIC = "stock-prices";
    public final static String BOOTSTRAP_SERVERS =
            "localhost:9092,localhost:9093,localhost:9094";

}

logback.xml - example of setting up logging for Kafka

~/kafka-training/lab5/solution/src/main/resources/logback.xml

<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoders are assigned the type
             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %n%msg%n</pattern>
        </encoder>
    </appender>


    <logger name="com.cloudurable.kafka" level="INFO" />

    <logger name="org.apache.kafka" level="INFO"/>
    <logger name="org.apache.kafka.common.metrics" level="INFO"/>

    <root level="debug">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>

create-topic.sh - example of creating a topic with kafka-topics.sh passing min.insync.replicas=2

~/kafka-training/lab5/solution/bin/create-topic.sh

#!/usr/bin/env bash

cd ~/kafka-training

kafka/bin/kafka-topics.sh \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 3 \
    --partitions 3 \
    --topic stock-prices \
    --config min.insync.replicas=2

    #--config unclean.leader.election.enable=true \
    #--config min.insync.replicas=2 \
    #--config compression.type=producer \
    #--config cleanup.policy=compact \
    #--config retention.ms=60000

delete-topic.sh - example of deleting a Kafka topic

~/kafka-training/lab5/solution/bin/delete-topic.sh

#!/usr/bin/env bash
cd ~/kafka-training
kafka/bin/kafka-topics.sh \
    --delete \
    --zookeeper localhost:2181 \
    --topic stock-prices

describe-topics.sh - example of using kafka-topics.sh to describe topics partitions/leader

~/kafka-training/lab5/solution/bin/describe-topics.sh

#!/usr/bin/env bash

cd ~/kafka-training

# List existing topics
kafka/bin/kafka-topics.sh --describe \
    --topic stock-prices \
    --zookeeper localhost:2181

list-topics.sh - example of using kafka-topics.sh to show topics

~/kafka-training/lab5/solution/bin/list-topics.sh

#!/usr/bin/env bash

cd ~/kafka-training

# List existing topics
kafka/bin/kafka-topics.sh --list \
    --zookeeper localhost:2181

replica-verification.sh - example of kafka-replica-verification.sh to show how far replicas are behind

~/kafka-training/lab5/solution/bin/replica-verification.sh

#!/usr/bin/env bash

cd ~/kafka-training

# List existing topics
kafka/bin/kafka-replica-verification.sh  \
    --report-interval-ms 5000 \
    --topic-white-list  "stock-prices.*" \
    --broker-list localhost:9092,localhost:9093,localhost:9094


start-1st-server.sh - example using kafka-server-start.sh passing custom properties

~/kafka-training/lab5/solution/bin/start-1st-server.sh

#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
## Run Kafka
kafka/bin/kafka-server-start.sh \
    "$CONFIG/server-0.properties"


start-2nd-server.sh

~/kafka-training/lab5/solution/bin/start-2nd-server.sh

#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
## Run Kafka
kafka/bin/kafka-server-start.sh \
    "$CONFIG/server-1.properties"

start-3rd-server.sh

~/kafka-training/lab5/solution/bin/start-3rd-server.sh

#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
## Run Kafka
kafka/bin/kafka-server-start.sh \
    "$CONFIG/server-2.properties"

server-0.properties - example properties file for Kafka with min.insync.replicas=3 set

~/kafka-training/lab5/solution/config/server-0.properties

broker.id=0
port=9092
log.dirs=./logs/kafka-0
## Require three replicas to respond
## before acknowledging send from producer.
min.insync.replicas=3



compression.type=producer
auto.create.topics.enable=false
message.max.bytes=65536
replica.lag.time.max.ms=5000
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000


server-1.properties

~/kafka-training/lab5/solution/config/server-1.properties

broker.id=1
port=9092
log.dirs=./logs/kafka-1
## Require three replicas to respond
## before acknowledging send from producer.
min.insync.replicas=3



compression.type=producer
auto.create.topics.enable=false
message.max.bytes=65536
replica.lag.time.max.ms=5000
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

server-2.properties

~/kafka-training/lab5/solution/config/server-2.properties

broker.id=2
port=9092
log.dirs=./logs/kafka-2
## Require three replicas to respond
## before acknowledging send from producer.
min.insync.replicas=3



compression.type=producer
auto.create.topics.enable=false
message.max.bytes=65536
replica.lag.time.max.ms=5000
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000


More to come.


Kafka Tutorial

This comprehensive Kafka tutorial covers Kafka architecture and design. The Kafka tutorial has example Java Kafka producers and Kafka consumers. The Kafka tutorial also covers Avro and Schema Registry.

Complete Kafka Tutorial: Architecture, Design, DevOps and Java Examples.


About Cloudurable

We hope you enjoyed this article. Please provide feedback. Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps setting up Kafka clusters in AWS.

                                                                           

Kafka Tutorial
Akka Consulting
Cassandra Training
AWS Cassandra Database Support
Kafka Support Pricing
Cassandra Database Support Pricing
Non-stop Cassandra
Watchdog
Advantages of using Cloudurable™
Cassandra Consulting
Cloudurable™| Guide to AWS Cassandra Deploy
Cloudurable™| AWS Cassandra Guidelines and Notes
Free guide to deploying Cassandra on AWS
Kafka Training
Kafka Consulting
DynamoDB Training
DynamoDB Consulting
Kinesis Training
Kinesis Consulting
Kafka Tutorial PDF