Manage Mirror Topics for Cluster Linking on Confluent Cloud

This page provides a concept guide, walkthroughs, and examples for creating, configuring, and managing mirror topics on cluster links.

What are mirror topics?

Mirror topics are the building blocks for moving data with Cluster Linking. They are read-only topics that are created and owned by a cluster link.

The sections below provide a conceptual overview of mirror topics, how they are created, configured, and how they work in operation, that is applicable to both Confluent Platform and Confluent Cloud.

Overview

A cluster link connects a mirror topic to its source topic. Any messages produced to the source topic are mirrored over the cluster link to the mirror topic.

A mirror topic syncs many of its configurations from its source topic. It can also sync ACLs and consumer group offsets from its source topic, if you enable those features on the cluster link.

You can convert a mirror topic to a regular topic and stop the mirroring relationship using the Cluster Linking promote and failover commands. To restore mirroring after a failover or a promote, you can run truncate-and-restore on the original source topic to make it mirror from the newly stopped mirror topic.

You can reverse the mirroring relationship with the reverse-and-start and reverse-and-pause commands, which cause the mirror topic to become the source topic, and the source topic to become the mirror topic.

The diagram below shows how mirror topics work, including the relationship between the mirror topic and its source topic, and the syncing of ACLs and consumer offsets.

../../_images/cluster-link-mirror-topics-example.png

Mirror Topic Fundamentals

Properties

Mirror topics have these unique properties:

  • Mirror topics are created by and owned by a cluster link.
  • Mirror topics get their messages from their source topic. They are byte-for-byte, offset-preserving asynchronous copies of their source topics.
  • Mirror topics are read-only; you can consume them the same as any other topic, but you cannot produce into them. If a producer tries to produce a message into a mirror topic, the action will fail. The only way to get a message into a mirror topic is to produce the message to the mirror topic’s source topic.
  • Many of the mirror topic’s configurations are copied and synced from the source topic. A full list is at the end of this page.

Mirror topic creation

You can create a mirror topic using the Confluent Cloud Console, the Confluent Cloud REST API, the Confluent CLI, the Confluent Platform AdminClient API, or Confluent for Kubernetes.

Alternatively, you can configure your cluster link to automatically create mirror topics that match certain prefixes.

Requirements

Mirror topics can only be created with the mirror topic command or by enabling auto-create mirror topics on the cluster link. A mirror topic cannot be pre-created by a non mirror topic command.

  1. Creating a mirror topic requires an existing cluster link. The mirror topic will be created on the destination cluster of the cluster link. The user must have access to the destination cluster.
  2. A mirror topic is always created with the same name as its source topic. There must be a topic of that name on the source cluster. The only exception is when a cluster link has link.prefix configured, which will add a prefix to the name of the mirror topic.
  3. The destination cluster must be able to reach the source cluster and verify there is a suitable topic. A mirror topic cannot be created if the source cluster is unreachable.
  4. Creating a mirror topic on Confluent Cloud requires the user to have the CloudClusterAdmin, EnvironmentAdmin, or OrgAdmin role over the destination cluster–that is, the cluster the mirror topic will be created on. Alternatively, the user can have the appropriate ACLs.
  5. The cluster link’s principal must have both DeveloperRead and DeveloperManage on the relevant source topic on the source cluster. Alternatively, it could have ResourceOwner or the appropriate ACLs on that topic. The user does not need any permissions on the source cluster.

Tip

Create a mirror topic on Confluent Cloud Console

If the destination cluster is a Confluent Cloud cluster, you can view and create mirror topics on the Confluent Cloud Console:

  1. On the top-level page showing your Environments, click Cluster links tab.

  2. Click on an existing cluster link, or create a new one. (If you choose to create a new cluster link, follow the prompts as given.)

  3. To add a mirror topic to an existing cluster link, click Add mirror topic.

    • If the source cluster is a Confluent Cloud cluster that the current user has access to, the dialog will have a dropdown with a list of all of the source topics on it.
    • If the source cluster is external to Confluent Cloud or the user does not have access to it, then you will see a text box instead in which to add the name of the source topic.

    Enter the source topic name and click the Add to create the mirror topic.

Tip

For cluster links with link.prefix configured, enter the name of the source topic in this dialog. The mirror topic’s name will automatically have the prefix added after you click the Add.

../../_images/cluster-link-mirror-topics-add-1.png
../../_images/cluster-link-mirror-topics-add-2.png
../../_images/cluster-link-mirror-topics-add-3.png

Create a mirror topic with the Confluent CLI

To create a mirror topic with the Confluent CLI, the general syntax is:

confluent kafka mirror create <mirror-topic-name> --link <link-name>

The command must be run against the destination cluster. If needed you can specify the destination cluster with --cluster <destination-cluster-id>. To learn more, see confluent kafka mirror create in the command reference.

If the cluster link is configured with link.prefix, then --source-topic source-topic-name must be passed, too. For example:

confluent kafka mirror create west.clicks --link from-west --source-topic clicks

On Confluent Platform clusters, you can use either the Confluent CLI or the bin/kafka-mirrors script. The general syntax to create a mirror topic is:

kafka-mirrors --create --mirror-topic <topic-name> \
--link <link-name> \
--bootstrap-server <host:port>

To learn more, see Cluster Linking on Confluent Platform.

Create a mirror topic with the REST API

On Confluent Cloud:

  • To create a mirror topic, send a POST request to the destination cluster’s REST API endpoint at: /kafka/v3/clusters/{cluster_id}/links/{link_name}/mirrors.

  • Include the following in the payload:

    {
      "source_topic_name": "<source-topic-name>",
      "mirror_topic_name": "<mirror-topic-name>", // (only required if link.prefix is configured)
    }
    

    The above shows the only required parameters. More options are available to override topic configurations.

    To learn more, see the Create a mirror topic in the Confluent Cloud API reference .

