January 9, 2025
🚀 What’s New in This 2025 Update
Major Updates and Changes
- Multi-Format Support - Avro, JSON Schema, and Protobuf
- Schema References - Compose complex, modular schemas
- Cloud-Native Integration - Managed registry services
- Enhanced Security - ACLs, encryption, audit trails
- Data Contracts - Enforce governance at scale
- Performance - 10x faster schema lookups with caching
New Features Since 2017
- ✅ Multiple Schema Formats - Beyond just Avro
- ✅ Schema Linking - Reference common schemas
- ✅ Async Compatibility Checks - Non-blocking validation
- ✅ Schema Normalization - Automatic formatting
- ✅ Metadata Tags - Business context for schemas
Ready to master data governance in Kafka? Let’s explore how Schema Registry has evolved into the cornerstone of streaming data quality.
Kafka Tutorial: Kafka, Avro Serialization and the Schema Registry
mindmap
  root((Schema Registry 2025))
    Core Features
      Multi-Format Support
      Schema Evolution
      Compatibility Modes
      REST API
    Data Formats
      Apache Avro
      JSON Schema
      Protocol Buffers
      Custom Formats
    Operations
      Register Schemas
      Version Control
      Compatibility Checks
      Schema Lookup
    Integration
      Kafka Producers
      Kafka Consumers
      Stream Processing
      CDC Pipelines
    Governance
      Data Contracts
      Access Control
      Audit Logging
      Compliance
Schema Registry has evolved from a simple Avro schema store to a comprehensive data governance platform for streaming architectures. It now supports multiple serialization formats, enables complex schema compositions, and integrates seamlessly with cloud-native deployments.
The modern Schema Registry provides a centralized repository for schemas with versioning, compatibility checking, and evolution support. This ensures data quality and consistency across your entire streaming infrastructure.
Cloudurable provides Kafka training, Kafka consulting, Kafka support, and helps set up Kafka clusters in AWS.
Why Schema Registry?
The Data Quality Challenge
flowchart LR
subgraph Without["Without Schema Registry"]
P1[Producer v1] -->|Binary Data| K1[Kafka]
P2[Producer v2] -->|Different Format| K1
K1 -->|???| C1[Consumer]
C1 -->|Errors!| X[Failed Processing]
end
subgraph With["With Schema Registry"]
SR[(Schema Registry)]
P3[Producer v1] -->|Schema ID + Data| K2[Kafka]
P4[Producer v2] -->|Compatible Schema| K2
P3 -.->|Register Schema| SR
P4 -.->|Check Compatibility| SR
K2 -->|Schema ID| C2[Consumer]
C2 -.->|Fetch Schema| SR
C2 -->|Success| Y[Processed Data]
end
style Without fill:#ffebee,stroke:#e53935,stroke-width:2px
style With fill:#e8f5e9,stroke:#43a047,stroke-width:2px
Key Benefits in 2025
1. Multi-Format Support
- Not just Avro anymore
- JSON Schema for REST APIs
- Protobuf for gRPC services
- Custom serialization formats
2. Schema Evolution
- Safe, backward-compatible changes
- Automated compatibility validation
- Zero-downtime deployments
3. Performance
- Schemas sent once, not with every message (see the wire-format sketch after this list)
- Client-side caching
- Compressed storage
4. Data Governance
- Centralized schema management
- Access control and audit logs
- Data lineage tracking
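The "schemas sent once" benefit comes from the Confluent wire format: each serialized message carries a single magic byte and a four-byte schema ID instead of the full schema, and clients cache the schema after the first lookup. A minimal sketch of pulling that ID out of a raw record value (the class and method names here are illustrative):

import java.nio.ByteBuffer;

public class SchemaIdExtractor {
    // Confluent wire format: [magic byte 0x0][4-byte big-endian schema ID][serialized payload]
    public static int schemaId(byte[] recordValue) {
        ByteBuffer buffer = ByteBuffer.wrap(recordValue);
        byte magic = buffer.get();
        if (magic != 0x0) {
            throw new IllegalArgumentException("Not Confluent wire format, magic byte: " + magic);
        }
        return buffer.getInt(); // the registry-assigned schema ID the consumer uses for lookup
    }
}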
Schema Registry Architecture
Modern Deployment Architecture
flowchart TB
subgraph Producers["Producer Applications"]
PA[Service A<br>Avro]
PB[Service B<br>JSON Schema]
PC[Service C<br>Protobuf]
end
subgraph Registry["Schema Registry Cluster"]
SR1[Registry Node 1]
SR2[Registry Node 2]
SR3[Registry Node 3]
STORE[(Schema Storage<br>Kafka _schemas)]
CACHE[(In-Memory Cache)]
end
subgraph Consumers["Consumer Applications"]
CA[Analytics<br>Multi-Format]
CB[ML Pipeline<br>Avro]
CC[API Gateway<br>JSON]
end
PA -->|Register v1| SR1
PB -->|Register v2| SR2
PC -->|Check Compatibility| SR3
SR1 <--> STORE
SR2 <--> STORE
SR3 <--> STORE
STORE --> CACHE
CA -->|Fetch Schema| CACHE
CB -->|Fetch Schema| CACHE
CC -->|Fetch Schema| CACHE
classDef producer fill:#bbdefb,stroke:#1976d2,stroke-width:1px,color:#333333
classDef registry fill:#e1bee7,stroke:#8e24aa,stroke-width:2px,color:#333333
classDef consumer fill:#c8e6c9,stroke:#43a047,stroke-width:1px,color:#333333
class PA,PB,PC producer
class SR1,SR2,SR3,STORE,CACHE registry
class CA,CB,CC consumer
Architecture Flow:
- Producers register schemas before sending data
- Registry validates and stores schemas in Kafka
- Producers receive schema IDs for serialization
- Consumers fetch schemas using IDs
- Schemas are cached for performance (a client-side sketch follows this list)
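The same register-then-fetch flow can be driven directly with the Confluent Java client. A minimal sketch, assuming a registry at http://localhost:8081, a users-value subject, and a client version (5.5+) that exposes the ParsedSchema-based methods:

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class RegistryFlowSketch {
    public static void main(String[] args) throws Exception {
        // Cache up to 100 schema entries on the client side
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);

        String userSchema = "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"string\"},"
                + "{\"name\":\"email\",\"type\":\"string\"}]}";

        // Producer side: register the schema under a subject and receive its global ID
        int schemaId = client.register("users-value", new AvroSchema(userSchema));
        System.out.println("Registered schema ID: " + schemaId);

        // Consumer side: resolve the ID back to the schema (cached after the first lookup)
        ParsedSchema fetched = client.getSchemaById(schemaId);
        System.out.println("Fetched: " + fetched.canonicalString());
    }
}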
Schema Compatibility Settings
Compatibility Modes Explained
flowchart TB
subgraph Compatibility["Schema Compatibility Modes"]
BACKWARD[BACKWARD<br>New schema reads old data]
FORWARD[FORWARD<br>Old schema reads new data]
FULL[FULL<br>Bidirectional compatibility]
NONE[NONE<br>No validation]
end
subgraph Examples["Real-World Examples"]
B_EX[Add optional field<br>Remove field with default]
F_EX[Add field with default<br>Remove optional field]
FULL_EX[Only safe changes<br>Most restrictive]
NONE_EX[Any changes allowed<br>Use with caution]
end
BACKWARD --> B_EX
FORWARD --> F_EX
FULL --> FULL_EX
NONE --> NONE_EX
style BACKWARD fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style FORWARD fill:#e8f5e9,stroke:#43a047,stroke-width:2px
style FULL fill:#fff9c4,stroke:#f9a825,stroke-width:2px
style NONE fill:#ffebee,stroke:#e53935,stroke-width:2px
Compatibility Rules
| Mode | Can Add Field | Can Remove Field | Use Case |
|---|---|---|---|
| BACKWARD | With default | Yes | Consumer updates first |
| FORWARD | Yes | With default | Producer updates first |
| FULL | With default | With default | Maximum safety |
| BACKWARD_TRANSITIVE | With default | Yes | Check all versions |
| FORWARD_TRANSITIVE | Yes | With default | Check all versions |
| FULL_TRANSITIVE | With default | With default | Check all versions |
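The compatibility mode is enforced at registration time, but it can also be checked (and changed) programmatically before a deploy. A hedged sketch with the Confluent Java client; the subject name and registry URL are assumptions for illustration:

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class CompatibilityCheckSketch {
    public static void main(String[] args) throws Exception {
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);

        // Pin the subject to BACKWARD before evolving it
        client.updateCompatibility("users-value", "BACKWARD");

        // Candidate schema adds an optional field with a default, which is BACKWARD-safe
        String candidate = "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"string\"},"
                + "{\"name\":\"email\",\"type\":\"string\"},"
                + "{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}]}";

        // Ask the registry whether the candidate can read data written with the latest version
        boolean compatible = client.testCompatibility("users-value", new AvroSchema(candidate));
        System.out.println("Compatible with latest version: " + compatible);
    }
}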
Schema Evolution in Practice
Safe Evolution Example
stateDiagram-v2
[*] --> V1: Initial Schema
V1 --> V2: Add optional field
V2 --> V3: Add field with default
V3 --> V4: Remove optional field
V4 --> V5: Rename via alias
note right of V1
{
"name": "User",
"fields": [
{"name": "id", "type": "string"},
{"name": "email", "type": "string"}
]
}
end note
note right of V2
Added: {"name": "age", "type": ["null", "int"]}
end note
note right of V3
Added: {"name": "created", "type": "long", "default": 0}
end note
Employee Schema Evolution Example
// Version 1 - Original schema
{
"namespace": "com.cloudurable.phonebook",
"type": "record",
"name": "Employee",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "lastName", "type": "string"},
{"name": "email", "type": "string"}
]
}
// Version 2 - Add optional field (BACKWARD compatible)
{
"namespace": "com.cloudurable.phonebook",
"type": "record",
"name": "Employee",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "lastName", "type": "string"},
{"name": "email", "type": "string"},
{"name": "age", "type": ["null", "int"], "default": null}
]
}
// Version 3 - Add fields with defaults (BACKWARD compatible)
{
"namespace": "com.cloudurable.phonebook",
"type": "record",
"name": "Employee",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "lastName", "type": "string"},
{"name": "email", "type": "string"},
{"name": "age", "type": ["null", "int"], "default": null},
{"name": "department", "type": "string", "default": "Unknown"},
{"name": "startDate", "type": "long", "default": 0}
]
}
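These changes are backward compatible because of Avro schema resolution: a consumer holding Version 3 can decode bytes written with Version 1, with the new fields filled in from their defaults. A self-contained sketch of that resolution (schemas trimmed to a few fields for brevity):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;
import java.io.ByteArrayOutputStream;

public class EvolutionResolutionSketch {
    public static void main(String[] args) throws Exception {
        Schema writerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"com.cloudurable.phonebook\","
            + "\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"},"
            + "{\"name\":\"lastName\",\"type\":\"string\"}]}");
        Schema readerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"com.cloudurable.phonebook\","
            + "\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"},"
            + "{\"name\":\"lastName\",\"type\":\"string\"},"
            + "{\"name\":\"department\",\"type\":\"string\",\"default\":\"Unknown\"}]}");

        // Write a record with the old (writer) schema
        GenericRecord v1 = new GenericData.Record(writerSchema);
        v1.put("firstName", "Jane");
        v1.put("lastName", "Smith");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(v1, encoder);
        encoder.flush();

        // Read it back with the new (reader) schema: 'department' is filled from its default
        Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord v3 = new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
                .read(null, decoder);
        System.out.println(v3.get("department")); // prints: Unknown
    }
}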
Schema References (New in 2025)
// Shared address schema
{
"namespace": "com.cloudurable.common",
"type": "record",
"name": "Address",
"fields": [
{"name": "street", "type": "string"},
{"name": "city", "type": "string"},
{"name": "state", "type": "string"},
{"name": "zip", "type": "string"}
]
}
// Employee schema with reference
{
"namespace": "com.cloudurable.phonebook",
"type": "record",
"name": "Employee",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "lastName", "type": "string"},
{"name": "email", "type": "string"},
{"name": "addresses", "type": {
"type": "array",
"items": "com.cloudurable.common.Address"
}}
],
"references": [
{
"name": "com.cloudurable.common.Address",
"subject": "address-value",
"version": 1
}
]
}
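Note that the "references" array is not part of the Avro type itself; it travels in the registration request alongside the schema. A hedged sketch of registering both subjects with the Java client, where the subject names are assumptions and the AvroSchema/SchemaReference constructor signatures are those of recent client versions:

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class SchemaReferenceSketch {
    public static void main(String[] args) throws Exception {
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);

        String addressSchema = "{\"namespace\":\"com.cloudurable.common\",\"type\":\"record\","
                + "\"name\":\"Address\",\"fields\":[{\"name\":\"street\",\"type\":\"string\"},"
                + "{\"name\":\"city\",\"type\":\"string\"}]}";

        // 1. Register the shared schema under its own subject
        client.register("address-value", new AvroSchema(addressSchema));

        // 2. Register the Employee schema, declaring the reference to address-value version 1
        String employeeSchema = "{\"namespace\":\"com.cloudurable.phonebook\",\"type\":\"record\","
                + "\"name\":\"Employee\",\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"},"
                + "{\"name\":\"addresses\",\"type\":{\"type\":\"array\","
                + "\"items\":\"com.cloudurable.common.Address\"}}]}";

        List<SchemaReference> references = Collections.singletonList(
                new SchemaReference("com.cloudurable.common.Address", "address-value", 1));
        Map<String, String> resolved =
                Collections.singletonMap("com.cloudurable.common.Address", addressSchema);

        int id = client.register("employees-value",
                new AvroSchema(employeeSchema, references, resolved, null));
        System.out.println("Registered Employee schema with reference, ID: " + id);
    }
}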
Using Schema Registry REST API
Core API Operations
sequenceDiagram
participant Client
participant Registry
participant Kafka
Note over Client,Kafka: Schema Registration Flow
Client->>Registry: POST /subjects/users-value/versions
Registry->>Registry: Validate Schema
Registry->>Kafka: Store in _schemas
Registry-->>Client: {"id": 42}
Note over Client,Kafka: Schema Retrieval Flow
Client->>Registry: GET /schemas/ids/42
Registry->>Registry: Check Cache
Registry-->>Client: Return Schema
Note over Client,Kafka: Compatibility Check
Client->>Registry: POST /compatibility/subjects/users-value/versions/latest
Registry->>Registry: Check Compatibility
Registry-->>Client: {"is_compatible": true}
Modern REST API Examples
Register a Schema
# Register Avro schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{
"schemaType": "AVRO",
"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"}]}"
}' \
http://localhost:8081/subjects/users-value/versions
# Register JSON Schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{
"schemaType": "JSON",
"schema": "{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"}},\"required\":[\"id\",\"email\"]}"
}' \
http://localhost:8081/subjects/events-value/versions
# Register Protobuf schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{
"schemaType": "PROTOBUF",
"schema": "syntax = \"proto3\"; message User { string id = 1; string email = 2; }"
}' \
http://localhost:8081/subjects/users-proto-value/versions
Query Operations
# List all subjects
curl http://localhost:8081/subjects
# Get latest version
curl http://localhost:8081/subjects/users-value/versions/latest
# Get specific version
curl http://localhost:8081/subjects/users-value/versions/1
# Get schema by ID
curl http://localhost:8081/schemas/ids/42
# Check compatibility
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{...}"}' \
http://localhost:8081/compatibility/subjects/users-value/versions/latest
# Get global compatibility
curl http://localhost:8081/config
# Set subject compatibility
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://localhost:8081/config/users-value
Kafka Producer with Schema Registry
Modern Producer Implementation
package com.cloudurable.kafka.schema;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import com.cloudurable.phonebook.Employee;
import com.cloudurable.phonebook.Address;
import java.util.Properties;
import java.util.Arrays;
public class ModernAvroProducer {
private static Producer<String, Employee> createProducer() {
Properties props = new Properties();
// Kafka Configuration
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "employee-producer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaAvroSerializer.class.getName());
// Schema Registry Configuration
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://localhost:8081");
// Performance Optimizations
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
// Schema Registry Features
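// Note: use.latest.version is generally honored only when auto.register.schemas
// is false. Auto-registration is convenient in development; many teams disable it
// in production and pin serialization to the latest registered version instead.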
props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, true);
props.put(KafkaAvroSerializerConfig.USE_LATEST_VERSION, true);
return new KafkaProducer<>(props);
}
public static void main(String[] args) {
Producer<String, Employee> producer = createProducer();
try {
// Create employee with address (using schema reference)
Address homeAddress = Address.newBuilder()
.setStreet("123 Main St")
.setCity("San Francisco")
.setState("CA")
.setZip("94105")
.build();
Employee employee = Employee.newBuilder()
.setFirstName("Jane")
.setLastName("Smith")
.setEmail("jane.smith@example.com")
.setAge(28)
.setDepartment("Engineering")
.setStartDate(System.currentTimeMillis())
.setAddresses(Arrays.asList(homeAddress))
.build();
// Send with callback
ProducerRecord<String, Employee> record =
new ProducerRecord<>("employees", employee.getEmail(), employee);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Failed to send: " + exception);
} else {
System.out.printf("Sent to partition %d at offset %d%n",
metadata.partition(), metadata.offset());
}
});
} finally {
producer.flush();
producer.close();
}
}
}
Producer Best Practices
flowchart TB
subgraph Producer["Producer Best Practices"]
REG[Auto-register schemas]
CACHE[Enable schema caching]
COMPRESS[Use compression]
BATCH[Batch messages]
RETRY[Configure retries]
end
subgraph Config["Key Configurations"]
C1["auto.register.schemas=true"]
C2["use.latest.version=true"]
C3["compression.type=snappy"]
C4["batch.size=16384"]
C5["retries=3"]
end
REG --> C1
CACHE --> C2
COMPRESS --> C3
BATCH --> C4
RETRY --> C5
style Producer fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style Config fill:#e8f5e9,stroke:#43a047,stroke-width:1px
Kafka Consumer with Schema Registry
Modern Consumer Implementation
package com.cloudurable.kafka.schema;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.cloudurable.phonebook.Employee;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ModernAvroConsumer {
private static Consumer<String, Employee> createConsumer() {
Properties props = new Properties();
// Kafka Configuration
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "employee-processor");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class.getName());
// Schema Registry Configuration
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://localhost:8081");
// Use Specific Records (not GenericRecord)
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
// Schema Evolution Support
props.put(KafkaAvroDeserializerConfig.USE_LATEST_VERSION, false);
// Offset Management - commit manually after processing (see commitSync below)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Performance Settings
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
return new KafkaConsumer<>(props);
}
public static void main(String[] args) {
Consumer<String, Employee> consumer = createConsumer();
consumer.subscribe(Collections.singletonList("employees"));
try {
while (true) {
ConsumerRecords<String, Employee> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, Employee> record : records) {
Employee employee = record.value();
// Process employee with schema evolution support
System.out.printf("Employee: %s %s (%s)%n",
employee.getFirstName(),
employee.getLastName(),
employee.getEmail());
// Fields added in later schema versions may be null (optional fields) or carry
// their declared default when reading records written with an older schema
if (employee.getAge() != null) {
System.out.printf(" Age: %d%n", employee.getAge());
}
if (employee.getDepartment() != null) {
System.out.printf(" Department: %s%n",
employee.getDepartment());
}
// Process addresses if present
if (employee.getAddresses() != null) {
employee.getAddresses().forEach(addr ->
System.out.printf(" Address: %s, %s%n",
addr.getStreet(), addr.getCity())
);
}
}
consumer.commitSync();
}
} finally {
consumer.close();
}
}
}
Cloud-Native Schema Registry
Deployment Options
flowchart TB
subgraph Managed["Managed Services"]
CC[Confluent Cloud<br>Schema Registry]
MSK[AWS MSK<br>Schema Registry]
AEH[Azure Event Hubs<br>Schema Registry]
end
subgraph Self["Self-Managed"]
K8S[Kubernetes<br>Helm Charts]
DOCKER[Docker<br>Compose]
VM[Virtual Machines<br>Ansible]
end
subgraph Features["Enterprise Features"]
HA[High Availability]
SEC[Security/ACLs]
MON[Monitoring]
BACKUP[Backup/Restore]
end
Managed --> Features
Self --> Features
classDef managed fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
classDef self fill:#fff9c4,stroke:#f9a825,stroke-width:2px
classDef feature fill:#e8f5e9,stroke:#43a047,stroke-width:1px
class CC,MSK,AEH managed
class K8S,DOCKER,VM self
class HA,SEC,MON,BACKUP feature
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: schema-registry
spec:
  replicas: 3
  selector:
    matchLabels:
      app: schema-registry
  template:
    metadata:
      labels:
        app: schema-registry
    spec:
      containers:
        - name: schema-registry
          image: confluentinc/cp-schema-registry:7.5.0
          ports:
            - containerPort: 8081
          env:
            - name: SCHEMA_REGISTRY_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: SCHEMA_REGISTRY_LISTENERS
              value: "http://0.0.0.0:8081"
            - name: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
              value: "kafka-0.kafka:9092,kafka-1.kafka:9092,kafka-2.kafka:9092"
            - name: SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL
              value: "SASL_SSL"
            - name: SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM
              value: "PLAIN"
            - name: SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG
              valueFrom:
                secretKeyRef:
                  name: schema-registry-secrets
                  key: sasl.jaas.config
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /
              port: 8081
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /subjects
              port: 8081
            initialDelaySeconds: 30
            periodSeconds: 10
Best Practices for 2025
1. Schema Design
- Start with strict schemas
- Use meaningful field names
- Document with “doc” fields
- Plan for evolution upfront
2. Evolution Strategy
- Always use BACKWARD compatibility
- Add fields with defaults
- Never change field types
- Use aliases for renames (see the sketch after this list)
3. Performance
- Enable client-side caching
- Use latest schema versions
- Batch schema lookups
- Monitor cache hit rates
4. Operations
- Automate compatibility checks
- Version control schemas
- Monitor registry health
- Regular backups
5. Security
- Enable authentication
- Use ACLs for authorization
- Encrypt sensitive fields
- Audit schema changes
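The alias rule from item 2 leans on the same Avro resolution mechanism shown earlier: the reader schema declares the old field name as an alias, so data written before the rename still decodes into the new field. A minimal sketch (record and field names are illustrative):

import org.apache.avro.Schema;
import org.apache.avro.generic.*;
import org.apache.avro.io.*;
import java.io.ByteArrayOutputStream;

public class AliasRenameSketch {
    public static void main(String[] args) throws Exception {
        // Old writer schema used the field name "email"
        Schema writer = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"email\",\"type\":\"string\"}]}");

        // New reader schema renames it to "emailAddress" and keeps "email" as an alias
        Schema reader = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"emailAddress\",\"type\":\"string\",\"aliases\":[\"email\"]}]}");

        GenericRecord old = new GenericData.Record(writer);
        old.put("email", "jane.smith@example.com");

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder enc = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writer).write(old, enc);
        enc.flush();

        // The renamed field is populated from the old field via its alias
        Decoder dec = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord renamed = new GenericDatumReader<GenericRecord>(writer, reader)
                .read(null, dec);
        System.out.println(renamed.get("emailAddress")); // jane.smith@example.com
    }
}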
Monitoring Schema Registry
flowchart TB
subgraph Metrics["Key Metrics to Monitor"]
M1[Request Rate]
M2[Request Latency]
M3[Schema Count]
M4[Compatibility Failures]
M5[Cache Hit Rate]
end
subgraph Alerts["Critical Alerts"]
A1[Registry Unavailable]
A2[High Latency > 100ms]
A3[Compatibility Errors]
A4[Storage Full]
end
subgraph Tools["Monitoring Tools"]
PROM[Prometheus]
GRAF[Grafana]
DD[DataDog]
CW[CloudWatch]
end
Metrics --> Tools
Tools --> Alerts
style Metrics fill:#e3f2fd,stroke:#1976d2,stroke-width:1px
style Alerts fill:#ffebee,stroke:#e53935,stroke-width:1px
style Tools fill:#e8f5e9,stroke:#43a047,stroke-width:1px
Troubleshooting Common Issues
Schema Not Found
# Check if schema exists
curl http://localhost:8081/subjects
# Verify schema ID
curl http://localhost:8081/schemas/ids/<id>
# Check producer configuration
# Ensure auto.register.schemas=true
Compatibility Errors
# Check current compatibility
curl http://localhost:8081/config/<subject>
# Test compatibility before registering
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "..."}' \
http://localhost:8081/compatibility/subjects/<subject>/versions/latest
Performance Issues
# Check cache statistics
curl http://localhost:8081/metrics
# Increase cache size
export SCHEMA_REGISTRY_CACHE_SIZE=1000
# Enable response caching
export SCHEMA_REGISTRY_RESPONSE_CACHE_ENABLED=true
Conclusion
Schema Registry has evolved into an essential component of modern Kafka deployments. With support for multiple formats, cloud-native integrations, and advanced governance features, it ensures data quality and compatibility at scale.
Key takeaways for 2025:
- Multi-format support enables diverse use cases
- Schema references allow modular design
- Cloud-native deployments reduce operational burden
- Strong governance ensures data quality
- Performance optimizations handle massive scale
Whether you’re building event-driven microservices, real-time analytics, or ML pipelines, Schema Registry provides the foundation for reliable data exchange.
Related Content
- What is Kafka?
- Kafka Architecture
- Kafka Topic Architecture
- Kafka Consumer Architecture
- Kafka Producer Architecture
- Kafka Low Level Design
- Kafka Log Compaction
- Kafka Ecosystem
- Kafka vs. JMS
- Kafka versus Kinesis
- Kafka Command Line Tutorial
- Kafka Producer Tutorial
- Kafka Consumer Tutorial
- Avro Introduction
About Cloudurable
Master data governance in your Kafka deployments. Cloudurable provides Kafka training, Kafka consulting, Kafka support, and helps set up Kafka clusters in AWS.
Check out our new GoLang course. We provide onsite Go Lang training which is instructor-led.