Multi-Region Clusters

Confluent Server is often run across availability zones or nearby datacenters. If the computer network between brokers across availability zones or nearby datacenters is dissimilar, in term of reliability, latency, bandwidth, or cost, this can result in higher latency, lower throughput and increased cost to produce and consume messages.

To mitigate this, three distinct pieces of functionality were added to Confluent Server:

  • Follower-Fetching
  • Observers
  • Replica Placement

Follower-Fetching

Before the introduction of this feature, all consume and produce operations took place on the leader. With Multi-Region Clusters, clients are allowed to consume from followers. This dramatically reduces the amount of cross-datacenter traffic between clients and brokers.

To enable follower fetching, configure these settings in your server.properties file, where broker.rack identifies location of the broker. It doesn’t have to be rack, but can be region in which broker resides:

replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
broker.rack=<region>

On the consumer side, set client.rack as the client property. Apache Kafka® 2.3 clients or later will then read from followers that have matching broker.rack as the specified client.rack ID.

client.rack=<rack-ID>

Tip

This feature is also available in the confluent-kafka package.

Observers

Historically there are two types of replicas: leaders and followers. Multi-Region Clusters introduces a third type of replica, observers. By default, observers will not join the in-sync replicas (ISR) but will try to keep up with the leader just like a follower. With follower-fetching, clients can also consume from observers.

By not joining the ISR, observers give operators the ability to asynchronously replicate data. In Confluent Server the high watermark for a topic partition is not increased until all members of the ISR acknowledge they have replicated a message. Clients using acks=all can suffer from throughput issues, especially when high latency, low bandwidth networks across datacenters are involved. With observers, you can define topics that synchronously replicate data within one region, but replicate the data asynchronously between regions. By default, these observers do not join the ISR, so they do not affect the throughput and latency of precluding messages since the topic partition leader doesn’t need to wait for them to get replicated to the observers before acknowledging the request back to the producer.

You can use the metrics described in the metric section of this document to monitor the number of replicas (normal synchronous replicas and observers) that are caught up with the leader.

Replica Placement

Replica placement defines how to assign replicas to the partitions in a topic. This feature relies on the broker.rack property configured for each broker. For example, you can create a topic that uses observers with the new --replica-placement flag on kafka-topics to configure the internal property confluent.placement.constraints.

kafka-topics  --create \
    --zookeeper localhost:2181 \
    --topic testing-observers \
    --partitions 3 \
    --replica-placement /etc/confluent/testing-observers.json \
    --config min.insync.replicas=2

Where the /etc/confluent/testing-observers.json file contains:

{
    "version": 1,
    "replicas": [
        {
            "count": 3,
            "constraints": {
                "rack": "us-west"
            }
        }
    ],
    "observers": [
        {
            "count": 2,
            "constraints": {
                "rack": "us-east"
            }
        }
    ]
}

The field replicas contains a list of constraints that must be satisfied by the sync replicas. The field observers contains a list of constraints that must be satisfied by the async replicas (observers). In the example above, Confluent Server will create one topic with three partitions. Each partition will be assigned five replicas. Three of the replicas will be sync replicas with a broker.rack equal to us-west while two of the replicas will be observers with a broker.rack equal to us-east. If the constraint cannot be satisfied and Confluent Server fails to find enough brokers matching specified constraint, topic creation will fail.

Here is what the topic will look like when you run kafka-topics --zookeeper localhost:2181 --describe:

Topic: test-observers    PartitionCount: 3       ReplicationFactor: 5    Configs: segment.bytes=1073741824,confluent.placement.constraints={"version":1,"replicas":[{"count":3,"constraints":{"rack":"us-west"}}],"observers":[{"count":2,"constraints":{"rack":"us-east"}}]}
Topic: test-observers    Partition: 0    Leader: 1       Replicas: 1,2,3,4,5 Isr: 1,2,3      Offline:      Observers: 4,5
Topic: test-observers    Partition: 1    Leader: 2       Replicas: 2,3,1,5,4 Isr: 2,3,1      Offline:      Observers: 5,4
Topic: test-observers    Partition: 2    Leader: 3       Replicas: 3,1,2,4,5 Isr: 3,1,2      Offline:      Observers: 4,5

In the above example, for first partition, a producer with acks=all will get an acknowledgement back from the topic partition leader 1 after brokers 1, 2 and 3 have replicated the produced message. Brokers 4 and 5 will also replicate the data as quickly as possible but the leader can send the producer an acknowledgement without waiting for an acknowledgement from brokers 4 and 5.

Note

Replica placement contraints cannot overlap. For example, the following replica placement JSON is invalid.

    # INVALID
    "version": 1,
    "replicas": [
        {
            "count": 2,
            "constraints": {
                "rack": "us-west"
            }
        },
        {
            "count": 1,
            "constraints": {
                "rack": "us-east"
            }
        }
    ],
    "observers": [
        {
            "count": 1,
            "constraints": {
                "rack": "us-east"
            }
        }
    ]
}

