Confluent Platform 5.4-preview¶
Confluent Platform 5.4-preview provides Built-in Multi-Region Replication and Schema Validation.
Warning
This is a preview release and is not intended for production use. You cannot upgrade from this preview release to a production release because some of the interfaces might change to improve the user experience.
Built-in Multi-Region Replication¶
Apache Kafka® is often run across availability zones or nearby datacenters to make disaster recovery more seamless and automated. With Confluent Server, a single cluster can span regional datacenters. Kafka operators can create replicas of data on a per-region basis, synchronously or asynchronously, at the topic level. To achieve this, three distinct pieces of functionality are required: follower-fetching, observers, and replica placement.
Follower-Fetching¶
Before Confluent Platform 5.4-preview, all read and write operations took place on the leader. With 5.4-preview, clients can read from followers. This dramatically reduces the amount of cross-datacenter traffic between clients and brokers.
To enable follower fetching, configure these settings in your server.properties file, where broker.rack identifies a location for a nearby follower to read from, and client.rack is the corresponding client property. Apache Kafka 2.3 clients or later will read from followers that have matching broker.rack and client.rack IDs.
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
broker.rack=<region>
client.rack=<rack-ID>
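For example, here is a minimal sketch of the client side using the console consumer (the topic name my-topic is hypothetical); the consumer advertises its rack ID so it can fetch from a nearby follower:
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic my-topic \
  --consumer-property client.rack=us-west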
Tip
This feature is also available in the confluent-kafka package.
Observers¶
Historically, there have been two types of replicas: leaders and followers. Confluent Platform 5.4-preview introduces a third type of replica, the observer. Observers never join the in-sync replicas (ISR) unless instructed to, but they try to keep up with the leader just like a follower. With follower-fetching, clients can also read from observers.
By not joining the ISR, observers give operators the ability to asynchronously replicate data. Because the high-water mark is not advanced until all members of the ISR acknowledge they have received a message, clients using acks=all can suffer from throughput issues, especially when high-latency, low-bandwidth cross-datacenter networks are involved. With observers, you can define topics that replicate data synchronously within one region but asynchronously between regions. Because these observers are not in the ISR, they do not affect throughput and latency metrics.
You can use the CaughtUpReplicas configuration to monitor whether observers are caught up with the leader.
Replica Placement¶
Replica placement defines how to replicate data between different regions. For example, you can create a topic that uses observers with the new --replica-placement flag on kafka-topics, which configures the internal property confluent.placement.constraints.
kafka-topics --create \
--bootstrap-server localhost:9092 \
--topic test-observer \
--partitions 1 \
--replica-placement /etc/confluent/observer.json \
--config min.insync.replicas=2
Here is what the observer.json file looks like:
{
"version": 1,
"replicas": [
{
"count": 3,
"constraints": {
"rack": "us-west"
}
}
],
"observers": [
{
"count": 3,
"constraints": {
"rack": "us-east"
}
}
]
}
Here is what the topic looks like when you run kafka-topics --bootstrap-server localhost:9092 --describe --topic test-observer.
Topic: test-observer PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824,confluent.placement.constraints={"version":1,"replicas":[{"count":3,"constraints":{"rack":"us-west"}}],"observers":[{"count":3,"constraints":{"rack":"us-east"}}]}
Topic: test-observer Partition: 0 Leader: 1 Replicas: 1,2,3,4,5,6 Isr: 1,2,3 Offline: LiveObservers: 4,5,6
In the above example, a client using acks=all will get an acknowledgement back once brokers 1, 2, and 3 have the data. Brokers 4, 5, and 6 will also replicate the data as quickly as possible, with the advantage that they will not slow down producer record acknowledgements.
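As a sketch of the producer side, using the topic created above, a producer with acks=all waits only on the synchronous replicas in us-west:
kafka-console-producer --broker-list localhost:9092 \
  --topic test-observer \
  --producer-property acks=all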
Automated Failover¶
Disaster recovery is significantly more automated when using this type of multi-region architecture, made possible by observers, follower fetching, and replica placement. There is no offset translation, network change, client restart, or custom code to worry about.
For example, suppose you have a cluster spanning us-west-1 and us-west-2, and you lose all brokers in us-west-1. If a topic has ISR members in us-west-2, those brokers are automatically elected leader and the clients continue to produce and consume. To fail over to observers, follow this procedure:
Create a JSON file that specifies the topic partitions.
cat unclean-election.json
{
  "version": 1,
  "partitions": [
    {"topic": "test-observer", "partition": 0}
  ]
}
Run this command and the observers will join the ISR:
kafka-leader-election --bootstrap-server kafka-west-2:9092 \
  --election-type unclean --path-to-json-file unclean-election.json
To fail back to the preferred leaders after the brokers have recovered, run this command:
kafka-leader-election --bootstrap-server kafka-west-1:9092 \
  --election-type PREFERRED --all-topic-partitions
Schema Validation¶
Schema Validation gives operators a centralized location to enforce data format correctness at the topic level. To enable Schema Validation, set confluent.schema.registry.url in your server.properties file. For example: confluent.schema.registry.url=http://schema-registry:8081. This configuration accepts a comma-separated list of URLs for Schema Registry instances.
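For example, here is a sketch of the relevant server.properties line, assuming two hypothetical Schema Registry instances:
confluent.schema.registry.url=http://schema-registry-1:8081,http://schema-registry-2:8081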
By default, Confluent Server uses the TopicNameStrategy to map topics to schemas in Schema Registry. This can be changed for both the key and the value via confluent.key.subject.name.strategy and confluent.value.subject.name.strategy.
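For example, here is a sketch that switches the value mapping to the RecordNameStrategy class that ships with the Schema Registry serializers (assuming that class is available to the broker):
confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy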
To enable Schema Validation on a topic, set confluent.value.schema.validation=true and confluent.key.schema.validation=true.
For example:
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 \
--partitions 1 --topic test-validation-true \
--config confluent.value.schema.validation=true
If a message is produced that does not have a valid schema for its value, the client will get an error back from the broker.
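You can also turn validation on for an existing topic. Here is a minimal sketch with kafka-configs, reusing the topic created above:
kafka-configs --zookeeper localhost:2181 --entity-type topics \
  --entity-name test-validation-true --alter \
  --add-config confluent.value.schema.validation=true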
Install 5.4-preview¶
You can install these features by following the standard production installation process. However, you must replace the software links in that document with the 5.4-preview DEBs, RPMs, containers, and tarballs. Abridged installation instructions are included below.
Important
You must complete these steps for each node in your cluster.
- Prerequisites
- Before installing Confluent Platform, your environment must meet the software and hardware requirements.
Ubuntu and Debian¶
$ wget -qO - https://packages.confluent.io/deb/5.3/archive.key | sudo apt-key add -
$ sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/beta201908/deb/5.4 stable main"
$ sudo apt-get update && sudo apt-get install confluent-platform-2.12
RHEL and CentOS¶
Important
You must complete these steps for each node in your cluster.
- Prerequisites
- Before installing Confluent Platform, your environment must meet the software and hardware requirements.
Install the curl and which tools.
sudo yum install curl which
Install the Confluent Platform 5.4-preview public key. This key is used to sign packages in the YUM repository.
sudo rpm --import https://packages.confluent.io/beta201908/rpm/5.4/archive.key
Navigate to /etc/yum.repos.d/ and create a file named confluent.repo with these contents. This adds the Confluent repository.
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/beta201908/rpm/5.4/7
gpgcheck=1
gpgkey=https://packages.confluent.io/beta201908/rpm/5.4/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/beta201908/rpm/5.4
gpgcheck=1
gpgkey=https://packages.confluent.io/beta201908/rpm/5.4/archive.key
enabled=1
Clear the YUM caches and install Confluent Platform.
sudo yum clean all && sudo yum install confluent-platform-2.12
Tip
The package name contains the Confluent Platform version followed by the Scala version. For example, 5.3.0-2.12.zip denotes Confluent Platform version 5.3.0 and Scala version 2.12.
Docker¶
The Confluent Platform 5.4-preview containers can be found on Docker Hub.
Note
Confluent Platform 5.4-preview ships with ZooKeeper 3.5, which includes SSL support, dynamic reconfiguration, and more.
Install Confluent Server¶
To use Schema Validation or Built-in Multi-Region Replication, install the confluent-server package, rather than confluent-kafka. Confluent Server is a component of Confluent Platform that includes Kafka and additional cloud-native and commercial features, such as role-based access control (RBAC), the LDAP Authorizer, support for Confluent Operator, and now Built-in Multi-Region Replication and Schema Validation. Confluent Server is fully compatible with Kafka, and you can migrate in place between Confluent Server and Kafka.
Here are examples of how to migrate to confluent-server and then back to confluent-kafka using the APT package manager.
Migrate from Kafka to Confluent Server¶
Stop the current Kafka broker process.
kafka-server-stop
Uninstall the confluent-kafka package.
apt remove confluent-kafka
Add the 5.4-preview package repository.
add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.4-beta stable main"
Install Confluent Server.
apt install confluent-server
Start Confluent Server.
kafka-server-start server.properties
With Confluent Server installed, you can use RBAC, the LDAP Authorizer, observers, and Schema Validation.
Migrate from Confluent Server to Kafka¶
Disable any commercial features, including RBAC and LDAP Authorization.
Change all topics that use observers or Schema Validation. For example:
kafka-configs --zookeeper localhost:2181 --entity-name test-topic \
  --entity-type topics --alter --delete-config confluent.value.schema.validation
kafka-configs --zookeeper zookeeper-west:2181 --entity-name test-observer \
  --entity-type topics --alter --add-config confluent.placement.constraints=no-observers-here.json
Remove the configuration for any commercial features of Confluent Platform from the server.properties file, such as confluent.observer.feature=true and confluent.schema.registry.url.
Stop the server.
kafka-server-stop
Uninstall Confluent Server.
apt remove confluent-server
Install Kafka and start the server.
apt install confluent-kafka && kafka-server-start server.properties
Known Limitations¶
- Follower fetching does not work with librdkafka.
- Partition reassignment and Confluent Auto Data Balancer have the potential to break replica placement constraints. Before running a reassignment or ADB, disable all replica placement constraints, run the reassignment, and then re-apply the original placement constraints. For example:
Disable the placement constraints.
kafka-configs --zookeeper localhost:2181 --entity-name test-observer --entity-type topics --alter --delete-config confluent.placement.constraints
Run the reassignment or Auto Data Balancer and wait for it to complete.
Re-apply the original placement constraints.
kafka-configs --zookeeper localhost:2181 --entity-name test-observer --entity-type topics --alter --replica-placement placement.json
- A multi-region architecture should be run with a single ZooKeeper cluster that spans three or more regions. This is required to maintain a ZooKeeper quorum and avoid split-brain scenarios.
- The error returned by Confluent Server when Schema Validation is enabled is too general.