Confluent Platform 5.4-preview

Confluent Platform 5.4-preview provides Built-in Multi-Region Replication and Schema Validation.

Warning

This is a preview release and is not intended for production use. You cannot upgrade from this preview release to a production release because some of the interfaces might change to improve the user experience.

Built-in Multi-Region Replication

Apache Kafka® is often run across availability zones or nearby datacenters to make disaster recovery more seamless and automated. With Confluent Server, a single cluster can span regional datacenters. Kafka operators can create replicas of data on a per-region basis, synchronously or asynchronously, at the topic level. To achieve this, three distinct pieces of functionality are required: follower-fetching, observers, and replica placement.

Follower-Fetching

Before Confluent Platform 5.4-preview, all read and write operations took place on the leader. With 5.4-preview, clients can read from followers, which dramatically reduces the amount of cross-datacenter traffic between clients and brokers.

To enable follower fetching, configure these settings. In the broker's server.properties file, replica.selector.class enables rack-aware replica selection and broker.rack identifies the broker's location; client.rack is the corresponding client property. Apache Kafka 2.3 clients or later will read from followers whose broker.rack matches their client.rack ID.

# server.properties (broker)
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
broker.rack=<region>
# client configuration
client.rack=<rack-ID>
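
For example, a consumer located in us-east can advertise its location through client.rack and fetch from a nearby replica instead of crossing datacenters to the leader. The following is a minimal sketch; the broker address, topic name, and rack ID are illustrative, not values defined by this release.

# Hypothetical example: a console consumer that reads from a nearby replica
kafka-console-consumer --bootstrap-server kafka-east-1:9092 \
    --topic test-observer \
    --consumer-property client.rack=us-east \
    --from-beginning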

Tip

This feature is also available in the confluent-kafka package.

Observers

Historically, there have been two types of replicas: leaders and followers. Confluent Platform 5.4-preview introduces a third type of replica, the observer. Observers never join the in-sync replicas (ISR) unless instructed to, but they try to keep up with the leader just like a follower does. With follower fetching, clients can also read from observers.

By not joining the ISR, observers give operators the ability to asynchronously replicate data. Because the high-water mark is not advanced until all members of the ISR acknowledge they have received a message, clients using acks=all can suffer from throughput issues, especially when high-latency, low-bandwidth cross-datacenter networks are involved. With observers, you can define topics that synchronously replicate data within one region but asynchronously between regions. Because these observers are not in the ISR, they do not affect throughput and latency metrics.

You can use CaughtUpReplicas to monitor whether observers are caught up with the leader.

Replica Placement

Replica placement defines how data is replicated across regions. For example, you can create a topic that uses observers with the new --replica-placement flag on kafka-topics, which configures the internal property confluent.placement.constraints.

kafka-topics --create \
    --bootstrap-server localhost:9092 \
    --topic test-observer \
    --partitions 1 \
    --replica-placement /etc/confluent/observer.json \
    --config min.insync.replicas=2

Here is what the observer.json file looks like:

{
    "version": 1,
    "replicas": [
        {
            "count": 3,
            "constraints": {
                "rack": "us-west"
            }
        }
    ],
    "observers": [
        {
            "count": 3,
            "constraints": {
                "rack": "us-east"
            }
        }
    ]
}

Here is what the topic looks like when you run kafka-topics --bootstrap-server localhost:9092 --describe.

Topic: test-observer    PartitionCount: 1       ReplicationFactor: 3    Configs: segment.bytes=1073741824,confluent.placement.constraints={"version":1,"replicas":[{"count":3,"constraints":{"rack":"us-west"}}],"observers":[{"count":3,"constraints":{"rack":"us-east"}}]}
Topic: test-observer    Partition: 0    Leader: 1       Replicas: 1,2,3,4,5,6 Isr: 1,2,3      Offline:      LiveObservers: 4,5,6

In the above example, a producer using acks=all gets an acknowledgement back once brokers 1, 2, and 3 have the data. Brokers 4, 5, and 6 also replicate the data as quickly as possible, with the advantage that they do not slow down producer record acknowledgements.
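
For illustration, a console producer can request acknowledgement from the full ISR while the observers catch up in the background. This is a minimal sketch; the broker address kafka-west-1:9092 is illustrative.

# Hypothetical example: acks=all waits for the ISR (brokers 1, 2, 3)
# but not for the observers (brokers 4, 5, 6)
kafka-console-producer --broker-list kafka-west-1:9092 \
    --topic test-observer \
    --producer-property acks=all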

Automated Failover

Disaster recovery is significantly more automated with this type of multi-region architecture, which observers, follower fetching, and replica placement make possible. There is no offset translation, network change, client restart, or custom code to worry about.

For example, suppose you have a cluster spanning us-west-1 and us-west-2, and you lose all brokers in us-west-1:

  • If a topic has ISR members in us-west-2, those brokers would automatically be elected leader and the clients would continue to produce and consume.

  • To failover to observers, follow this procedure:

    1. Create a JSON file that specifies the topic partitions.

      cat unclean-election.json
      {
        "version": 1,
        "partitions": [
          {"topic": "test-observer", "partition": 0}
        ]
      }
      
    2. Run this command and the observers will join the ISR:

      kafka-leader-election --bootstrap-server kafka-west-2:9092 \
      --election-type unclean --path-to-json-file unclean-election.json
      
  • To fail back to the preferred leaders after the brokers have recovered, run this command:

    kafka-leader-election --bootstrap-server kafka-west-1:9092 \
    --election-type PREFERRED --all-topic-partitions
    

Schema Validation

Schema Validation gives operators a centralized location to enforce data format correctness at the topic level. To enable Schema Validation, set confluent.schema.registry.url in your server.properties file. For example: confluent.schema.registry.url=http://schema-registry:8081. This configuration accepts a comma-separated list of URLs for Schema Registry instances.

By default, Confluent Server uses the TopicNameStrategy to map topics with schemas in Schema Registry. This can be changed for both the key and value via confluent.key.subject.name.strategy and confluent.value.subject.name.strategy.
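
For reference, a broker configuration with Schema Validation enabled might look like the following sketch. The Schema Registry URL is illustrative, and the strategy settings shown simply spell out the TopicNameStrategy defaults.

# Illustrative server.properties settings for Schema Validation
confluent.schema.registry.url=http://schema-registry:8081
# Optional: these default to TopicNameStrategy; override them only to change the mapping
confluent.key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy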

To enable Schema Validation on a topic, set confluent.value.schema.validation=true and confluent.key.schema.validation=true.

For example:

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 \
--partitions 1 --topic test-validation-true \
--config confluent.value.schema.validation=true
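
You can also enable Schema Validation on an existing topic by altering its topic configuration; for example, for the same topic:

# Enable value Schema Validation on an existing topic
kafka-configs --zookeeper localhost:2181 --entity-type topics \
    --entity-name test-validation-true --alter \
    --add-config confluent.value.schema.validation=true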

If a message is produced without a valid schema for its value, the client receives an error back from the broker.
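
For example, records produced with a schema that is registered in Schema Registry pass validation, while records without a registered schema are rejected. The following is a minimal sketch, assuming a Schema Registry instance at http://schema-registry:8081 and an illustrative Avro record schema:

# Hypothetical example: produce Avro records whose schema is registered in
# Schema Registry, so broker-side Schema Validation accepts them
kafka-avro-console-producer --broker-list localhost:9092 \
    --topic test-validation-true \
    --property schema.registry.url=http://schema-registry:8081 \
    --property value.schema='{"type":"record","name":"Payment","fields":[{"name":"id","type":"string"}]}'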

Install 5.4-preview

You can install these features by following the standard production installation process. However, you must replace the software links in that document with the 5.4-preview DEBs, RPMs, containers, and tarballs. Abridged installation instructions are included below.

Important

You must complete these steps for each node in your cluster.

Prerequisites
Before installing Confluent Platform, your environment must meet the software and hardware requirements.

Ubuntu and Debian

wget -qO - https://packages.confluent.io/deb/5.3/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/beta201908/deb/5.4 stable main"
sudo apt-get update && sudo apt-get install confluent-platform-2.12

RHEL and CentOS

Important

You must complete these steps for each node in your cluster.

Prerequisites
Before installing Confluent Platform, your environment must meet the software and hardware requirements.
  1. Install the curl and which tools.

    sudo yum install curl which
    
  2. Install the Confluent Platform 5.4-preview public key. This key is used to sign packages in the YUM repository.

    sudo rpm --import https://packages.confluent.io/beta201908/rpm/5.4/archive.key
    
  3. Navigate to /etc/yum.repos.d/ and create a file named confluent.repo with these contents. This adds the Confluent repository.

    [Confluent.dist]
    name=Confluent repository (dist)
    baseurl=https://packages.confluent.io/beta201908/rpm/5.4/7
    gpgcheck=1
    gpgkey=https://packages.confluent.io/beta201908/rpm/5.4/archive.key
    enabled=1
    
    [Confluent]
    name=Confluent repository
    baseurl=https://packages.confluent.io/beta201908/rpm/5.4
    gpgcheck=1
    gpgkey=https://packages.confluent.io/beta201908/rpm/5.4/archive.key
    enabled=1
    
  4. Clear the YUM caches and install Confluent Platform.

    • Confluent Platform:

      sudo yum clean all && sudo yum install confluent-platform-2.12
      

    Tip

    The package name contains the Scala version. For example, confluent-platform-2.12 denotes Scala version 2.12.

Tarball

Download the CP 5.4-preview tarball here.

Docker

The CP 5.4-preview containers can be found on Docker Hub.

Note

Confluent Platform 5.4-preview ships with ZooKeeper 3.5, which includes SSL support, dynamic reconfiguration, and more.

Install Confluent Server

To use Schema Validation or Built-in Multi-Region Replication, install the confluent-server package, rather than confluent-kafka. Confluent Server is a component of Confluent Platform that includes Kafka and additional cloud native and commercial features, such as role-based access control (RBAC), LDAP Authorizer, support for Confluent Operator, and now Built-in Multi-Region Replication and Schema Validation. Confluent Server is fully compatible with Kafka, and you can migrate in place between Confluent Server and Kafka.

Here are examples of how to migrate to confluent-server and then back to confluent-kafka using the APT package manager.

Migrate from Kafka to Confluent Server

  1. Stop the current Kafka broker process.

    kafka-server-stop
    
  2. Uninstall the confluent-kafka package.

    apt remove confluent-kafka
    
  3. Add the 5.4-preview package repository.

    add-apt-repository "deb [arch=amd64] https://packages.confluent.io/beta201908/deb/5.4 stable main"
    
  4. Install Confluent Server.

    apt install confluent-server
    
  5. Start the Confluent Server.

    kafka-server-start server.properties
    

With Confluent Server installed, you can use RBAC, the LDAP Authorizer, observers, and Schema Validation.

Migrate from Confluent Server to Kafka

  1. Disable any commercial features, including RBAC and LDAP Authorization.

  2. Change all topics that use observers or Schema Validation. For example:

    kafka-configs --zookeeper localhost:2181 --entity-name test-topic \
    --entity-type topics --delete-config confluent.value.schema.validation
    
    kafka-configs --zookeeper zookeeper-west:2181 --entity-name testing-observers \
    --entity-type topics --alter --add-config confluent.placement.constraints=no-observers-here.json
    
  3. Remove any configuration for commercial features from the server.properties file, such as confluent.observer.feature=true and confluent.schema.registry.url.

  4. Stop the server.

    kafka-server-stop
    
  5. Uninstall Confluent Server.

    apt remove confluent-server
    
  6. Install Kafka and start the server.

    apt install confluent-kafka && kafka-server-start server.properties
    

Known Limitations

  • Follower fetching does not work with librdkafka.
  • Partition reassignment and Confluent Auto Data Balancer have the potential to break replica placement constraints. Before running a reassignment or ADB, disable all replica placement constraints, run the reassignment, and then re-apply the original placement constraints:
    1. Remove the placement constraints.

      kafka-configs --zookeeper localhost:2181 --entity-name test-observer \
      --entity-type topics --alter --delete-config confluent.placement.constraints

    2. Wait for the reassignment to complete.
    3. Run Auto Data Balancer.
    4. Re-apply the original placement constraints.

      kafka-configs --zookeeper localhost:2181 --entity-name test-observer \
      --entity-type topics --alter --replica-placement placement.json

  • A multi-region architecture should be run with a single ZooKeeper cluster that spans three or more regions. This requirement is necessary to avoid split-brain scenarios.
  • The error returned by Confluent Server when Schema Validation rejects a message is too general.