However, these replica-placement constraints are valid:

    # VALID
    "version": 1,
    "replicas": [
        {
            "count": 2,
            "constraints": {
                "rack": "us-west"
            }
        },
        {
            "count": 1,
            "constraints": {
                "rack": "us-east-a"
            }
        }
    ],
    "observers": [
        {
            "count": 1,
            "constraints": {
                "rack": "us-east-b"
            }
        }
    ]
}

Architecture

As a best practice, deploy multi-region clusters across three or more data centers to avoid split-brain in the event of a network partition event. A ZooKeeper deployment might look like this:

  • DC1: Two ZooKeeper nodes
  • DC2: Two ZooKeeper nodes
  • DC3: One ZooKeeper node

Note that Kafka brokers do not necessarily need to be deployed in each data center (DC). The ZooKeeper ensemble should be deployed so that if a network partition event occurs, a quorum of ZooKeeper nodes remains. This is easiest in a three or more datacenter multi-region cluster.

A two datacenter deployment is possible, but this architecture will require either a preferred datacenter to win all leader elections (for example, three ZooKeeper nodes, two ZooKeeper nodes) in the event of a network partition, or manual intervention to reconfigure the ZooKeeper quorum to elect the winning datacenter.

Observer Failover

In the event that all of the sync replicas are offline, it is possible to elect an observer as leader. Confluent Server includes a command (kafka-leader-election) to manually trigger leader election. This command can be used to send a rerequest to the controller to elect an online replica, including an observer, as a leader even if they are not part of the ISR. Electing a replica or observer as leader when it is not in ISR is called “unclean leader election”.

In an unclean leader election, it is possible for the new leader to not have all of the produced records up to the largest offset that was acknowledged. This can result in the truncation of all the topic partition logs to an offset that is before the largest acknowledged offset.

Important

To minimize possible data loss caused by unclean leader election, monitor observer replication to make sure that it is not falling too far behind.

For example, if you have a cluster spanning us-west-1 and us-west-2, and you lose all brokers in us-west-1:

  • If a topic has replicas in us-west-2 in the ISR, those brokers would automatically be elected leader and the clients would continue to produce and consume.

  • If a topic has replicas, including observers, in us-west-2 not in the ISR, the user can perform unclean leader election:

    1. Create a properties file that specifies the topic partitions:

      cat unclean-election.json
      {
        "version": 1,
        "partitions": [
          {"topic": "testing-observers", "partition": 0}
        ]
      }
      
    2. Run this command and the observers will join the ISR:

      kafka-leader-election --bootstrap-server kafka-west-2:9092 \
      --election-type UNCLEAN --path-to-json-file unclean-election.json
      
  • To fail back to the preferred leaders after the brokers have recovered, run this command:

    kafka-leader-election --bootstrap-server kafka-west-1:9092 \
    --election-type preferred --all-topic-partitions
    

    A switch to the preferred leader happens automatically when auto.leader.rebalance.enable is set. The selection of the preferred leader is also subject to leader.imbalance.per.broker.percentage and leader.imbalance.check.interval.seconds. To prompt the selection of preferred leaders, run:

    kafka-leader-election --bootstrap-server localhost:9092 --election-type PREFERRED --topic foo
    

Metrics

In Confluent Server there are a few metrics that should be monitored for determining the health and state of a topic partition. Some to these metrics are enumerated below:

  • ReplicasCount - In JMX the full object name is kafka.cluster:type=Partition,name=ReplicasCount,topic=<topic-name>,partition=<partition-id>. It reports the number of replicas (sync replicas and observers) assigned to the topic partition.
  • ObserverReplicasCount - In JMX the full object name is kafka.cluster:type=Partition,name=ObserverReplicasCount,topic=<topic-name>,partition=<partition-id>. It reports the number of observers assigned to the topic partition.
  • InSyncReplicasCount - In JMX the full object name is kafka.cluster:type=Partition,name=InSyncReplicasCount,topic=<topic-name>,partition=<partition-id>. It reports the number of replicas in the ISR.
  • CaughtUpReplicasCount - In JMX the full object name is kafka.cluster:type=Partition,name=CaughtUpReplicasCount,topic=<topic-name>,partition=<partition-id>. It reports the number of replicas that are consider caught up to the topic partition leader. Note that this may be greater than the size of the ISR as observers may be caught up but are not part of ISR.
  • IsNotCaughtUp - In JMX the full object name is kafka.cluster:type=Partition,name=IsNotCaughtUp,topic=<topic-name>,partition=<partition-id>. It reports 1 (true) if not all replicas are considered caught up to the partition leader.

Partition Reassignment

You can change the replica placement constraints of a topic and the assignment of replicas to the partitions of a topic. You can use the kafka-configs command line tool to change the replica placement constraints. For example:

kafka-configs --zookeeper zookeeper:2181 --entity-name testing-observers --entity-type topics --alter --replica-placement /etc/confluent/testing-observers.json

Where the /etc/confluent/testing-observers.json file contains:

{
    "version": 1,
    "replicas": [
        {
            "count": 3,
            "constraints": {
                "rack": "us-west"
            }
        }
    ],
    "observers": [
        {
            "count": 2,
            "constraints": {
                "rack": "us-east"
            }
        }
    ]
}

See the Replica Placement section for a description of the content of the replica placement JSON file.

Important

  • Changing the configuration of a topic does not change the replica assignment for the topic partition. Changing the replica placement of a topic configuration must be followed by a partition reassignment. The confluent-rebalancer command line tool supports reassignment that also accounts for replica placement constraints. To learn more, see Auto Data Balancing.
  • Use confluent-rebalancer version 5.4.0 or newer.

For example, run this command to start a reassignment that matches the topic’s replica placement constraints:

confluent-rebalancer execute --bootstrap-server kafka-west-1:9092 --metrics-bootstrap-server broker-west-1:19091 --zookeeper zookeeper-west:2181 --throttle 10000000 --verbose

Run this command to monitor the status for the reassignment:

confluent-rebalancer status --zookeeper zookeeper-west:2181

Run this command to finish the reassignment:

confluent-rebalancer finish --zookeeper zookeeper-west:2181

Make sure you use confluent-rebalancer 5.4.0 or newer (packaged with Confluent Platform 5.4.0 or newer).

Monitoring Replicas

You can use the command kafka-replica-status.sh to monitor the status of replicas assigned to a partition, including information about their current mode and replication state.

For example, the following displays information about all replicas that constitute the testing-observers topic for the first partition:

kafka-replica-status.sh --bootstrap-server localhost:9092 --topics testing-observers --partitions 0 --verbose

The output for the example is:

$ ./bin/kafka-replica-status.sh --bootstrap-server localhost:9092 --topics testing-observers --partitions 0 --verbose
Topic: testing-observers
Partition: 0
Replica: 1
IsLeader: true
IsObserver: false
IsIsrEligible: true
IsInIsr: true
IsCaughtUp: true
LastCaughtUpLagMs: 0
LastFetchLagMs: 0
LogStartOffset: 0
LogEndOffset: 10000

Topic: testing-observers
Partition: 0
Replica: 2
...

The output provided is:

  • Topic (<String>): The topic for the replica.
  • Partition (<Integer>): The partition for the replica.
  • Replica (<Integer>): The broker ID for the replica.
  • isLeader (<Boolean>): Whether the replica is the ISR leader.
  • isObserver (<Boolean>): Whether the replica is an observer, otherwise a traditional replica.
  • isIsrEligible (<Boolean>): Whether the replica is a candidate to be in the ISR set.
  • isInIsr (<Boolean>): Whether the replica is in the ISR set.
  • isCaughtUp (<Boolean>): Whether the replica’s log is sufficiently caught up to the leader such that it’s considered to be in sync. However, note that a replica being caught up doesn’t necessarily mean the replica is in the ISR set. For example, the replica may be an observer, or a follower that cannot be included in the ISR due to topic placement constraints.
  • lastFetchLagMs (<Long>): The duration, in milliseconds, since the last fetch request was received from the replica. This will always be ‘0’ for the leader, and may be ‘-1’ if the leader hasn’t received a fetch request from the replica.
  • lastCaughtUpLagMs (<Long>): The duration, in milliseconds, since the last fetch request was received from the replica in which it was considered “caught up”. This will always be ‘0’ for the leader, and may be ‘-1’ if the leader hasn’t received a fetch request from the replica.
  • logStartOffset/logEndOffset: (<Long>): The starting/ending log offset for the replica’s log from the leader’s perspective. These may be ‘-1’ if the leader hasn’t received a fetch request from the replica.

Additional useful flags are:

  • --topics: Comma-separated topics to retrieve replica status for.
  • --partitions: Comma-separated list of partition IDs or ID ranges for the topic(s); for example, ‘5,10-20’.
  • --verbose: Print output in a verbose manner with one attribute per line.
  • --json: Print output in JSON format.
  • --leaders: Show only partition leaders, or omit leaders if ‘exclude’ is provided.
  • --observers: Show only observer replicas, or omit observers if ‘exclude’ is provided.
  • --exclude-internal: Excludes internal topics from the output.
  • --version: Display Confluent Server version.

Example

To try a detailed Multi-Region Clusters example, run the end-to-end Multi-Region demo. The demo injects latency and packet loss to simulate the distances between the regions, and the included playbook showcases the value of the new capabilities in a multi-region environment.

../_images/multi-region-topic-replicas.png

Notes

Operator tasks are not supported.