Examples

For examples of how to create mirror topics on Confluent Platform, see Create the cluster link and mirror topic (step 2, “Initialize the mirror topic”) in the basic tutorial and Creating a mirror topic in the Commands documentation.

For examples of how to create mirror topics on Confluent Cloud, see the following sections:

Create a mirror topic with the AdminClient API

On Confluent Platform, you can use the AdminClient API to create mirror topics. To learn more, see ConfluentAdmin API reference.

You can create new mirror topics with the NewTopic Admin client method. The mirror is an optional parameter of the method when creating the new topic.

public NewTopic newMirrorTopic(String linkName, String mirrorTopic) {
  NewTopic newTopic = new NewTopic(mirrorTopic, Optional.empty(), Optional.of(replicationFactor));
  newTopic.mirror(Optional.of(new NewMirrorTopic(linkName, mirrorTopic)));
  return newTopic;
}

For the promote/failover operation, see alterMirrors , which takes in an AlterMirrorOp (such as AlterMirrorOp.PROMOTE, AlterMirrorOp.FAILOVER, and so forth).

Following is a simple example of promoting a topic:

Map alterOpMap = new HashMap<>();
alterOpMap.put("topic-to-failover", AlterMirrorOp.PROMOTE);

AlterMirrorsResult alterOpResult = adminClient.alterMirrors(alterOpMap, null);
System.out.println(alterOpResult.all().get());

Additional requirements when prefixing is enabled

When a cluster link has a prefix set, the specified prefix will be added to the beginning of mirror topic names. For example; if you set the prefix to west, the source topic orders will be mirrored as west.orders.

If the cluster link is configured for prefixing mirror topic names, then to create a mirror topic you must pass both the mirror topic name and the source topic name (instead of just the source topic name).

To learn more about prefixing, see Prefix Mirror Topics and Consumer Group Names.

Bidirectional cluster linking

To establish bidirectional linking between two clusters, you must use two cluster links. You cannot establish bi-directional linking with a single cluster link. For an example of bidirectional linking, see the Hybrid tutorial (on either Confluent Cloud or Confluent Platform), which sets up bidirectional linking between on-premises and cloud clusters.

Bidirectional linking is supported for different topics. For a specific topic, only unidirectional linking is supported.

Cherry pick which topics to mirror

To cherry pick topics to be mirrored, you can use any of the following methods:

Support for compacted topics

Cluster Linking supports compacted topics. A compacted topic is mirrored as such from source to destination. To learn more, see the FAQs for Confluent Cloud and Confluent Platform.

Auto-Create mirror topics

A cluster link is able to automatically create mirror topics on the destination cluster for any topics that exist on the source cluster. This is called “auto-creating” mirror topics. This saves time and effort, because you do not have to create mirror topics by hand. This functionality can be scoped down to a specific set of topics by matching on the topics’ names.

Enable auto-create mirror topics

To enable this functionality, you must set two properties on the cluster link. You can set these properties when a cluster link is created, or update an existing cluster link with these properties. These properties are:

auto.create.mirror.topics.enable

Whether or not to auto-create mirror topics based on topics on the source cluster. When set to “true”, mirror topics will be auto-created. Setting this option to “false” disables mirror topic creation and clears any existing filters.

  • Type: boolean
  • Default: false
auto.create.mirror.topics.filters
  • A JSON object with one property, topicFilters, that contains an array of filters to apply to indicate which topics should be mirrored. Filters are described below.
  • This list must have at least one filter.
  • Ordering of the filters in this array does not matter.
  • Type: array
  • Default: empty

Syntax

{ “topicFilters”: [ <each filter to apply> ] }

Schedule and frequency of mirror topic auto-create task

The auto-topic creation schedule and frequency is driven by metadata.max.age.ms, which controls the delay between subsequent auto mirror tasks.

metadata.max.age.ms

Maximum amount of time in milliseconds that the client can use a cached metadata value before it is refreshed from the brokers.

  • Type: integer
  • Default: 5 minutes (300000 milliseconds)

The default value for metadata.max.age.ms is 5 minutes (300000 milliseconds). Using the default, clients will cache metadata about the cluster for up to 5 minutes before they refresh it.

Reducing the value specified for metadata.max.age.ms will increase the frequency with which mirror topics are auto-created.

If the metadata.max.age.ms value is too low, it can cause a refresh of the metadata too frequently, which can impact the performance of the auto-create mirror topic feature. In this case, you may see errors or delays in the creation of mirror topics.

On the other hand, if the metadata.max.age.ms value is too high, it can result in stale metadata, which can lead to inconsistencies between the source and mirror topics.

Filters for auto-create mirror topics

In Confluent Cloud, auto-creating mirror topics automatically filters out Confluent internal topics and the topic that holds schemas (default name _schemas).

In Confluent Platform, internal topics may be filtered out in upcoming releases, but in current releases, they are not. All other filtering options described below are available in both Confluent Cloud and current releases of Confluent Platform. The topic __consumer_timestamps internal topic is used by Confluent Replicator for consumer offset translation; this topic should not be mirrored. Therefore, you must filter this topic out using the auto-create mirror topics EXCLUDE filters, as described below.

Other topics can be excluded using filters. For example, if a different topic name is used for Schema Registry storage, instead of _schemas, it can be excluded by using filters. Further detail on how to filter topics for auto-create mirror topics is provided below.

You can select exactly which source topics to automatically mirror through a list of filters. There is no limit to the number of filters you can add on a cluster link.

Each filter is a JSON object with the following fields:

name
Text that will be matched against the name of the topic. Set name to the wildcard, *, to apply to all topics.
patternType

