January 9, 2025
🚀 What’s New in This 2025 Update
Major Updates and Changes
- Idempotence by Default - Enabled out of the box since Kafka 3.0, so retries no longer create duplicates
- KRaft Mode - No ZooKeeper configuration needed
- Modern Java - Java 17+ with records and lambdas
- Cloud-Native Patterns - Producer pools and async best practices
- Enhanced Security - mTLS, OAuth, and fine-grained ACLs
- Performance Tuning - Optimized batching and compression
Producer Evolution Since 2017
- ✅ Simplified Configuration - Smart defaults for production
- ✅ Better Error Handling - Automatic retries with idempotence
- ✅ Improved Monitoring - Built-in metrics and interceptors
- ✅ Transaction Support - Multi-partition atomic writes
Ready to build a high-performance Kafka producer? Let’s create a production-ready Java producer with all the modern best practices!
Kafka Tutorial: Writing a Kafka Producer in Java
flowchart LR
subgraph Producer["Kafka Producer"]
APP[Java Application]
CONFIG[Configuration]
BATCH[Record Batching]
COMPRESS[Compression]
RETRY[Retry Logic]
end
subgraph Kafka["Kafka Cluster"]
P1[Partition 1]
P2[Partition 2]
P3[Partition 3]
end
APP --> CONFIG
CONFIG --> BATCH
BATCH --> COMPRESS
COMPRESS --> RETRY
RETRY --> P1
RETRY --> P2
RETRY --> P3
classDef producer fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#333333
classDef kafka fill:#e1bee7,stroke:#8e24aa,stroke-width:1px,color:#333333
class APP,CONFIG,BATCH,COMPRESS,RETRY producer
class P1,P2,P3 kafka
In this tutorial, we’ll create a production-ready Kafka producer in Java that:
- Uses an idempotent configuration so retries cannot introduce duplicates
- Implements async sending with proper error handling
- Includes monitoring and metrics collection
- Follows cloud-native best practices
- Demonstrates transactional capabilities
Prerequisites
Before you start:
- Complete Kafka from the command line
- Java 17+ installed (required for Kafka 4.0)
- Basic understanding of Kafka Architecture
- Familiarity with Kafka Producer Architecture
Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps set up Kafka clusters in AWS.
Create Kafka Topic
First, let’s create a topic using KRaft mode (no ZooKeeper!):
~/kafka-training/lab3/create-topic.sh
#!/usr/bin/env bash
cd ~/kafka-training
# Create topic with KRaft
kafka/bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 3 \
--partitions 13 \
--topic events \
--config min.insync.replicas=2 \
--config compression.type=lz4
# Create compacted topic for user profiles
kafka/bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 3 \
--partitions 6 \
--topic user-profiles \
--config cleanup.policy=compact \
--config min.insync.replicas=2
# List topics
kafka/bin/kafka-topics.sh --list \
--bootstrap-server localhost:9092
Run create-topic.sh
$ ./create-topic.sh
Created topic events.
Created topic user-profiles.
events
user-profiles
__consumer_offsets
Project Setup
Gradle Build Script (build.gradle)
plugins {
id 'java'
id 'application'
}
group = 'com.cloudurable.kafka'
version = '1.0-SNAPSHOT'
java {
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
}
repositories {
mavenCentral()
}
dependencies {
// Kafka client with all features
implementation 'org.apache.kafka:kafka-clients:3.6.0'
// Logging
implementation 'ch.qos.logback:logback-classic:1.4.14'
// Metrics
implementation 'io.micrometer:micrometer-core:1.12.0'
implementation 'io.micrometer:micrometer-registry-prometheus:1.12.0'
// JSON serialization (the jsr310 module supplies java.time support used by JsonSerializer)
implementation 'com.fasterxml.jackson.core:jackson-databind:2.16.0'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.16.0'
// Testing
testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0'
testImplementation 'org.apache.kafka:kafka-streams-test-utils:3.6.0'
}
application {
mainClass = 'com.cloudurable.kafka.AsyncProducerExample' // class below that defines main()
}
test {
useJUnitPlatform()
}
Modern Kafka Producer Implementation
Event Record (Using Java Records)
package com.cloudurable.kafka.model;
import java.time.Instant;
import java.util.UUID;
public record Event(
String id,
String userId,
String type,
String data,
Instant timestamp
) {
public static Event create(String userId, String type, String data) {
return new Event(
UUID.randomUUID().toString(),
userId,
type,
data,
Instant.now()
);
}
}
JSON Serializer
package com.cloudurable.kafka.serialization;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
public class JsonSerializer<T> implements Serializer<T> {
private final ObjectMapper objectMapper;
public JsonSerializer() {
this.objectMapper = new ObjectMapper();
this.objectMapper.registerModule(new JavaTimeModule());
}
@Override
public byte[] serialize(String topic, T data) {
    if (data == null) {
        return null; // null values pass through as tombstones
    }
    try {
        return objectMapper.writeValueAsBytes(data);
    } catch (Exception e) {
        throw new SerializationException("Error serializing object for topic " + topic, e);
    }
}
}
Production-Ready Kafka Producer
package com.cloudurable.kafka;
import com.cloudurable.kafka.model.Event;
import com.cloudurable.kafka.serialization.JsonSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
public class ModernKafkaProducer {
private static final Logger logger = LoggerFactory.getLogger(ModernKafkaProducer.class);
private static final String TOPIC = "events";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
// Metrics
private static final AtomicLong successCount = new AtomicLong(0);
private static final AtomicLong errorCount = new AtomicLong(0);
public static Producer<String, Event> createProducer() {
Properties props = new Properties();
// Connection settings
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "event-producer-" + System.currentTimeMillis());
// Serialization
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
// Idempotence for exactly-once semantics
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all"); // Required for idempotence
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// Performance optimizations
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024); // 32KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 20); // Wait up to 20ms for batching
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024); // 64MB
// Timeout configurations
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
return new KafkaProducer<>(props);
}
}
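KafkaProducer.send() is always asynchronous under the hood; for comparison with the async patterns below, here is a minimal sketch of a blocking send that waits on the returned Future. It is handy for tests and low-volume paths, but it sacrifices throughput. (`SyncSendExample` is illustrative, not part of the original lab code.)

```java
// Minimal synchronous-send sketch: blocks until the broker acks the record
import com.cloudurable.kafka.model.Event;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncSendExample {
    public static RecordMetadata sendSync(Producer<String, Event> producer, Event event)
            throws Exception {
        ProducerRecord<String, Event> record =
                new ProducerRecord<>("events", event.userId(), event);
        // get() turns the async send into a blocking call
        return producer.send(record).get();
    }
}
```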
Async Producer with Error Handling
package com.cloudurable.kafka;
import com.cloudurable.kafka.model.Event;
import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
public class AsyncProducerExample {
    private static final Logger logger = LoggerFactory.getLogger(AsyncProducerExample.class);
    private static final AtomicLong successCount = new AtomicLong(0);
    private static final AtomicLong errorCount = new AtomicLong(0);
public static CompletableFuture<RecordMetadata> sendEventAsync(
Producer<String, Event> producer, Event event) {
CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
ProducerRecord<String, Event> record = new ProducerRecord<>(
"events", // topic
null, // partition (let Kafka decide)
event.timestamp().toEpochMilli(), // timestamp
event.userId(), // key for partitioning
event // value
);
// Add headers for metadata
record.headers()
.add("event-type", event.type().getBytes())
.add("source", "event-service".getBytes());
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        logger.error("Failed to send event: {}", event, exception);
        errorCount.incrementAndGet();
        future.completeExceptionally(exception);
    } else {
        logger.info("Event sent successfully: partition={}, offset={}, timestamp={}",
                metadata.partition(), metadata.offset(), metadata.timestamp());
        successCount.incrementAndGet();
        future.complete(metadata);
    }
});
return future;
}
public static void main(String[] args) throws Exception {
try (Producer<String, Event> producer = ModernKafkaProducer.createProducer()) {
// Send multiple events asynchronously
for (int i = 0; i < 100; i++) {
Event event = Event.create(
"user-" + (i % 10),
"page-view",
"{\"page\": \"/products\", \"duration\": " + (100 + i) + "}"
);
sendEventAsync(producer, event)
.exceptionally(throwable -> {
logger.error("Failed to process event", throwable);
return null;
});
// Simulate some delay between events
Thread.sleep(10);
}
// Ensure all messages are sent before closing
producer.flush();
logger.info("Sent {} events successfully, {} failed",
successCount.get(), errorCount.get());
}
}
}
Transactional Producer
package com.cloudurable.kafka;
import com.cloudurable.kafka.model.Event;
import com.cloudurable.kafka.serialization.JsonSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Properties;
public class TransactionalProducerExample {
private static final Logger logger = LoggerFactory.getLogger(TransactionalProducerExample.class);
public static Producer<String, Event> createTransactionalProducer() {
Properties props = new Properties();
// Standard configuration
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
// Transactional configuration - use a stable transactional.id per producer instance
// so broker-side zombie fencing works across application restarts
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "event-producer-tx-1");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // Required for transactions
Producer<String, Event> producer = new KafkaProducer<>(props);
// Initialize transactions
producer.initTransactions();
return producer;
}
public static void sendTransactionalBatch(Producer<String, Event> producer,
List<Event> events) {
try {
producer.beginTransaction();
for (Event event : events) {
ProducerRecord<String, Event> record = new ProducerRecord<>(
"events", event.userId(), event
);
// Send within transaction
producer.send(record, (metadata, exception) -> {
if (exception != null) {
logger.error("Error in transaction", exception);
}
});
}
// Commit all or nothing
producer.commitTransaction();
logger.info("Transaction committed with {} events", events.size());
} catch (ProducerFencedException | OutOfOrderSequenceException |
AuthorizationException e) {
// Fatal errors - cannot recover
logger.error("Fatal error in transaction", e);
producer.close();
throw e;
} catch (KafkaException e) {
// Abort transaction for other errors
logger.error("Error in transaction, aborting", e);
producer.abortTransaction();
throw e;
}
}
}
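A possible driver for the transactional producer (a sketch; `TransactionalBatchRunner` is not part of the original lab code):

```java
// Hypothetical usage of the transactional producer above
import com.cloudurable.kafka.model.Event;
import org.apache.kafka.clients.producer.Producer;
import java.util.List;

public class TransactionalBatchRunner {
    public static void main(String[] args) {
        try (Producer<String, Event> producer =
                     TransactionalProducerExample.createTransactionalProducer()) {
            List<Event> batch = List.of(
                    Event.create("user-1", "purchase", "{\"amount\": 42}"),
                    Event.create("user-2", "purchase", "{\"amount\": 7}"));
            // All events commit together or not at all
            TransactionalProducerExample.sendTransactionalBatch(producer, batch);
        }
    }
}
```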
Producer with Custom Interceptor
package com.cloudurable.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
public class MetricsInterceptor<K, V> implements ProducerInterceptor<K, V> {
private static final Logger logger = LoggerFactory.getLogger(MetricsInterceptor.class);
private final AtomicLong sentCount = new AtomicLong(0);
private final AtomicLong ackCount = new AtomicLong(0);
@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
sentCount.incrementAndGet();
// Add custom headers
record.headers().add("sent-time", String.valueOf(System.currentTimeMillis()).getBytes());
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
ackCount.incrementAndGet();
}
// Log metrics every 1000 messages
if (sentCount.get() % 1000 == 0) {
logger.info("Producer metrics - Sent: {}, Acknowledged: {}",
sentCount.get(), ackCount.get());
}
}
@Override
public void close() {
logger.info("Final metrics - Sent: {}, Acknowledged: {}",
sentCount.get(), ackCount.get());
}
@Override
public void configure(Map<String, ?> configs) {
// Configuration if needed
}
}
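The interceptor only takes effect once it is registered with the producer. One way is an extra line in the `Properties` built by `createProducer()`:

```java
// Register the interceptor when building the producer Properties
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        "com.cloudurable.kafka.interceptor.MetricsInterceptor");
```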
Producer Pool Pattern
package com.cloudurable.kafka;
import com.cloudurable.kafka.model.Event;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ProducerPool implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ProducerPool.class);
private final BlockingQueue<Producer<String, Event>> pool;
private final int poolSize;
public ProducerPool(int poolSize) {
this.poolSize = poolSize;
this.pool = new LinkedBlockingQueue<>(poolSize);
// Initialize pool
for (int i = 0; i < poolSize; i++) {
pool.offer(ModernKafkaProducer.createProducer());
}
}
public void send(Event event) throws InterruptedException {
Producer<String, Event> producer = null;
try {
// Get producer from pool
producer = pool.poll(5, TimeUnit.SECONDS);
if (producer == null) {
throw new RuntimeException("No available producer in pool");
}
ProducerRecord<String, Event> record = new ProducerRecord<>(
"events", event.userId(), event
);
producer.send(record);
} finally {
// Return producer to pool
if (producer != null) {
pool.offer(producer);
}
}
}
@Override
public void close() {
pool.forEach(producer -> {
try {
producer.close(Duration.ofSeconds(10));
} catch (Exception e) {
logger.error("Error closing producer", e);
}
});
}
}
Production Configuration Best Practices
Application Properties
# Kafka Producer Configuration
kafka.bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
kafka.client.id=event-producer
# Security
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=OAUTHBEARER
kafka.ssl.truststore.location=/var/ssl/kafka.client.truststore.jks
kafka.ssl.truststore.password=${TRUSTSTORE_PASSWORD}
# Performance
kafka.batch.size=65536
kafka.linger.ms=10
kafka.compression.type=lz4
kafka.buffer.memory=134217728
# Reliability
kafka.acks=all
kafka.enable.idempotence=true
kafka.retries=2147483647
kafka.max.in.flight.requests.per.connection=5
# Timeouts
kafka.request.timeout.ms=30000
kafka.delivery.timeout.ms=120000
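One way to load this file and strip the `kafka.` prefix before handing the values to `KafkaProducer` (a sketch; the resource name and prefix convention are assumptions of this example):

```java
// Hypothetical loader: reads application.properties and strips the "kafka." prefix
import java.io.InputStream;
import java.util.Properties;

public class ProducerConfigLoader {
    public static Properties load() throws Exception {
        Properties raw = new Properties();
        try (InputStream in = ProducerConfigLoader.class
                .getResourceAsStream("/application.properties")) {
            raw.load(in);
        }
        Properties kafkaProps = new Properties();
        for (String name : raw.stringPropertyNames()) {
            if (name.startsWith("kafka.")) {
                // "kafka.acks" becomes "acks", etc.
                kafkaProps.put(name.substring("kafka.".length()), raw.getProperty(name));
            }
        }
        return kafkaProps;
    }
}
```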
Monitoring and Metrics
package com.cloudurable.kafka.monitoring;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
public class MonitoredCallback implements Callback {
private final MeterRegistry meterRegistry;
private final Timer.Sample sample;
public MonitoredCallback(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.sample = Timer.start(meterRegistry);
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
meterRegistry.counter("kafka.producer.errors",
"error", exception.getClass().getSimpleName()).increment();
} else {
sample.stop(meterRegistry.timer("kafka.producer.latency",
"topic", metadata.topic(),
"partition", String.valueOf(metadata.partition())));
meterRegistry.counter("kafka.producer.success",
"topic", metadata.topic()).increment();
}
}
}
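Wiring it in might look like this (assuming a `MeterRegistry` instance is already available; create a fresh callback per send so the timer measures that record's latency):

```java
// Usage sketch: one MonitoredCallback per record
producer.send(record, new MonitoredCallback(meterRegistry));
```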
Error Handling Patterns
package com.cloudurable.kafka;
import com.cloudurable.kafka.model.Event;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class ResilientProducer {
    private static final Logger logger = LoggerFactory.getLogger(ResilientProducer.class);
public static void sendWithRetry(Producer<String, Event> producer,
Event event,
int maxRetries) {
int attempts = 0;
Exception lastException = null;
while (attempts < maxRetries) {
try {
ProducerRecord<String, Event> record = new ProducerRecord<>(
"events", event.userId(), event
);
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get(10, TimeUnit.SECONDS);
logger.info("Event sent to partition {} at offset {} after {} attempts",
        metadata.partition(), metadata.offset(), attempts + 1);
return;
} catch (Exception e) {
lastException = e;
attempts++;
if (attempts < maxRetries) {
logger.warn("Failed to send event, attempt {}/{}",
attempts, maxRetries, e);
// Exponential backoff
try {
Thread.sleep((long) Math.pow(2, attempts) * 1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
logger.error("Failed to send event after {} attempts", maxRetries, lastException);
throw new RuntimeException("Failed to send event", lastException);
}
}
Testing Your Producer
package com.cloudurable.kafka;
import com.cloudurable.kafka.model.Event;
import com.cloudurable.kafka.serialization.JsonSerializer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
class ProducerTest {
@Test
void testEventProduction() {
// Create mock producer
MockProducer<String, Event> mockProducer = new MockProducer<>(
true, new StringSerializer(), new JsonSerializer<>()
);
// Send event
Event event = Event.create("user123", "login", "{\"ip\": \"192.168.1.1\"}");
ProducerRecord<String, Event> record = new ProducerRecord<>(
"events", event.userId(), event
);
mockProducer.send(record);
// Verify
assertEquals(1, mockProducer.history().size());
ProducerRecord<String, Event> sent = mockProducer.history().get(0);
assertEquals("user123", sent.key());
assertEquals("login", sent.value().type());
}
}
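`MockProducer` can also simulate failures when constructed with `autoComplete=false`; a sketch of a failure-path test to add inside `ProducerTest`:

```java
@Test
void testSendFailure() {
    // autoComplete=false lets the test decide each send's outcome
    MockProducer<String, Event> mockProducer = new MockProducer<>(
            false, new StringSerializer(), new JsonSerializer<>());
    Event event = Event.create("user123", "login", "{}");
    var future = mockProducer.send(new ProducerRecord<>("events", event.userId(), event));
    // Fail the oldest in-flight send
    mockProducer.errorNext(new RuntimeException("simulated broker error"));
    assertTrue(future.isDone());
    assertThrows(Exception.class, future::get);
}
```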
Performance Tuning Guide
flowchart TB
subgraph Tuning["Performance Tuning"]
BATCH[Increase Batch Size<br>batch.size=64KB]
LINGER[Add Linger Time<br>linger.ms=10-20ms]
COMPRESS[Enable Compression<br>compression.type=lz4]
BUFFER[Increase Buffer<br>buffer.memory=128MB]
ASYNC[Use Async Send<br>Non-blocking]
end
subgraph Metrics["Monitor These Metrics"]
M1[Records/Second]
M2[Bytes/Second]
M3[Batch Size Avg]
M4[Compression Ratio]
M5[Request Latency]
end
Tuning --> Metrics
style Tuning fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style Metrics fill:#e8f5e9,stroke:#43a047,stroke-width:1px
Common Issues and Solutions
| Issue | Solution |
|---|---|
| TimeoutException | Increase request.timeout.ms and delivery.timeout.ms |
| BufferExhaustedException | Increase buffer.memory or reduce send rate |
| SerializationException | Verify serializer configuration and data format |
| AuthenticationException | Check security configs and credentials |
| OutOfOrderSequenceException | Enable idempotence or handle in application |
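When reacting to these errors in a callback, it helps to separate retriable from fatal exceptions; a minimal sketch using Kafka's `RetriableException` base class (`ErrorClassifyingCallback` is a hypothetical helper):

```java
// Sketch: classify errors in a send callback as retriable vs. fatal
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ErrorClassifyingCallback implements Callback {
    private static final Logger logger =
            LoggerFactory.getLogger(ErrorClassifyingCallback.class);

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            return; // success
        }
        if (exception instanceof RetriableException) {
            // Transient (e.g. a leader election in progress); the producer's
            // built-in retries usually recover - log and monitor
            logger.warn("Retriable send failure", exception);
        } else {
            // Non-retriable (e.g. serialization or authorization); surface it
            logger.error("Fatal send failure", exception);
        }
    }
}
```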
Review Questions
Why enable idempotence?
Idempotence prevents duplicate messages during retries, giving exactly-once writes per partition within a producer session; pair it with transactions for end-to-end exactly-once.
When to use transactions?
Use transactions when you need atomic writes across multiple partitions or topics.
What’s the optimal batch size?
Start with 16-64KB and tune based on your message size and throughput requirements.
How to handle producer failures?
Use callbacks for async handling, implement retry logic, and monitor error metrics.
Why use producer pools?
Producer instances are thread-safe, so a single shared producer is often enough; pools help when you want to isolate workloads or keep one producer's full buffer from blocking every sender.
Summary
Modern Kafka producers in 2025 emphasize:
- Reliability through idempotence and proper error handling
- Performance via batching, compression, and async patterns
- Observability with metrics and monitoring
- Security using mTLS and OAuth
- Cloud-native patterns for scalability
With these patterns, you can build producers that handle millions of events reliably!
Related Content
- What is Kafka?
- Kafka Architecture
- Kafka Producer Architecture
- Kafka Consumer Tutorial
- Kafka Command Line
- Schema Registry
- Kafka Streams
About Cloudurable
We hope you enjoyed this tutorial. Please provide feedback. Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps set up Kafka clusters in AWS.
Check out our new Golang course. We provide onsite, instructor-led Go training.