Kafka Tutorial: Creating Advanced Kafka Producers in Java - 2025 Edition

January 9, 2025


🚀 What’s New in This 2025 Update

Major Updates and Changes

  • Idempotence by Default - No more accidental duplicates
  • Mature Transactions - Production-ready exactly-once semantics
  • KRaft Mode - No ZooKeeper, simplified operations
  • Cloud-Native Patterns - Producer pools and serverless ready
  • Enhanced Monitoring - Built-in observability and tracing
  • Schema Validation - Contract enforcement at producer level

Producer Evolution Since 2017

  • ✅ Guaranteed Delivery - Idempotence prevents duplicates
  • ✅ Atomic Operations - Multi-partition transactions
  • ✅ Better Performance - Optimized batching and compression
  • ✅ Cloud-Ready - Kubernetes and serverless patterns

Welcome to the advanced Kafka Producer tutorial for 2025! This comprehensive guide covers cutting-edge producer patterns that have become essential for production deployments.

Prerequisites

Before you start, you should have:

  • A running Kafka cluster (Kafka 3.x in KRaft mode, or a managed service)
  • JDK 17 or later - the examples use text blocks, switch expressions, and Stream.toList()
  • Familiarity with basic producer concepts from the earlier tutorials in this series

Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps setting up Kafka clusters in AWS.

Advanced Producer Architecture

flowchart TB
  subgraph Producer["Advanced Kafka Producer"]
    APP[Application]
    VALID[Schema Validator]
    PART[Custom Partitioner]
    INT[Interceptors]
    BATCH[Smart Batching]
    COMP[Compression]
    IDEM[Idempotence Layer]
    TX[Transaction Manager]
  end
  
  subgraph Kafka["Kafka Cluster"]
    B1[Broker 1<br>Partition Leader]
    B2[Broker 2<br>Follower]
    B3[Broker 3<br>Follower]
  end
  
  APP --> VALID
  VALID --> PART
  PART --> INT
  INT --> BATCH
  BATCH --> COMP
  COMP --> IDEM
  IDEM --> TX
  TX --> B1
  B1 --> B2
  B1 --> B3
  
  style Producer fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
  style Kafka fill:#e8f5e9,stroke:#43a047,stroke-width:1px

Understanding Producer Acknowledgments (Acks)

The acks configuration is crucial for balancing durability and performance:

| Acks Setting | Description | Use Case |
|---|---|---|
| acks=0 | No acknowledgment | Maximum throughput, log aggregation |
| acks=1 | Leader acknowledgment only | Balance of performance and reliability |
| acks=all | Acknowledgment from all in-sync replicas (ISR) | Maximum durability, critical data |
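
As a quick sketch, here is one way to express the three tiers in configuration. Keep in mind that idempotence (the default since Kafka 3.0) requires acks=all, so the lower tiers must disable it explicitly; the tier names are just placeholders.

// Illustrative helper: pick an acks tier by durability need.
static void configureDurability(Properties props, String tier) {
    switch (tier) {
        case "throughput" -> {                // fire-and-forget
            props.put(ProducerConfig.ACKS_CONFIG, "0");
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
        }
        case "balanced" -> {                  // leader-only acknowledgment
            props.put(ProducerConfig.ACKS_CONFIG, "1");
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
        }
        default -> {                          // maximum durability
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        }
    }
}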

Modern Producer Configuration

package com.cloudurable.kafka.producer;

import com.cloudurable.kafka.producer.model.StockPrice;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.UUID;

public class AdvancedStockPriceProducer {
    private static final Logger logger = LoggerFactory.getLogger(AdvancedStockPriceProducer.class);
    
    // Public so that StockProducerPool (later in this tutorial) can reuse this factory
    public static Producer<String, StockPrice> createProducer() {
        Properties props = new Properties();
        
        // Connection settings
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "advanced-producer-" + UUID.randomUUID());
        
        // Serialization
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ValidatingStockPriceSerializer.class.getName());
        
        // Durability - Maximum with idempotence
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        // Performance optimizations
        setupBatchingAndCompression(props);
        setupRetriesAndTimeouts(props);
        
        // Interceptors
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
                  MetricsProducerInterceptor.class.getName());
        
        return new KafkaProducer<>(props);
    }
    
    private static void setupBatchingAndCompression(Properties props) {
        // Batching for throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024); // 32KB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024); // 64MB
        
        // Compression for efficiency
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
    }
    
    private static void setupRetriesAndTimeouts(Properties props) {
        // Idempotence makes aggressive retries safe: no duplicates, and ordering
        // is preserved with up to 5 in-flight requests
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        
        // Timeouts
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
    }
}
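
A minimal usage sketch: a main method that could live in AdvancedStockPriceProducer, assuming a stock-prices topic exists and the StockPrice model below is on the classpath.

public static void main(String[] args) {
    try (Producer<String, StockPrice> producer = createProducer()) {
        StockPrice price = new StockPrice("AAPL", 189.25, 1_000_000, "NASDAQ");
        // Asynchronous send with a callback; batching happens behind the scenes
        producer.send(new ProducerRecord<>("stock-prices", price.getSymbol(), price),
            (metadata, exception) -> {
                if (exception != null) {
                    logger.error("Send failed for {}", price.getSymbol(), exception);
                } else {
                    logger.info("Sent {} to partition {} at offset {}",
                                price.getSymbol(), metadata.partition(), metadata.offset());
                }
            });
        producer.flush(); // block until buffered records are sent
    }
}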

Enhanced Stock Price Model

package com.cloudurable.kafka.producer.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.time.Instant;
import java.util.UUID;

public class StockPrice {
    @JsonProperty("id")
    private final String id;
    
    @JsonProperty("symbol")
    private final String symbol;
    
    @JsonProperty("price")
    private final double price;
    
    @JsonProperty("volume")
    private final long volume;
    
    @JsonProperty("timestamp")
    private final Instant timestamp;
    
    @JsonProperty("exchange")
    private final String exchange;
    
    // JavaTimeModule is required for Instant; without it Jackson throws on serialization.
    // Disabling WRITE_DATES_AS_TIMESTAMPS emits ISO-8601 strings, matching the schema below.
    private static final ObjectMapper mapper = new ObjectMapper()
            .registerModule(new JavaTimeModule())
            .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    
    public StockPrice(String symbol, double price, long volume, String exchange) {
        this.id = UUID.randomUUID().toString();
        this.symbol = symbol;
        this.price = price;
        this.volume = volume;
        this.timestamp = Instant.now();
        this.exchange = exchange;
    }
    
    public String toJson() {
        try {
            return mapper.writeValueAsString(this);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize StockPrice", e);
        }
    }
    
    public String getId() { return id; }
    public String getSymbol() { return symbol; }
    public double getPrice() { return price; }
    public long getVolume() { return volume; }
    public Instant getTimestamp() { return timestamp; }
    public String getExchange() { return exchange; }
}

Custom Serializer with Schema Validation

package com.cloudurable.kafka.producer;

import com.cloudurable.kafka.producer.model.StockPrice;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.networknt.schema.JsonSchema;
import com.networknt.schema.JsonSchemaFactory;
import com.networknt.schema.SpecVersion;
import com.networknt.schema.ValidationMessage;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;

public class ValidatingStockPriceSerializer implements Serializer<StockPrice> {
    private static final Logger logger = LoggerFactory.getLogger(ValidatingStockPriceSerializer.class);
    
    private final ObjectMapper objectMapper = new ObjectMapper();
    private JsonSchema schema;
    
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Load schema for validation
        String schemaJson = """
            {
                "$schema": "http://json-schema.org/draft-07/schema#",
                "type": "object",
                "required": ["id", "symbol", "price", "volume", "timestamp", "exchange"],
                "properties": {
                    "id": {"type": "string"},
                    "symbol": {"type": "string", "pattern": "^[A-Z]{1,5}$"},
                    "price": {"type": "number", "minimum": 0},
                    "volume": {"type": "integer", "minimum": 0},
                    "timestamp": {"type": "string"},
                    "exchange": {"type": "string", "enum": ["NYSE", "NASDAQ", "LSE"]}
                }
            }
            """;
        
        // Pin the dialect explicitly; draft-07 matches the $schema declared above
        JsonSchemaFactory factory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V7);
        schema = factory.getSchema(schemaJson);
    }
    
    @Override
    public byte[] serialize(String topic, StockPrice data) {
        if (data == null) {
            return null;
        }
        
        try {
            String json = data.toJson();
            
            // Validate against schema
            JsonNode jsonNode = objectMapper.readTree(json);
            Set<ValidationMessage> errors = schema.validate(jsonNode);
            
            if (!errors.isEmpty()) {
                logger.error("Schema validation failed: {}", errors);
                throw new SerializationException("Schema validation failed: " + errors);
            }
            
            return json.getBytes(StandardCharsets.UTF_8);
            
        } catch (Exception e) {
            throw new SerializationException("Error serializing StockPrice", e);
        }
    }
    
    @Override
    public void close() {
        // Cleanup if needed
    }
}
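
A quick way to see the validation in action (a test-style sketch; assumes JUnit 5 on the classpath):

// "appl" is lowercase, violating the ^[A-Z]{1,5}$ symbol pattern in the schema
ValidatingStockPriceSerializer serializer = new ValidatingStockPriceSerializer();
serializer.configure(Map.of(), false);
assertThrows(SerializationException.class,
    () -> serializer.serialize("stock-prices", new StockPrice("appl", 10.0, 100, "NYSE")));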

Custom Partitioner for Load Distribution

package com.cloudurable.kafka.producer;

import com.cloudurable.kafka.producer.model.StockPrice;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class StockExchangePartitioner implements Partitioner {
    private static final Logger logger = LoggerFactory.getLogger(StockExchangePartitioner.class);
    
    private final Map<String, AtomicInteger> exchangeCounters = new ConcurrentHashMap<>();
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, 
                        Object value, byte[] valueBytes, Cluster cluster) {
        
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        
        if (key == null || !(value instanceof StockPrice)) {
            // Round-robin for records without a key or without a StockPrice value
            return Math.floorMod(
                exchangeCounters.computeIfAbsent("default", k -> new AtomicInteger(0)).getAndIncrement(),
                numPartitions);
        }
        
        StockPrice stockPrice = (StockPrice) value;
        String exchange = stockPrice.getExchange();
        
        // Assign partitions based on exchange for locality
        return switch (exchange) {
            case "NYSE" -> hashToPartition("NYSE", numPartitions, 0, numPartitions / 3);
            case "NASDAQ" -> hashToPartition("NASDAQ", numPartitions, numPartitions / 3, 2 * numPartitions / 3);
            case "LSE" -> hashToPartition("LSE", numPartitions, 2 * numPartitions / 3, numPartitions);
            // floorMod avoids the negative-result edge case of Math.abs(Integer.MIN_VALUE)
            default -> Math.floorMod(key.hashCode(), numPartitions);
        };
    }
    
    private int hashToPartition(String exchange, int totalPartitions, int start, int end) {
        int range = end - start;
        if (range <= 0) {
            // Fewer than three partitions: fall back to hashing across all of them
            return Math.floorMod(exchange.hashCode(), totalPartitions);
        }
        AtomicInteger counter = exchangeCounters.computeIfAbsent(exchange, k -> new AtomicInteger(0));
        // floorMod keeps the result non-negative even if the counter overflows
        return start + Math.floorMod(counter.getAndIncrement(), range);
    }
    
    @Override
    public void close() {
        logger.info("Closing partitioner");
    }
    
    @Override
    public void configure(Map<String, ?> configs) {
        // Configuration if needed
    }
}
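
The partitioner only takes effect once it is registered with the producer. The createProducer() method above does not do this, so add it to the configuration wherever you want exchange-aware placement:

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, StockExchangePartitioner.class.getName());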

Producer Interceptor for Monitoring

package com.cloudurable.kafka.producer;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class MetricsProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(MetricsProducerInterceptor.class);
    
    private final MeterRegistry meterRegistry = new SimpleMeterRegistry();
    private final Map<String, AtomicLong> topicCounters = new ConcurrentHashMap<>();
    
    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        // Track send attempt
        String topic = record.topic();
        topicCounters.computeIfAbsent(topic, k -> new AtomicLong(0)).incrementAndGet();
        
        // Add tracing headers. Per-record latency cannot be measured here because
        // onAcknowledgement does not receive the record; keeping per-record timer
        // samples in a map would leak, so correlate the send-time header
        // downstream (e.g. in a consumer) instead.
        String traceId = topic + "-" + System.nanoTime();
        record.headers()
            .add("trace-id", traceId.getBytes(StandardCharsets.UTF_8))
            .add("send-time", String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
        
        return record;
    }
    
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Check the exception first: on failure the client may still pass partial
        // metadata (topic, maybe partition), so metadata != null does not mean success
        if (exception != null) {
            // Error metrics
            meterRegistry.counter("producer.errors",
                                "exception", exception.getClass().getSimpleName())
                        .increment();
            logger.error("Failed to send record", exception);
        } else if (metadata != null) {
            // Success metrics
            meterRegistry.counter("producer.records.sent", 
                                "topic", metadata.topic(),
                                "partition", String.valueOf(metadata.partition()))
                        .increment();
            
            // Record size
            meterRegistry.summary("producer.record.size.bytes", "topic", metadata.topic())
                        .record(metadata.serializedValueSize());
        }
        
        // Log periodically
        long totalSent = topicCounters.values().stream()
                                     .mapToLong(AtomicLong::get)
                                     .sum();
        if (totalSent % 1000 == 0) {
            logger.info("Total records sent: {}", totalSent);
        }
    }
    
    @Override
    public void close() {
        logger.info("Closing interceptor - Total records by topic: {}", topicCounters);
    }
    
    @Override
    public void configure(Map<String, ?> configs) {
        // Configuration if needed
    }
}

Transactional Producer for Exactly-Once Semantics

package com.cloudurable.kafka.producer;

import com.cloudurable.kafka.producer.model.StockPrice;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;

public class TransactionalStockProducer {
    private static final Logger logger = LoggerFactory.getLogger(TransactionalStockProducer.class);
    
    private final Producer<String, StockPrice> producer;
    private final String transactionalId;
    
    public TransactionalStockProducer(String transactionalId) {
        this.transactionalId = transactionalId;
        this.producer = createTransactionalProducer();
    }
    
    private Producer<String, StockPrice> createTransactionalProducer() {
        Properties props = new Properties();
        
        // Standard configuration
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ValidatingStockPriceSerializer.class.getName());
        
        // Transactional configuration
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // Required for transactions
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // Required for transactions
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        
        // Performance
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        
        Producer<String, StockPrice> producer = new KafkaProducer<>(props);
        
        // Initialize transactions
        producer.initTransactions();
        
        return producer;
    }
    
    public void sendTransactionalBatch(List<StockPrice> stockPrices) {
        try {
            producer.beginTransaction();
            
            for (StockPrice stockPrice : stockPrices) {
                ProducerRecord<String, StockPrice> record = new ProducerRecord<>(
                    "stock-prices",
                    null, // Let partitioner decide
                    stockPrice.getTimestamp().toEpochMilli(),
                    stockPrice.getSymbol(),
                    stockPrice
                );
                
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            logger.error("Error in transaction for {}", stockPrice.getSymbol(), exception);
                        } else {
                            logger.debug("Sent {} to partition {} offset {}",
                                       stockPrice.getSymbol(), 
                                       metadata.partition(), 
                                       metadata.offset());
                        }
                    }
                });
            }
            
            // Commit the transaction
            producer.commitTransaction();
            logger.info("Transaction committed with {} records", stockPrices.size());
            
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal errors - cannot recover
            logger.error("Fatal error in transaction", e);
            producer.close();
            throw e;
        } catch (KafkaException e) {
            // Abort transaction for other errors
            logger.error("Error in transaction, aborting", e);
            producer.abortTransaction();
            throw e;
        }
    }
    
    public void close() {
        producer.close();
    }
}
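
A usage sketch. The transactional.id should be stable across restarts of the same logical producer instance (it is how Kafka fences zombie producers), so derive it from something like a pod or task identifier rather than a random UUID:

TransactionalStockProducer producer = new TransactionalStockProducer("stock-feed-tx-1");
try {
    producer.sendTransactionalBatch(List.of(
        new StockPrice("AAPL", 189.25, 1_000_000, "NASDAQ"),
        new StockPrice("IBM", 172.40, 250_000, "NYSE")
    ));
} finally {
    producer.close();
}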

Producer Pool for High Concurrency

package com.cloudurable.kafka.producer;

import com.cloudurable.kafka.producer.model.StockPrice;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class StockProducerPool implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(StockProducerPool.class);
    
    private final BlockingQueue<Producer<String, StockPrice>> pool;
    private final ExecutorService executor;
    private final AtomicInteger activeProducers = new AtomicInteger(0);
    private final int maxPoolSize;
    
    public StockProducerPool(int poolSize) {
        this.maxPoolSize = poolSize;
        this.pool = new LinkedBlockingQueue<>(poolSize);
        this.executor = Executors.newFixedThreadPool(poolSize * 2);
        
        // Initialize pool
        for (int i = 0; i < poolSize; i++) {
            pool.offer(AdvancedStockPriceProducer.createProducer());
        }
        
        logger.info("Initialized producer pool with {} producers", poolSize);
    }
    
    public CompletableFuture<RecordMetadata> sendAsync(StockPrice stockPrice) {
        return CompletableFuture.supplyAsync(() -> {
            Producer<String, StockPrice> producer = null;
            try {
                producer = borrowProducer();
                
                ProducerRecord<String, StockPrice> record = new ProducerRecord<>(
                    "stock-prices",
                    stockPrice.getSymbol(),
                    stockPrice
                );
                
                Future<RecordMetadata> future = producer.send(record);
                return future.get(10, TimeUnit.SECONDS);
                
            } catch (Exception e) {
                logger.error("Failed to send stock price", e);
                throw new CompletionException(e);
            } finally {
                if (producer != null) {
                    returnProducer(producer);
                }
            }
        }, executor);
    }
    
    private Producer<String, StockPrice> borrowProducer() throws InterruptedException {
        Producer<String, StockPrice> producer = pool.poll(5, TimeUnit.SECONDS);
        if (producer == null) {
            throw new RuntimeException("No available producer in pool");
        }
        activeProducers.incrementAndGet();
        return producer;
    }
    
    private void returnProducer(Producer<String, StockPrice> producer) {
        activeProducers.decrementAndGet();
        pool.offer(producer);
    }
    
    public void sendBatch(List<StockPrice> stockPrices) {
        List<CompletableFuture<RecordMetadata>> futures = stockPrices.stream()
            .map(this::sendAsync)
            .toList();
        
        // Block until every send completes; join() rethrows the first failure
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            logger.info("Batch of {} records sent successfully", stockPrices.size());
        } catch (CompletionException e) {
            logger.error("Error sending batch", e);
        }
    }
    
    @Override
    public void close() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
        
        // Close all producers
        pool.forEach(producer -> {
            try {
                producer.close(Duration.ofSeconds(10));
            } catch (Exception e) {
                logger.error("Error closing producer", e);
            }
        });
        
        logger.info("Producer pool closed");
    }
}
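
Because the pool implements AutoCloseable, try-with-resources takes care of shutdown (the symbols and prices are illustrative values):

try (StockProducerPool pool = new StockProducerPool(4)) {
    pool.sendBatch(List.of(
        new StockPrice("MSFT", 415.10, 1_500_000, "NASDAQ"),
        new StockPrice("GOOGL", 2_800.00, 500_000, "NASDAQ")
    ));
}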

Cloud-Native Producer Pattern

package com.cloudurable.kafka.producer;

import com.cloudurable.kafka.producer.model.StockPrice;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.util.Config;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class CloudNativeProducer {
    private static final Logger logger = LoggerFactory.getLogger(CloudNativeProducer.class);
    
    public static Producer<String, StockPrice> createCloudNativeProducer() {
        Properties props = new Properties();
        
        // Get configuration from environment/Kubernetes
        String bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
        if (bootstrapServers == null) {
            bootstrapServers = getBootstrapServersFromKubernetes();
        }
        
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        
        // Serialization (required - KafkaProducer fails fast without these)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ValidatingStockPriceSerializer.class.getName());
        
        // Security from Kubernetes secrets
        configureSecurityFromSecrets(props);
        
        // Cloud-optimized settings
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB for cloud networks
        props.put(ProducerConfig.LINGER_MS_CONFIG, 50); // Higher for cloud latency
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 128 * 1024 * 1024); // 128MB
        
        // Retries for cloud resilience
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 500);
        
        return new KafkaProducer<>(props);
    }
    
    private static String getBootstrapServersFromKubernetes() {
        try {
            ApiClient client = Config.defaultClient();
            Configuration.setDefaultApiClient(client);
            
            // Get Kafka service endpoint from Kubernetes
            // This is simplified - real implementation would use K8s API
            return "kafka-bootstrap.kafka.svc.cluster.local:9092";
            
        } catch (Exception e) {
            logger.warn("Failed to get Kafka endpoint from Kubernetes", e);
            return "localhost:9092";
        }
    }
    
    private static void configureSecurityFromSecrets(Properties props) {
        // In real implementation, read from Kubernetes secrets
        String securityProtocol = System.getenv("KAFKA_SECURITY_PROTOCOL");
        if ("SASL_SSL".equals(securityProtocol)) {
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", System.getenv("KAFKA_SASL_MECHANISM"));
            props.put("sasl.jaas.config", System.getenv("KAFKA_SASL_JAAS_CONFIG"));
            props.put("ssl.truststore.location", "/var/run/secrets/kafka/truststore.jks");
            props.put("ssl.truststore.password", System.getenv("KAFKA_SSL_TRUSTSTORE_PASSWORD"));
        }
    }
}

Testing Advanced Producers

package com.cloudurable.kafka.producer;

import com.cloudurable.kafka.producer.model.StockPrice;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.*;

class AdvancedProducerTest {
    
    @Test
    void testCustomPartitioner() {
        // Build a fake cluster with nine partitions so the partitioner sees real metadata
        Node node = new Node(0, "localhost", 9092);
        List<PartitionInfo> partitionInfos = IntStream.range(0, 9)
            .mapToObj(i -> new PartitionInfo("stock-prices", i, node,
                                             new Node[]{node}, new Node[]{node}))
            .toList();
        Cluster cluster = new Cluster("test-cluster", List.of(node), partitionInfos,
                                      Set.of(), Set.of());
        
        StockExchangePartitioner partitioner = new StockExchangePartitioner();
        
        // NYSE stocks go to the first third of the partitions (0-2 of 9)
        StockPrice nyseStock = new StockPrice("AAPL", 150.0, 1000000, "NYSE");
        int partition = partitioner.partition("stock-prices", "AAPL", null, nyseStock, null, cluster);
        assertTrue(partition < 3);
        
        // Round-robin within the exchange's partition range
        StockPrice nyseStock2 = new StockPrice("MSFT", 300.0, 2000000, "NYSE");
        int partition2 = partitioner.partition("stock-prices", "MSFT", null, nyseStock2, null, cluster);
        assertNotEquals(partition, partition2);
    }
    
    @Test
    @Tag("integration") // Requires a running Kafka broker at localhost:9092
    void testTransactionalProducer() {
        TransactionalStockProducer producer = new TransactionalStockProducer("test-tx-id");
        
        List<StockPrice> batch = List.of(
            new StockPrice("AAPL", 150.0, 1000000, "NYSE"),
            new StockPrice("GOOGL", 2800.0, 500000, "NASDAQ"),
            new StockPrice("MSFT", 300.0, 1500000, "NYSE")
        );
        
        assertDoesNotThrow(() -> producer.sendTransactionalBatch(batch));
        producer.close();
    }
}

Performance Tuning Guide

flowchart TB
  subgraph Optimization["Producer Optimization"]
    BATCH[Increase Batch Size<br>batch.size=64KB]
    LINGER[Add Linger Time<br>linger.ms=20-50ms]
    COMPRESS[Use Compression<br>compression.type=lz4]
    BUFFER[Increase Buffer<br>buffer.memory=128MB]
    IDEM[Enable Idempotence<br>enable.idempotence=true]
    POOL[Use Producer Pools<br>For high concurrency]
  end
  
  subgraph Metrics["Key Metrics"]
    M1[Records/Second]
    M2[Bytes/Second]
    M3[Batch Size Avg]
    M4[Compression Ratio]
    M5[Request Latency]
    M6[Error Rate]
  end
  
  Optimization --> Metrics
  
  style Optimization fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
  style Metrics fill:#e8f5e9,stroke:#43a047,stroke-width:1px
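
The Java client exposes these metrics directly. Here is a sketch that dumps a few of them via producer.metrics(); the names come from the standard producer-metrics group, and `producer` is assumed to be any producer created as shown earlier:

// Log a handful of built-in producer metrics.
// KafkaProducer.metrics() returns Map<MetricName, ? extends Metric>.
Set<String> interesting = Set.of(
    "record-send-rate", "outgoing-byte-rate", "batch-size-avg",
    "compression-rate-avg", "request-latency-avg", "record-error-rate");

producer.metrics().forEach((name, metric) -> {
    if ("producer-metrics".equals(name.group()) && interesting.contains(name.name())) {
        System.out.printf("%-22s = %s%n", name.name(), metric.metricValue());
    }
});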

Best Practices Summary

| Feature | Best Practice |
|---|---|
| Idempotence | Always enable for critical data |
| Transactions | Use for exactly-once writes across partitions and topics |
| Compression | LZ4 for a good balance of speed and ratio |
| Batching | Tune batch.size and linger.ms for your throughput needs |
| Partitioning | Custom partitioners for data locality |
| Monitoring | Use interceptors for metrics |
| Error Handling | Implement circuit breakers (see the sketch below) |
| Cloud Deployment | Use producer pools and environment-based configuration |
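
The circuit-breaker row deserves a concrete illustration. Below is a minimal, hand-rolled sketch; in production you would more likely reach for a library such as Resilience4j, and the threshold and cool-down values here are placeholders, not recommendations.

import com.cloudurable.kafka.producer.model.StockPrice;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

// After FAILURE_THRESHOLD consecutive send failures, the breaker opens and
// rejects sends for OPEN_DURATION, then allows a trial send (half-open).
public class CircuitBreakerProducer {
    private static final int FAILURE_THRESHOLD = 5;
    private static final Duration OPEN_DURATION = Duration.ofSeconds(30);

    private final Producer<String, StockPrice> delegate;
    private final AtomicInteger consecutiveFailures = new AtomicInteger();
    private volatile long openedAtMillis = 0;

    public CircuitBreakerProducer(Producer<String, StockPrice> delegate) {
        this.delegate = delegate;
    }

    public void send(ProducerRecord<String, StockPrice> record) {
        if (isOpen()) {
            throw new IllegalStateException("Circuit open: Kafka sends suspended");
        }
        delegate.send(record, (metadata, exception) -> {
            if (exception != null) {
                if (consecutiveFailures.incrementAndGet() >= FAILURE_THRESHOLD) {
                    openedAtMillis = System.currentTimeMillis(); // trip the breaker
                }
            } else {
                consecutiveFailures.set(0); // any success resets the count
            }
        });
    }

    private boolean isOpen() {
        if (openedAtMillis == 0) return false;
        if (System.currentTimeMillis() - openedAtMillis > OPEN_DURATION.toMillis()) {
            openedAtMillis = 0;            // half-open: allow a trial send
            consecutiveFailures.set(0);
            return false;
        }
        return true;
    }
}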

Review Questions

Why is idempotence important?

Idempotence prevents duplicate messages during retries, giving exactly-once delivery per partition within a producer session even in the face of network failures. Combine it with transactions for exactly-once semantics across partitions and topics.

When should you use transactions?

Use transactions when you need atomic writes across multiple partitions or topics, especially for financial or critical data.

How do custom partitioners help?

Custom partitioners enable data locality, better load distribution, and can improve consumer performance by co-locating related data.

What’s the benefit of producer pools?

Producer pools reduce connection overhead and improve throughput in high-concurrency scenarios, especially in serverless environments.

How does schema validation help?

Schema validation at the producer level prevents bad data from entering Kafka, ensuring data quality and contract compliance.

Summary

Advanced Kafka producers in 2025 offer:

  • Guaranteed Delivery through idempotence and transactions
  • High Performance via optimized batching and compression
  • Data Quality with schema validation and interceptors
  • Cloud-Native patterns for modern deployments
  • Production-Ready configurations and monitoring

Master these patterns to build reliable, high-performance data pipelines!

About Cloudurable

We hope you enjoyed this article. Please provide feedback. Cloudurable provides Kafka training, Kafka consulting, Kafka support and helps setting up Kafka clusters in AWS.

Check out our new GoLang course. We provide onsite, instructor-led Go Lang training.