Either LITERAL or PREFIXED

  • If name is set to foo, then setting patternType to LITERAL will only match a topic named foo.
  • Setting patternType to PREFIXED will match any topic names that begin with “foo”, for example, “foo”, “football”, and “foo.fighters”.
filterType

Either INCLUDE or EXCLUDE

  • If filterType is set to INCLUDE, any topic names on the source cluster that match this filter will be created as mirror topics.
  • If filterType is set to EXCLUDE , any matching topic names will not be created as mirror topics. In other words, prevents auto mirror topic creation for the specified topic names. EXCLUDE filters override any overlapping INCLUDE filters. For example, if you have an INCLUDE filter for the prefix “foo” but have an EXCLUDE filter for the prefix “foo.bar,” then a topic on the source cluster named “foo.fighters” would be mirrored automatically, but a topic named “foo.bar.fighters” would not be mirrored automatically.

There is no limit to the number of filters you can add on a cluster link.

Example filters

Mirror all topics

This filter will create mirror topics for all current and future source cluster topics:

{ "topicFilters": [ {"name": "*",  "patternType": "LITERAL",  "filterType": "INCLUDE"} ] }

Mirror all topics that begin with a given string

This filter will mirror all topics that begin with “foo”:

{ "topicFilters": [ {"name": "foo",  "patternType": "PREFIXED",  "filterType": "INCLUDE"} ] }

Mirror all topics except those that begin with “secret”

This filter will mirror all topics except those that begin with “secret”:

{ "topicFilters": [ {"name": "*",  "patternType": "LITERAL",  "filterType": "INCLUDE"},   \
{"name": "secret",  "patternType": "PREFIXED",  "filterType": "EXCLUDE"} ] }

Mirror named topics if they exist on the source cluster

This filter will mirror three topics, “liz”, “jack”, and “kenneth”, if they exist on the source cluster:

{ "topicFilters": [ {"name": "liz",  "patternType": "LITERAL",  "filterType": "INCLUDE"},   \
{"name": "jack",  "patternType": "LITERAL",  "filterType": "INCLUDE"},    \
{"name": "kenneth",  "patternType": "LITERAL",  "filterType": "INCLUDE"}  ] }

How a mirror topic is auto-created

For a given topic on a cluster link’s source cluster (the “source topic”), a new mirror topic will be auto-created by the cluster link if all of these conditions are true:

  • auto.create.mirror.topics.enable is set to true
  • auto.create.mirror.topics.filters has filters which INCLUDE the source topic name
  • The cluster link’s security credential is authorized (via source cluster ACLs) to read the source topic.
  • There is no topic by that name already on the destination cluster.
  • If prefixing is enabled on the cluster link, then the source topic cannot be a mirror topic. You cannot “chain” mirror topics when both auto.create.mirror.topics.enable and prefixing are enabled.

If any of the above conditions are false, then a mirror topic will not be auto-created for the given source topic.

Override topic configurations when using auto-create mirror topics

To override a topic configuration when using auto-create mirror topics, you have two options:

  • Change the topic configuration after the mirror topic is automatically created.
  • Use the CLI or API to manually create the mirror topic, and override the configuration. Even if a topic matches the auto-create mirror topic filters, it can still be manually created as a mirror topic before the cluster link creates it automatically. Auto-create mirror topics runs once every 5 minutes, so the mirror topic can be manually created soon after the cluster link is created or soon after the source topic is created.

Delete topics that were auto-created

You cannot delete a mirror topic that matches the auto-create mirror topics filters. If you deleted such a topic, and there was a topic of the same name on the source cluster, the mirror topic would be automatically re-created and sync all its history (if mirror.start.offset.spec is set to the default). This would render the delete operation futile.

To delete a mirror topic while auto-create mirror topics is enabled, you have three options: delete the source topic first, exclude the topic’s name from the auto-create mirror topics filters, or disable auto-create mirror topics.

  • Option 1. Delete the source topic first - Given a source topic named cool-topic, if you delete the source topic and then want to subsequently delete the associated mirror topic (cool-topic on the destination), wait until the mirror topic becomes a FAILED mirror topic (which may take up to five minutes), after which point you can delete it. You can also call failover or promote on the mirror topic to transition it to the STOPPED state. Both FAILED and STOPPED mirror topics can be deleted.
  • Option 2. Exclude the topic name from the auto-create mirror topics filters - This strategy will prevent the mirror topic from overlapping with the auto-create filters. An easy way to remove a given topic from the filters is to add an EXCLUDE filter for that topic name. You can add cool-topic to the EXCLUDE filters, even if no such source topic exists. After editing the auto-create mirror topic filters, you can delete the mirror topic.
  • Option 3: Disable auto-create mirror topics on the cluster link - After the setting has been disabled, the mirror topic can be deleted. If needed, auto-create mirror topics can be immediately re-enabled on the cluster link. To learn more, see Disable auto-create mirror topics and Mirror topic deletion.

Disable auto-create mirror topics

To disable auto-create mirror topics entirely, set this property on the cluster link:

auto.create.mirror.topics.enable=false

Here’s an example of how to set that property with the CLI:

echo "auto.create.mirror.topics.enable=false" > tmp.txt
confluent kafka link configuration update <link-name> --config-file tmp.txt
rm tmp.txt

Prefix Mirror Topics and Consumer Group Names

Note

Prefixing is not available on Confluent Platform version 7.1 or earlier. It is available on Confluent Cloud and in Confluent Platform starting with release 7.2.0.

Cluster links can be configured with a prefix (cluster.link.prefix) that is applied to the names of the mirror topics and, optionally, the names of the consumer groups that are managed by the cluster link at the destination cluster. This enables topics and consumer groups from different source clusters that have the same name to be synced to the destination without name clashes. It also enables all mirror topics from a cluster link to be categorized and managed under one prefix on the destination.

