Kafka Tutorial: Creating a Kafka Producer in Java - 2025 Edition

January 9, 2025


🚀 What’s New in This 2025 Update

Major Updates and Changes

  • Idempotence by Default - Duplicate-free delivery enabled out of the box (Kafka 3.0+)
  • KRaft Mode - No ZooKeeper configuration needed
  • Modern Java - Java 17+ with records and lambdas
  • Cloud-Native Patterns - Producer pools and async best practices
  • Enhanced Security - mTLS, OAuth, and fine-grained ACLs
  • Performance Tuning - Optimized batching and compression

Producer Evolution Since 2017

  • ✅ Simplified Configuration - Smart defaults for production
  • ✅ Better Error Handling - Automatic retries with idempotence
  • ✅ Improved Monitoring - Built-in metrics and interceptors
  • ✅ Transaction Support - Multi-partition atomic writes

Ready to build a high-performance Kafka producer? Let’s create a production-ready Java producer with all the modern best practices!

Kafka Tutorial: Writing a Kafka Producer in Java

flowchart LR
  subgraph Producer["Kafka Producer"]
    APP[Java Application]
    CONFIG[Configuration]
    BATCH[Record Batching]
    COMPRESS[Compression]
    RETRY[Retry Logic]
  end
  
  subgraph Kafka["Kafka Cluster"]
    P1[Partition 1]
    P2[Partition 2]
    P3[Partition 3]
  end
  
  APP --> CONFIG
  CONFIG --> BATCH
  BATCH --> COMPRESS
  COMPRESS --> RETRY
  RETRY --> P1
  RETRY --> P2
  RETRY --> P3
  
  classDef producer fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#333333
  classDef kafka fill:#e1bee7,stroke:#8e24aa,stroke-width:1px,color:#333333
  
  class APP,CONFIG,BATCH,COMPRESS,RETRY producer
  class P1,P2,P3 kafka

In this tutorial, we’ll create a production-ready Kafka producer in Java that:

  • Uses idempotent configuration to prevent duplicates during retries
  • Implements async sending with proper error handling
  • Includes monitoring and metrics collection
  • Follows cloud-native best practices
  • Demonstrates transactional capabilities

Prerequisites

Before you start, you'll need:

  • JDK 17 or later (the build script targets Java 17)
  • Apache Kafka 3.x installed under ~/kafka-training and running in KRaft mode
  • Gradle to build and run the examples

Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps set up Kafka clusters in AWS.

Create Kafka Topic

First, let’s create a topic using KRaft mode (no ZooKeeper!):

~/kafka-training/lab3/create-topic.sh

#!/usr/bin/env bash
cd ~/kafka-training

# Create topic with KRaft
kafka/bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 3 \
    --partitions 13 \
    --topic events \
    --config min.insync.replicas=2 \
    --config compression.type=lz4

# Create compacted topic for user profiles
kafka/bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 3 \
    --partitions 6 \
    --topic user-profiles \
    --config cleanup.policy=compact \
    --config min.insync.replicas=2

# List topics
kafka/bin/kafka-topics.sh --list \
    --bootstrap-server localhost:9092

Run create-topic.sh

$ ./create-topic.sh
Created topic events.
Created topic user-profiles.
events
user-profiles
__consumer_offsets
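
If you prefer managing topics from Java, the Admin client can create the same topics programmatically. A minimal sketch, mirroring the events topic from the script above (adjust names and configs to taste):

package com.cloudurable.kafka.admin;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TopicCreator {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Same settings as create-topic.sh: 13 partitions, replication factor 3
            NewTopic events = new NewTopic("events", 13, (short) 3)
                .configs(Map.of(
                    "min.insync.replicas", "2",
                    "compression.type", "lz4"));

            // Block until the broker confirms creation
            admin.createTopics(List.of(events)).all().get();
        }
    }
}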

Project Setup

Gradle Build Script (build.gradle)

plugins {
    id 'java'
    id 'application'
}

group = 'com.cloudurable.kafka'
version = '1.0-SNAPSHOT'

java {
    sourceCompatibility = JavaVersion.VERSION_17
    targetCompatibility = JavaVersion.VERSION_17
}

repositories {
    mavenCentral()
}

dependencies {
    // Kafka client with all features
    implementation 'org.apache.kafka:kafka-clients:3.6.0'
    
    // Logging
    implementation 'ch.qos.logback:logback-classic:1.4.14'
    
    // Metrics
    implementation 'io.micrometer:micrometer-core:1.12.0'
    implementation 'io.micrometer:micrometer-registry-prometheus:1.12.0'
    
    // JSON serialization (jsr310 module needed for java.time types like Instant)
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.16.0'
    implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.16.0'
    
    // Testing
    testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0'
    testImplementation 'org.apache.kafka:kafka-streams-test-utils:3.6.0'
}

application {
    mainClass = 'com.cloudurable.kafka.ModernKafkaProducer'
}

test {
    useJUnitPlatform()
}

Modern Kafka Producer Implementation

Event Record (Using Java Records)

package com.cloudurable.kafka.model;

import java.time.Instant;
import java.util.UUID;

public record Event(
    String id,
    String userId,
    String type,
    String data,
    Instant timestamp
) {
    public static Event create(String userId, String type, String data) {
        return new Event(
            UUID.randomUUID().toString(),
            userId,
            type,
            data,
            Instant.now()
        );
    }
}

JSON Serializer

package com.cloudurable.kafka.serialization;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper;
    
    public JsonSerializer() {
        this.objectMapper = new ObjectMapper();
        this.objectMapper.registerModule(new JavaTimeModule());
    }
    
    @Override
    public byte[] serialize(String topic, T data) {
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new RuntimeException("Error serializing object", e);
        }
    }
}
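
Kafka normally instantiates serializers reflectively from the serializer class configs, but KafkaProducer also accepts pre-built instances, which suits a generic serializer like this one. A small fragment (props as configured below):

// Alternative: pass serializer instances directly; no serializer class configs needed
Producer<String, Event> producer = new KafkaProducer<>(
    props, new StringSerializer(), new JsonSerializer<Event>());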

Production-Ready Kafka Producer

package com.cloudurable.kafka;

import com.cloudurable.kafka.model.Event;
import com.cloudurable.kafka.serialization.JsonSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

public class ModernKafkaProducer {
    private static final Logger logger = LoggerFactory.getLogger(ModernKafkaProducer.class);
    
    private static final String TOPIC = "events";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    
    public static Producer<String, Event> createProducer() {
        Properties props = new Properties();
        
        // Connection settings
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "event-producer-" + System.currentTimeMillis());
        
        // Serialization
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
        
        // Idempotence for exactly-once semantics
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // Required for idempotence
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        
        // Performance optimizations
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024); // 32KB
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20); // Wait up to 20ms for batching
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024); // 64MB
        
        // Timeout configurations
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
        
        return new KafkaProducer<>(props);
    }
}

Async Producer with Error Handling

package com.cloudurable.kafka;

import com.cloudurable.kafka.model.Event;
import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

import static com.cloudurable.kafka.ModernKafkaProducer.createProducer;

public class AsyncProducerExample {
    private static final Logger logger = LoggerFactory.getLogger(AsyncProducerExample.class);

    // Success/failure counters updated by the send callback
    private static final AtomicLong successCount = new AtomicLong(0);
    private static final AtomicLong errorCount = new AtomicLong(0);
    
    public static CompletableFuture<RecordMetadata> sendEventAsync(
            Producer<String, Event> producer, Event event) {
        
        CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
        
        ProducerRecord<String, Event> record = new ProducerRecord<>(
            "events",           // topic
            null,              // partition (let Kafka decide)
            event.timestamp().toEpochMilli(), // timestamp
            event.userId(),    // key for partitioning
            event             // value
        );
        
        // Add headers for metadata
        record.headers()
            .add("event-type", event.type().getBytes())
            .add("source", "event-service".getBytes());
        
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                logger.error("Failed to send event: {}", event, exception);
                errorCount.incrementAndGet();
                future.completeExceptionally(exception);
            } else {
                logger.info("Event sent successfully: partition={}, offset={}, timestamp={}",
                    metadata.partition(), metadata.offset(), metadata.timestamp());
                successCount.incrementAndGet();
                future.complete(metadata);
            }
        });
        
        return future;
    }
    
    public static void main(String[] args) throws Exception {
        try (Producer<String, Event> producer = createProducer()) {
            
            // Send multiple events asynchronously
            for (int i = 0; i < 100; i++) {
                Event event = Event.create(
                    "user-" + (i % 10),
                    "page-view",
                    "{\"page\": \"/products\", \"duration\": " + (100 + i) + "}"
                );
                
                sendEventAsync(producer, event)
                    .exceptionally(throwable -> {
                        logger.error("Failed to process event", throwable);
                        return null;
                    });
                
                // Simulate some delay between events
                Thread.sleep(10);
            }
            
            // Ensure all messages are sent before closing
            producer.flush();
            
            logger.info("Sent {} events successfully, {} failed", 
                successCount.get(), errorCount.get());
        }
    }
}

Transactional Producer

package com.cloudurable.kafka;

import com.cloudurable.kafka.model.Event;
import com.cloudurable.kafka.serialization.JsonSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;

public class TransactionalProducerExample {
    private static final Logger logger = LoggerFactory.getLogger(TransactionalProducerExample.class);
    
    public static Producer<String, Event> createTransactionalProducer() {
        Properties props = new Properties();
        
        // Standard configuration
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
        
        // Transactional configuration
        // Note: in production, use a stable transactional.id per producer instance;
        // a timestamp-based id defeats zombie fencing across restarts
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "event-producer-tx-" + System.currentTimeMillis());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // Required for transactions
        
        Producer<String, Event> producer = new KafkaProducer<>(props);
        
        // Initialize transactions
        producer.initTransactions();
        
        return producer;
    }
    
    public static void sendTransactionalBatch(Producer<String, Event> producer, 
                                            List<Event> events) {
        try {
            producer.beginTransaction();
            
            for (Event event : events) {
                ProducerRecord<String, Event> record = new ProducerRecord<>(
                    "events", event.userId(), event
                );
                
                // Send within transaction
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        logger.error("Error in transaction", exception);
                    }
                });
            }
            
            // Commit all or nothing
            producer.commitTransaction();
            logger.info("Transaction committed with {} events", events.size());
            
        } catch (ProducerFencedException | OutOfOrderSequenceException | 
                 AuthorizationException e) {
            // Fatal errors - cannot recover
            logger.error("Fatal error in transaction", e);
            producer.close();
            throw e;
        } catch (KafkaException e) {
            // Abort transaction for other errors
            logger.error("Error in transaction, aborting", e);
            producer.abortTransaction();
            throw e;
        }
    }
}
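
Putting it together, a caller might write several related events as one atomic unit. A usage sketch (the event payloads are illustrative):

package com.cloudurable.kafka;

import com.cloudurable.kafka.model.Event;
import org.apache.kafka.clients.producer.Producer;

import java.util.List;

public class TransactionalBatchDemo {
    public static void main(String[] args) {
        try (Producer<String, Event> producer =
                 TransactionalProducerExample.createTransactionalProducer()) {
            List<Event> batch = List.of(
                Event.create("user-1", "order-placed", "{\"orderId\": \"o-100\"}"),
                Event.create("user-1", "payment-received", "{\"orderId\": \"o-100\"}")
            );
            // Either both events become visible to read_committed consumers, or neither does
            TransactionalProducerExample.sendTransactionalBatch(producer, batch);
        }
    }
}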

Producer with Custom Interceptor

package com.cloudurable.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class MetricsInterceptor<K, V> implements ProducerInterceptor<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(MetricsInterceptor.class);
    private final AtomicLong sentCount = new AtomicLong(0);
    private final AtomicLong ackCount = new AtomicLong(0);
    
    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        sentCount.incrementAndGet();
        
        // Add custom headers
        record.headers().add("sent-time", String.valueOf(System.currentTimeMillis()).getBytes());
        
        return record;
    }
    
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            ackCount.incrementAndGet();
        }
        
        // Log metrics every 1000 messages
        if (sentCount.get() % 1000 == 0) {
            logger.info("Producer metrics - Sent: {}, Acknowledged: {}", 
                sentCount.get(), ackCount.get());
        }
    }
    
    @Override
    public void close() {
        logger.info("Final metrics - Sent: {}, Acknowledged: {}", 
            sentCount.get(), ackCount.get());
    }
    
    @Override
    public void configure(Map<String, ?> configs) {
        // Configuration if needed
    }
}
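
The interceptor only takes effect once it is registered via interceptor.classes, for example by adding it to the properties in createProducer():

// Register the interceptor (instantiated reflectively, so it needs a no-arg constructor)
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
    MetricsInterceptor.class.getName());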

Producer Pool Pattern

package com.cloudurable.kafka;

import com.cloudurable.kafka.model.Event;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ProducerPool implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(ProducerPool.class);
    
    private final BlockingQueue<Producer<String, Event>> pool;
    private final int poolSize;
    
    public ProducerPool(int poolSize) {
        this.poolSize = poolSize;
        this.pool = new LinkedBlockingQueue<>(poolSize);
        
        // Initialize pool
        for (int i = 0; i < poolSize; i++) {
            pool.offer(ModernKafkaProducer.createProducer());
        }
    }
    
    public void send(Event event) throws InterruptedException {
        Producer<String, Event> producer = null;
        try {
            // Get producer from pool
            producer = pool.poll(5, TimeUnit.SECONDS);
            if (producer == null) {
                throw new RuntimeException("No available producer in pool");
            }
            
            ProducerRecord<String, Event> record = new ProducerRecord<>(
                "events", event.userId(), event
            );
            
            producer.send(record);
            
        } finally {
            // Return producer to pool
            if (producer != null) {
                pool.offer(producer);
            }
        }
    }
    
    @Override
    public void close() {
        pool.forEach(producer -> {
            try {
                producer.close(Duration.ofSeconds(10));
            } catch (Exception e) {
                logger.error("Error closing producer", e);
            }
        });
    }
}
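
Usage is just a try-with-resources block around the pool; for instance, a main method added to ProducerPool:

public static void main(String[] args) throws InterruptedException {
    try (ProducerPool pool = new ProducerPool(4)) {
        for (int i = 0; i < 1_000; i++) {
            pool.send(Event.create("user-" + (i % 10), "click", "{\"n\": " + i + "}"));
        }
    } // close() closes each pooled producer, waiting up to 10s for in-flight sends
}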

Production Configuration Best Practices

Application Properties

# Kafka Producer Configuration
kafka.bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
kafka.client.id=event-producer

# Security
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=OAUTHBEARER
kafka.ssl.truststore.location=/var/ssl/kafka.client.truststore.jks
kafka.ssl.truststore.password=${TRUSTSTORE_PASSWORD}

# Performance
kafka.batch.size=65536
kafka.linger.ms=10
kafka.compression.type=lz4
kafka.buffer.memory=134217728

# Reliability
kafka.acks=all
kafka.enable.idempotence=true
kafka.retries=2147483647
kafka.max.in.flight.requests.per.connection=5

# Timeouts
kafka.request.timeout.ms=30000
kafka.delivery.timeout.ms=120000
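
The kafka. prefix lets these settings share a file with the rest of your application config, but the producer expects the unprefixed names, so strip the prefix when loading. A minimal sketch, assuming the file is named application.properties (placeholders like ${TRUSTSTORE_PASSWORD} still need to be resolved by your framework or your own substitution step):

package com.cloudurable.kafka.config;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public final class ProducerConfigLoader {

    // Loads the file and strips the "kafka." prefix from every matching key
    public static Properties load(String file) throws IOException {
        Properties raw = new Properties();
        try (InputStream in = Files.newInputStream(Path.of(file))) {
            raw.load(in);
        }
        Properties kafka = new Properties();
        for (String name : raw.stringPropertyNames()) {
            if (name.startsWith("kafka.")) {
                kafka.put(name.substring("kafka.".length()), raw.getProperty(name));
            }
        }
        return kafka;
    }
}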

Monitoring and Metrics

package com.cloudurable.kafka.monitoring;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MonitoredCallback implements Callback {
    private final MeterRegistry meterRegistry;
    private final Timer.Sample sample;
    
    public MonitoredCallback(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.sample = Timer.start(meterRegistry);
    }
    
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            meterRegistry.counter("kafka.producer.errors", 
                "error", exception.getClass().getSimpleName()).increment();
        } else {
            sample.stop(meterRegistry.timer("kafka.producer.latency",
                "topic", metadata.topic(),
                "partition", String.valueOf(metadata.partition())));
            
            meterRegistry.counter("kafka.producer.success",
                "topic", metadata.topic()).increment();
        }
    }
}
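
Wiring the callback to the Prometheus registry from the build dependencies might look like this (a sketch; how you expose registry.scrape() over HTTP is up to your application):

package com.cloudurable.kafka.monitoring;

import com.cloudurable.kafka.ModernKafkaProducer;
import com.cloudurable.kafka.model.Event;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MonitoredSendExample {
    public static void main(String[] args) {
        // One registry per application; Prometheus scrapes the output of registry.scrape()
        PrometheusMeterRegistry registry =
            new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);

        try (Producer<String, Event> producer = ModernKafkaProducer.createProducer()) {
            Event event = Event.create("user-42", "login", "{}");
            ProducerRecord<String, Event> record =
                new ProducerRecord<>("events", event.userId(), event);

            // Use a fresh callback per send so each Timer.Sample times one record
            producer.send(record, new MonitoredCallback(registry));
            producer.flush();
        }
    }
}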

Error Handling Patterns

package com.cloudurable.kafka;

import com.cloudurable.kafka.model.Event;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ResilientProducer {
    private static final Logger logger = LoggerFactory.getLogger(ResilientProducer.class);
    
    public static void sendWithRetry(Producer<String, Event> producer, 
                                   Event event, 
                                   int maxRetries) {
        int attempts = 0;
        Exception lastException = null;
        
        while (attempts < maxRetries) {
            try {
                ProducerRecord<String, Event> record = new ProducerRecord<>(
                    "events", event.userId(), event
                );
                
                Future<RecordMetadata> future = producer.send(record);
                RecordMetadata metadata = future.get(10, TimeUnit.SECONDS);
                
                logger.info("Event sent successfully after {} attempts", attempts + 1);
                return;
                
            } catch (Exception e) {
                lastException = e;
                attempts++;
                
                if (attempts < maxRetries) {
                    logger.warn("Failed to send event, attempt {}/{}", 
                        attempts, maxRetries, e);
                    
                    // Exponential backoff
                    try {
                        Thread.sleep((long) Math.pow(2, attempts) * 1000);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
        
        logger.error("Failed to send event after {} attempts", maxRetries, lastException);
        throw new RuntimeException("Failed to send event", lastException);
    }
}

Testing Your Producer

package com.cloudurable.kafka;

import com.cloudurable.kafka.model.Event;
import com.cloudurable.kafka.serialization.JsonSerializer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

class ProducerTest {
    
    @Test
    void testEventProduction() {
        // Create mock producer
        MockProducer<String, Event> mockProducer = new MockProducer<>(
            true, new StringSerializer(), new JsonSerializer<>()
        );
        
        // Send event
        Event event = Event.create("user123", "login", "{\"ip\": \"192.168.1.1\"}");
        ProducerRecord<String, Event> record = new ProducerRecord<>(
            "events", event.userId(), event
        );
        
        mockProducer.send(record);
        
        // Verify
        assertEquals(1, mockProducer.history().size());
        ProducerRecord<String, Event> sent = mockProducer.history().get(0);
        assertEquals("user123", sent.key());
        assertEquals("login", sent.value().type());
    }
}
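
MockProducer can also simulate broker failures: construct it with autoComplete set to false, then use errorNext to fail the oldest in-flight send. A sketch of an error-path test for the same class:

@Test
void testSendFailure() {
    MockProducer<String, Event> mockProducer = new MockProducer<>(
        false, new StringSerializer(), new JsonSerializer<>()
    );

    Event event = Event.create("user123", "login", "{}");
    var future = mockProducer.send(new ProducerRecord<>("events", event.userId(), event));

    // Completes the oldest uncompleted send exceptionally
    mockProducer.errorNext(new RuntimeException("broker unavailable"));

    assertTrue(future.isDone());
    assertThrows(java.util.concurrent.ExecutionException.class, future::get);
}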

Performance Tuning Guide

flowchart TB
  subgraph Tuning["Performance Tuning"]
    BATCH[Increase Batch Size<br>batch.size=64KB]
    LINGER[Add Linger Time<br>linger.ms=10-20ms]
    COMPRESS[Enable Compression<br>compression.type=lz4]
    BUFFER[Increase Buffer<br>buffer.memory=128MB]
    ASYNC[Use Async Send<br>Non-blocking]
  end
  
  subgraph Metrics["Monitor These Metrics"]
    M1[Records/Second]
    M2[Bytes/Second]
    M3[Batch Size Avg]
    M4[Compression Ratio]
    M5[Request Latency]
  end
  
  Tuning --> Metrics
  
  style Tuning fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
  style Metrics fill:#e8f5e9,stroke:#43a047,stroke-width:1px

Common Issues and Solutions

Issue                          Solution
TimeoutException               Increase request.timeout.ms and delivery.timeout.ms
BufferExhaustedException       Increase buffer.memory or reduce the send rate
SerializationException         Verify serializer configuration and data format
AuthenticationException        Check security configs and credentials
OutOfOrderSequenceException    Enable idempotence or handle reordering in the application

Review Questions

Why enable idempotence?

Idempotence prevents duplicate messages during retries, giving exactly-once delivery per partition within a producer session. For atomic exactly-once writes across partitions or topics, combine it with transactions.

When to use transactions?

Use transactions when you need atomic writes across multiple partitions or topics.

What’s the optimal batch size?

Start with 16-64KB and tune based on your message size and throughput requirements.

How to handle producer failures?

Use callbacks for async handling, implement retry logic, and monitor error metrics.

Why use producer pools?

Producer instances are thread-safe, so a single shared producer is usually enough; creating producers is expensive, so reuse them. A pool helps when one producer's I/O becomes a throughput bottleneck or when you need multiple transactional IDs.

Summary

Modern Kafka producers in 2025 emphasize:

  • Reliability through idempotence and proper error handling
  • Performance via batching, compression, and async patterns
  • Observability with metrics and monitoring
  • Security using mTLS and OAuth
  • Cloud-native patterns for scalability

With these patterns, you can build producers that handle millions of events reliably!

About Cloudurable

We hope you enjoyed this tutorial. Please provide feedback. Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps set up Kafka clusters in AWS.

Check out our new GoLang course. We provide onsite, instructor-led Go training.