For example, consider two links, link-1 and link-2. link-1 is linking data from cluster s1 to destination and link-2 is linking data from s2 to destination, and furthermore s1 and s2 both contain a topic “clicks”. Without prefixing, it would be impossible for both links to sync data for their own “clicks” topic as they would have the same name on the destination cluster. With prefixing, each link can have its own unique prefix that is applied to the topic name as its mirrored. link-1 could have prefix usa_ and link-2 could have prefix eu_. Finally, at the destination cluster there would be two topics, usa_clicks and eu_clicks.

If the link is configured with a prefix, when a mirror topic is created (for example, with confluent kafka mirror create) then the mirror topic name must begin with the prefix (otherwise, the operation will fail). If auto-create mirror topics is used, the topics created on the destination will automatically be named with the prefix.

The prefix can optionally be applied to the consumer groups that are created on the destination cluster because of consumer group offset syncing. When offsets are synced, consumer groups are created on the destination; with this feature it’s possible to prefix the consumer group name on the destination. This enables consumer group offsets to be synced even when two (or more) consumer groups from two (or more) different source clusters have the same name. For example, if link-1 had consumer group g1 and link-2 had consumer group g1, then prefixing would result in two consumer groups at the destination: usa_g1 and eu_g1. By default, consumer group names are not prefixed with the prefix; consumer.group.prefix.enable must be set to true in the cluster link config to enable this.

Here’s an example configuration file for Confluent Enterprise, containing just elements relevant to prefixing:

bootstrap.servers=localhost:9092
cluster.link.prefix=usa_
consumer.offset.sync.enable=true
auto.create.mirror.topics.enable=true
auto.create.mirror.topics.filters={"topicFilters":[{"name": "*","patternType": "LITERAL","filterType": "INCLUDE"}]}
consumer.group.prefix.enable=false
acl.sync.enable=false

Here, a prefix of usa_ has been configured and consumer.group.prefix.enable has been set to false (which is the default, but shown here for context). All mirror topic names on the destination will start with the prefix; consumer group names will remain the same as they are on the source. Also note, acl.sync.enable is set to false which is required because auto.create.mirror.topics.enable is set to true and prefixing is enabled; see limitations below.

On Confluent Cloud, these configurations are specified on the command line or the Confluent Cloud console.

Limitations on prefixing

  • The prefix cannot be changed after the cluster link is created.
  • ACL syncing and prefixing cannot be enabled at the same time. Note, ACLs can always be synced on a separate link; just create a new link and configure it to sync ACLs.
  • Consumer group prefixing cannot be enabled for bidirectional links. Setting consumer.group.prefix.enable to true on a bidirectional cluster link will result in an “invalid configuration” error stating that the cluster link cannot be validated due to this limitation.
  • Prefixing cannot be combined with chaining and auto-create mirror topics at the same time. When auto-mirroring and prefixing is configured, a link cannot mirror a topic that is itself a mirror topic at the source cluster. For example, consider the same links above, link-1 and link-2. If a new link-3 was created, auto-mirroring would not be able to mirror data from usa_clicks or eu_clicks or any mirror topic on the destination (even if it didn’t have a prefix) because they are mirror topics. This is done as a safeguard to prevent auto-mirroring from creating an infinite number of topics due to cyclical cluster link connections.

Tip

Prefixed chained mirror topics can still be created by hand, for example via confluent kafka mirror create.

Aggregate multiple source cluster topics into a single topic

Cluster Linking can be used for aggregating data from multiple identical source clusters into one destination cluster. For example, each source cluster may be running in a different region, collecting local data, and Cluster Linking can stream data from each local cluster to a central, aggregate cluster.

Note that every topic, on every source cluster, that you wish to aggregate will need its own uniquely named mirror topic on the aggregate cluster. An easy way to accomplish this is by setting a unique prefix on each cluster link.

If a consumer group needs to read the data from all source clusters (for example, from all regions), it can easily consume multiple mirror topics at the same time by consuming from a regular expression (“regex”) topic pattern that matches the all mirror topic names you want to source from (rather than consuming from a single topic name). Most open source Kafka clients support consuming from a regex topic pattern.

../../_images/cluster-link-aggregate-topics.png

Tip

If you can’t consume from a regex pattern, ksqlDB INSERT queries can be used to merge the mirror topics into a single aggregated topic for each data type.

Topics not mirrored

By design, the following topics are not mirrored (synced):

  • confluent-audit-log-events
  • Internal or “system” topics (for example, any topic prefixed with _confluent or __confluent)

Mirroring lag

The mirror process is asynchronous in operation. Therefore, there will often be some mirroring lag between the source topic and the mirror topic. The most recent messages on the source topic may not yet have been mirrored to the mirror topic, so the mirror topic may often be slightly behind the source topic.

The same is true for syncing the topic configuration, the consumer group offsets, and the ACLs. All of these processes are asynchronous, so the changes will happen first on the source topic, and then on the mirror topic shortly after.

Sync consumer group offsets

You can configure your cluster link to sync consumer group offsets from its source topic to the destination topic.

Enabling consumer group offset sync and specifying filters

To set this up, you configure the following the following properties:

  • consumer.offset.sync.enable - Set this to true to sync consumer group offsets. (The default is false.)
  • consumer.offset.group.filters - Pass in a JSON file with a pattern that is matched against consumer group names to identify which groups to mirror.

If these two properties are set, the cluster link syncs the consumer group offsets of any matching consumer groups for all mirror topics that the link mirrors.

Note

Consumer group filters should not include groups that are being used on the destination. This will help ensure that the system does not override offsets committed by other consumers on the destination, or overwrite the consumer offsets while consumer groups are consuming from the mirror topic. If you are unsure about which consumer groups are being used on the destination, consumer offset sync on the cluster link until you verify this.

Why consumer offsets may be clamped in the event of failover, promote, or consumer group migration

When either failover or promote is called on a mirror topic or when a consumer group moves to the destination cluster and consumes its first message from a mirror topic, if consumer offset sync is enabled on the cluster link, then the consumer offsets for that topic may be “clamped”. That is, the consumer offsets for the topic that the cluster link synced will not be allowed to be larger than the last offsets on the mirror topic (the “log end offset”). If any of these consumer offsets are larger / further than the last offsets (Log End Offset), then those consumer offsets will be reset to the Log End Offset. (The use of failover and promote are covered in the next section on Convert a mirror topic to a normal topic.)

To illustrate with an example: suppose a source topic with one partition had messages up to offset 100 and is being mirrored over a cluster link. However, there was mirroring lag on the cluster link, and only the messages up to offset 90 were mirrored when a disaster hit the source topic. At this point, you call failover on the mirror topic. Consumer group A was at offset 80 on the source cluster, so it will remain at offset 80 on the destination cluster, since that offset was mirrored to the mirror topic. But consumer group B was at offset 95, which was not mirrored to the mirror topic. If consumer group B started consuming at offset 95 on the mirror topic, then it would miss any messages at offsets 90-94 that were produced to the topic. To avoid that problem, the cluster link “clamps” consumer group B’s offsets down to 90, which is the highest offset on the mirror topic.

Where to learn more

To learn more, see the example in (Optional) Step 3: Move consumer groups from the old cluster to the new cluster under Standard migration with Cluster Linking and the command descriptions under Configuring cluster link behavior in Configuring cluster link behavior.

Examples of the JSON file describing these filters are shown in the steps under Create a cluster link in the topic data sharing tutorial, in Create the cluster link in the disaster recovery and failover tutorial, and in the Confluent Platform documentation under Migrating Consumer Groups from Source to Destination Cluster).

Reverse a source and mirror topic

The source topic -> mirror topic relationship can be reversed using the reverse-and-start or reverse-and-pause commands. These cause the source topic to become the mirror topic, and the mirror topic to become the source topic.

The following diagram shows how a reverse-and-pause command works. The reverse-and-start command behaves in a similar fashion, except that instead of the extra manual step to resume the mirror topic to become an active mirror topic, the command automatically converts the topic to an active mirror topic.

../../_images/cluster-link-reverse-mirror.png

How it works

The reverse-and-start command leaves the new mirror topic in an active mirroring state, whereas reverse-and-pause leaves the new mirror topic in a paused state until the resume command is called.

These commands are available in the Confluent CLI at confluent kafka mirror, the REST API at reverse, and kafka-mirrors in Confluent Platform 7.7+.

Cluster Linking ensures that both topics have the same data and metadata at the point of change, so no data are left behind. Once the command is called, the source topic will not accept new writes, which allows the mirror topic to catch up and perform the reversal. After the reversal is complete, data written to the (new) source topic will then flow to the (new) mirror topic. This provides a fast and efficient failback for a planned failover mechanism, allowing you to quickly failover to the mirror site, produce new data, and then quickly failback to the original site.

Requirements for using “reverse” commands

  • The cluster link must be in Bidirectional mode.
  • On Confluent Cloud, both clusters must be Confluent Cloud Dedicated clusters.
  • On Confluent Platform, both clusters must be Confluent Platform version 7.7 or later.
  • Both clusters must be healthy and able to communicate over the network.
  • You must have the CLUSTER:ALTER ACL or the Admin role on the cluster where this command is run.

Process flow for “reverse” commands

Calling the reverse commands follows this chain of events:

  1. Call reverse-and-start or reverse-and-pause on the mirror topic.

    • Make sure this command is used against the cluster that hosts the mirror topic; for example, the disaster recovery (DR) cluster
    • Multiple topics can be reversed at once using the REST API
    • You must have the CLUSTER:ALTER ACL or the Admin role on the cluster where this command is run.
  2. The mirror topic enters PENDING_SYNCHRONIZE state, and the source topic enters PENDING_MIRROR state.

    During this time:

    • The source topic will not accept any new writes (produce requests).
    • The mirror topic will fetch all data from the source topic until it is up to date.
    • Tip: The larger the mirroring lag on the mirror topic, the longer this step will take. To minimize the amount of time when both topics are in a read-only state, call the command at a time when mirroring lag is at zero, or very low.
  3. Once the data has been synchronized, the (old) mirror topic enters PENDING_STOPPED state.

    • During this time, the old mirror topic fetches any last metadata (such as, consumer offsets).
    • Monitor for errors that could cause this step to hang using the state transition error API or Metrics.
  4. The (new) source topic enters the STOPPED mirror state, and accepts writes as a regular topic.

    • The (new) mirror topic enters the ACTIVE or PAUSED state, depending on which command was called. The mirroring relationship is reversed.

Limitations

  • The “reverse” commands currently do not work with hybrid links. In other words, this will not work on a bidirectional link where one cluster is on-premises and the other cluster is in Confluent Cloud.
  • The “reverse” commands cannot be used in Confluent Platform when unclean leader election is enabled.
  • Make sure you have monitoring in place to check all of the different states the topics will be going through (as described in Process flow for “reverse” commands). If at any point in time there are any issues in the process, you can always use the failover command to get a mirror topic to a writable state.
  • You should run this command with only one topic at a time for transactional producers, and you must monitor each topic to the end state before running the command for the next topic. If you run the command in batch, you must make sure that all topics are transitioned to a writable state before restarting the application for production. Otherwise, if the applications are restarted before the topics are transitioned into the end state, this can result in the new records not being persisted in Kafka because it starts writing to an immutable topic.

Convert a mirror topic to a normal topic

If you want to convert a mirror topic into a normal topic that you can produce into, you can call the failover or the promote command on the mirror topic, using the commands confluent kafka mirror promote and confluent kafka mirror failover, respectively.

Specifically, you call confluent kafka mirror promote <topic-name> --link <link-name> or confluent kafka mirror failover <topic-name> --link <link-name>.

Both the failover and promote commands occur on the destination cluster (the mirror topic’s cluster) and require you to pass in the cluster link’s name.

promote
The promote option is often used for migrations. It checks that there is no mirroring lag, config sync lag, or consumer offset lag between the source topic and the mirror topic. Then, it converts the mirror topic into a full topic, with the assurance that this topic was exactly the same as its source topic. The destination cluster’s brokers must be able to reach the source cluster’s brokers in order to make this check, so your source cluster must be online.
failover
The failover option is often used when a disaster has hit the source cluster (for example, a cloud region outage) and you want to shift operations from the source topic to the mirror topic. This command will succeed regardless of the mirroring lag or the source cluster’s reachability.
truncate-and-restore
If you want to restore mirroring after a promote or a failover, you can use the truncate-and-restore command. After failing over or promoting a mirror topic, you can run truncate-and-restore on the original primary topic that will make it a mirror fetching from the newly-stopped mirror topic. This command will also truncate and delete any divergent records that were produced to the original primary cluster after the point of failover. This means that there could be some loss of data if your clients are not set up to reprocess data. (This command is only available on “bidirectional” links.)
reverse-and-start
After running truncate-and-restore, you can restore the original primary and secondary regions by running the reverse-and-start command on the new mirror topic. (This command is only available on “bidirectional” links.)

Important

  • The truncate-and-restore and reverse-and-start commands are only available for “bidirectional” links; for non-bidirectional links, promote and failover are irreversible commands.
  • The truncate-and-restore command is currently available only on Confluent Cloud and does not work with hybrid links at this time.
  • You can run mirror describe (confluent kafka mirror describe <mirror-topic-name> --link <link>) on a promoted or failed over mirror topic, if you do not delete the cluster link. If you delete the cluster link, you will lose the history and, therefore, mirror describe will not find data on promoted or failed over topics.
  • There is no way to change a mirror topic to use a different cluster link or make changes to the link itself, other than to recreate the mirror topic on a different link.
  • You cannot delete a cluster link that still has mirror topics on it (the delete operation will fail).
  • If you are using Confluent for Kubernetes (CFK), and you delete your cluster link resource, any mirror topics still attached to that cluster link will be forcibly converted to regular topics by use of the failover API. To learn more, see Modify a mirror topic in Cluster Linking using Confluent for Kubernetes.

Example of topic migration

../../_images/cluster-link-migrate.png

Example of failing over a topic

../../_images/cluster-link-failover1.png

Example of truncate and restore

../../_images/cluster-link-truncate-and-restore.png

Example of reverse and start

../../_images/cluster-link-reverse-and-start.png

Mirror topic states and statuses

You can use the command confluent kafka mirror describe to get information about a mirror topic, including related states and statuses.

For example:

confluent kafka mirror describe alkesh-cp-topic --link from-on-prem-link

      Link Name     | Mirror Topic Name | Source Topic Name | Mirror Status | Status Time (ms) | Partition | Partition Mirror Lag | Last Source Fetch Offset
--------------------+-------------------+-------------------+---------------+------------------+-----------+----------------------+---------------------------
  from-on-prem-link | alkesh-cp-topic   | alkesh-cp-topic   | ACTIVE        |    1730740163703 |         0 |                    0 |                       10

When you describe a mirror topic, it will return one of these states:

ACTIVE
The mirror is running normally, and messages are being mirrored from the source topic to the destination topic.
PAUSED
  • A user has paused mirroring for this mirror topic.

  • To reach this state, a user must either pause this specific topic, or pause its cluster link.

    Caution

    Confluent Cloud cluster links cannot be paused. On Confluent Cloud, users can only pause the individual mirror topics, as described in confluent kafka mirror pause.

PENDING_SYNCHRONIZE
  • This topic is in the process of becoming a regular topic that will be mirrored to the remote cluster. (Previous to this, the topic was a mirror topic.)
  • This topic is currently READ ONLY. (It will not accept produce, but it can be consumed from).
  • The PENDING_SYNCHRONIZE state occurs when a reverse command is called on a topic.
  • Allowed operations to a topic in this state are as follows:
    • pause: to pause the reversal process
    • failover: to permanently abort the reversal process and convert this to a writable, non-mirror topic.
  • The topic will automatically transition to the STOPPED state when ready.
PENDING_MIRROR
  • This topic is in the process of becoming a mirror topic (it was formerly a writable topic).
  • It will not accept produce, but it can be consumed from (it is READ ONLY).
  • The PENDING_MIRROR state occurs when a reverse command is called on the remote mirror topic for this source topic.
  • Allowed operations to a topic in this state are as follows:
    • failover: to permanently abort the reversal process and convert this to a writable, non-mirror topic.
  • When its conversion to a mirror topic has completed, the topic will automatically transition to the ACTIVE or PAUSED state, depending on which reverse command was used.
PENDING_SETUP_FOR_RESTORE
  • This topic is in the process of becoming a mirror topic (it was formerly a writable topic) that will be mirroring from the remote cluster.
  • In this state, the mirror topic is reconciling the specific offsets on the topic to truncate. To ensure the mirror is restored properly, the correct offsets must be determined on the topic so that any divergent records can be truncated on the old primary cluster.
  • It will not accept produce, but it can be consumed from (it is READ ONLY).
  • The PENDING_SETUP_FOR_RESTORE state occurs when a restore command is called on this topic.
  • The topic will automatically transition to the ACTIVE state when ready.
  • Allowed operations to a topic in this state are as follows:
    • failover: to permanently abort the restoration process and convert this to a writable, non-mirror topic.
PENDING_RESTORE_MIRROR
  • This topic is in the process of becoming a mirror topic (it was formerly a writable topic) that will be mirroring from the remote cluster.
  • In this state, the mirror topic truncates to the reconciled offsets.
  • It will not accept produce, but it can be consumed from (it is READ ONLY).
  • The PENDING_SETUP_FOR_RESTORE state occurs when a restore command is called on this topic.
  • When its conversion to a mirror topic has completed, the topic will automatically transition to the ACTIVE state.
  • Allowed operations to a topic in this state are as follows:
    • failover: to permanently abort the restoration process and convert this to a writable, non-mirror topic.
PENDING_STOPPED
  • A user has stopped this mirror topic with the promote command, and this topic will soon be in the STOPPED state.
  • To force the mirror topic to immediately go from the PENDING_STOPPED state to the STOPPED state, call the failover command on it. Doing this cancels any synchronization that was happening between the source cluster and the destination cluster, and eliminates any guarantees that the promote command gives.
STOPPED
  • Mirroring has permanently stopped for this topic. It will no longer receive messages from its source topic. The topic is now writable and can receive messages produced directly to it.
  • To get into this state, a user must call either promote or failover on this mirror topic.
  • Even though a STOPPED topic is no longer a mirror topic, it will still be listed in output for the commands confluent kafka mirror list and confluent kafka mirror describe <destination-topic-name> --link <link> for as long as the cluster link exists. This is useful because the topic will return the last offset it fetched from its source topic (Last Source Fetch Offset) for each partition, and the time at which it was stopped (Status Time).
SOURCE_UNAVAILABLE
  • The mirror topic is unable to reach the source topic, and is not mirroring messages from the source topic. This could happen if the source cluster is experiencing an outage or if the network between the destination cluster and the source cluster is unstable.
  • Mirroring will resume once the issue is resolved and the destination cluster can reach the source cluster.

Note

Using a Confluent Platform 7.0.x source cluster with a source-initiated link to a KRaft destination cluster will generate a SOURCE_UNAVAILABLE error. Cluster Linking between a source cluster running Confluent Platform 7.0.x or earlier (non-KRaft) and a destination cluster running in KRaft mode is not supported. To solve for this, upgrade the source cluster to Confluent Platform 7.1.0 or later.

LINK_FAILED
  • An error has broken the mirror topic’s cluster link, and no data is being mirrored. A user needs to manually re-configure the link.
FAILED
  • The mirror topic has permanently failed. It will no longer mirror data. This can happen if the cluster link ACLs are removed from the source cluster, or if the source topic is deleted. In both cases, the failed status takes effect only after cluster.link.retry.timeout.ms is reached (by default, the system retries the link for 5 minutes).
  • You can stop this mirror with the failover command, and it will become a regular topic.
  • If you want to restore mirroring for this topic, you must delete the legacy mirror topic and create a new mirror topic with the same name.

View mirror topic state transition errors

You can use the following commands to view mirror topic state transition errors. For example, when a mirror topic is promoted, it transitions from the PENDING_STOPPED state to STOPPED state. During that process, various actions are performed to implement the transition and errors can occur during that implementation. The following APIs allow you to view these errors and unblock the mirror topic state transitions. For example, if you see an authentication issue, you can reconfigure the link’s credentials to allow the mirror topic to be fully promoted.

For a full list of possible task states and error codes, see Troubleshooting Cluster Linking on Confluent Cloud.

Important

These CLI commands are not yet available on Confluent Platform.

To view errors associated with a state transition:

confluent kafka mirror state-transition-error list <topic-name>  --link <link-name>

Examples

confluent kafka mirror state-transition-error list topic-1 --link link-1

  Mirror State Transition Error  | Mirror State Transition Error
               Code              |            Message
---------------------------------+---------------------------------
  AUTHENTICATION_ERROR           | Failed to describe topic
                                 | configs due to authentication
                                 | issues.
---------------------------------+---------------------------------

Mirror topic deletion

You may safely delete a mirror topic. Deleting a mirror topic permanently stops data mirroring to that topic. If you create a new normal topic of the same name on the same cluster, data will not be mirrored to it.

To delete a mirror topic, use the same command you would use to delete a normal topic:

confluent kafka topic delete <topic-name>

To learn more, see Delete topics that were auto-created.

Important

  • When deleting a cluster link, first check that all mirror topics are in the STOPPED state. If any are in the PENDING_STOPPED state, deleting a cluster link can cause irrecoverable errors on those mirror topics due to a temporary limitation.
  • You cannot delete a cluster link that is attached to any mirror topics. You must first delete, failover, or promote all of the mirror topics, and then you can delete the cluster link.

Source topic deletion

Caution

Do not delete a source topic that is being mirrored by a mirror topic. Doing so can lead to unpredictable truncation and data loss on the mirror topic. You should always stop mirroring to all associated mirror topics before deleting a source topic.

While it may be possible to delete a source topic that is being mirrored by a mirror topic and a cluster link, doing so is not recommended. In particular, unpredictable behavior can occur if a source topic is deleted, and a topic by the same name is then created within a few minutes time. This scenario can cause permanent data loss on any mirror topics that are still mirroring from that source topic, and can also cause performance issues on the source cluster or destination cluster.

Before deleting a source topic, you should stop any mirroring to associated mirror topics. You can stop mirroring on a mirror topic in one of these ways:

  • Delete the mirror topic.
  • Call promote or failover so the mirror topic enters the STOPPED state.
  • Revoke the security permissions for the cluster link to read the source topic. You can do this in one of three ways: (1) delete the cluster link’s ALLOW ACL for the source topic, (2) create a DENY ACL for the source topic, or (3) delete the cluster link’s API key.

For further discussion about Kafka limitations with topic deletion and how topic IDs will help, see KIP-516.

How schemas work with mirror topics

Cluster Linking preserves the schema ID stored in each message. Therefore, to consume from a mirror topic that is using schemas, the consumer clients must use a Schema Registry context with the same schema IDs as on the Schema Registry context used by the producers to the source topic. Consequently, consuming from a mirror topic that uses schemas should be done either by:

  • (Option 1) using the same Schema Registry as the producers used
  • (Option 2) using a Schema Registry context that was synced through Schema Linking from the Schema Registry that the producers used

Caution

When using Schema Linking: To use a mirror topic that has a schema with Confluent Cloud Connect, ksqlDB, broker-side schema ID validation, or the topic viewer, make sure that Schema Linking puts the schema in the default context of the Confluent Cloud Schema Registry. These fully-managed Confluent Cloud features require schemas to be in the default context of the Confluent Cloud Schema Registry in their Environment.

../../_images/mirror-topics-and-schemas.png

Mirror Topics and Schemas

Advanced mirror topic architectures

Chaining

A mirror topic can be a source topic itself. A mirror topic can be mirrored by different cluster links, allowing you to “chain” cluster links and mirror topics together.

For example, Topic A (source topic) on Cluster 1 —cluster link—> Topic A (mirror topic and source topic) on Cluster 2 —cluster link—> Topic A (mirror topic) on Cluster 3

Tip

You can safely create these chained topics and cluster links without creating a circular dependency between mirror topics and cluster links. You can create mirror topics without the fear of creating an infinite loop.

../../_images/cluster-link-chain.png

Chaining Example

Fanning out

A source topic can be mirrored to multiple mirror topics. These mirror topics must exist on multiple different clusters.

For example, Topic A on Cluster 1 —cluster link—> Topic A on Cluster 4, and Topic A on Cluster 1 —cluster link—> Topic A on Cluster 5

Tip

If you plan to use failover or promote on a cluster link (for example, for Disaster Recovery or Migration), then chained or fanned-out mirror topics will not automatically retain their shape. For example, if you fan out A –> B and A –> C, if A has an outage and you call failover on B, there is no way to automatically mirror B –> C. You will need to reconstruct the appropriate mirroring relationship for your use case using brand new topics.

../../_images/cluster-link-fan-out.png

Fan-Out Example

Configurations

The following sections provide a quick reference of which Cluster Linking configurations are synced from the source to the mirror topic, overrides, and concepts related to syncing.

Synced mirror topic configurations for Confluent Cloud

These configurations are always synced from the source topic to the mirror topic. Mirror topics will always have the same value as their source topic, in order to ensure the properties of mirror topics are met.

  • number of partitions
  • max.message.bytes
  • cleanup.policy
  • message.timestamp.type
  • message.timestamp.difference.max.ms

By default, the following configurations also are synced from the source topic to the mirror topic unless they are explicitly removed from topic.config.sync.include, as described in the following section.

  • retention.bytes
  • retention.ms
  • delete.retention.ms
  • min.compaction.lag.ms
  • max.compaction.lag.ms

Setting retention configurations to always sync keeps the source and destination data identical. With this default configuration, the starting offset is also synced from source to mirror topics. By maintaining consistent log start offsets, Cluster Linking guarantees that records deleted from the source cluster are also deleted from the destination cluster. This may be a regulatory requirement for some customers.

Override default syncing to specify independent mirror topic behavior

Some use cases require independent retention for source and destination topics. For example, when mirroring data from small edge clusters to large centralized clusters, low-footprint edge clusters may use very small retention, but rely on the data being available for a long time on the destination cluster.

To satisfy these cases, you can override the defaults by explicitly setting the following property to specify only those topic configs you want synced from source to destination:

  • topic.config.sync.include - The list of topic configs to sync from the source topic.

For example, the topic configurations could be set to the following (which does not include the retention properties):

topic.config.sync.include=max.message.bytes,cleanup.policy,message.timestamp.type,message.timestamp.difference.max.ms,min.compaction.lag.ms,max.compaction.lag.ms

Create new mirror topics and specify an override using --config as described in Mirror topic creation and confluent kafka mirror create.

With these overrides in place, mirror topics will have independent retention periods and starting offsets instead of syncing with their source topics.

Important

Keep in mind, configuration overrides are specified at the cluster link level, so will apply to all mirror topics on the cluster. If independent retention is specified (by omission in topic.config.sync.include), you must either specify the retention value or use the Kafka defaults.

Mirror topic configurations not synced

Any configuration that is not in the list above will not be synced to a mirror topic in Confluent Cloud. Therefore, the mirror topic’s configuration could be different from the source topic’s configuration. If you don’t override the mirror topic’s configuration, then it will inherit its cluster’s default.

A few important examples of configurations that are not synced to mirror topics in Confluent Cloud:

  • min.insync.replicas
  • confluent.placement.constraints
  • compression.type
  • replication.factor (No replication factors are synced to mirror topics. The replication factor defaults to 3 for all topics, and this is not configurable.)

Hybrid cloud configuration syncs

Confluent Platform and Confluent Cloud have different policies for which mirror topic configurations are synced. If you cluster link between Confluent Platform and Confluent Cloud, then the destination cluster’s policy is enforced.

For example, if you cluster link from a Confluent Platform source cluster to a Confluent Cloud destination cluster, then the value of compression.type will not be synced. But if you cluster link from a Confluent Cloud source cluster to a Confluent Platform destination cluster, then compression.type will be synced.