Manage Security for Cluster Linking on Confluent Platform

All security configurations used to connect to the source cluster can be configured on the cluster link when the link is created. Each link is associated with exactly one link credential that will be used for authentication of connections to the source cluster using that link. Different cluster links on the same cluster may use different security credentials. The link credential must be granted appropriate permissions on the source cluster.

Authentication

The following example shows how to configure SASL_SSL with GSSAPI as the SASL mechanism for the cluster link to talk to the source cluster. You can set these configurations using a config-file, as described in the section on how to set properties on a cluster link.

security.protocol=SASL_SSL
ssl.truststore.location=/path/to/truststore.p12
ssl.truststore.password=truststore-password
ssl.truststore.type=PKCS12
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
    useKeyTab=true \
    storeKey=true \
    keyTab="/path/to/link.keytab" \
    principal="clusterlink1@EXAMPLE.COM";

Cluster Linking configurations should include client-side TLS/SSL and SASL/GSSAPI configuration options for connections to the source cluster in this scenario.

If you reference a keystore/truststore directly (for example, keystore.jks), the same files must be available in the same location on each of the brokers.

For details on creating TLS/SSL key and trust stores, see Encrypt and Authenticate with TLS. For details on SASL/GSSAPI, see Configuring GSSAPI. All brokers at the destination cluster must be configured with password.encoder.secret for encrypting sensitive link configurations when security is enabled. If the link is source-initiated (for example, from CP 7.1+ to Confluent Cloud), then password.encoder.secret must also be set on the source cluster. This parameter must be set as a new password. To learn more, see Update password configurations dynamically.

To configure cluster links to use other SASL mechanisms, include client-side security configurations for that mechanism. See Authenticate with SASL using JAAS for other supported mechanisms. To use mutual TLS authentication as the security protocol, a key store should also be configured for the link. See Encrypt and Authenticate with TLS for details.

Note

The cluster links use source credentials configured on the link to communicate with the source cluster. These credentials must be valid in order for the link to function.

Mutual TLS (mTLS)

Cluster Linking can use mutual TLS (mTLS) for some, but not all, data exchanges:

  • Confluent Cloud to Confluent Cloud does not use mTLS; it uses TLS and SASL as described in Cluster Linking on Confluent Cloud.
  • Data coming into Confluent Cloud from OSS Kafka can be configured to use mTLS.
  • Data coming into Confluent Cloud from Confluent Platform can be configured to use either mTLS or source-initiated cluster links with TLS+SASL. mTLS can be used as the Confluent Platform side of a source-initiated cluster link.

Follow these guidelines when configuring mTLS for Confluent Platform to Confluent Platform cluster links:

  • To learn more about PEM files and possible configurations, see the Apache Kafka KIP that introduced PEM file support.

  • If you reference a keystore/truststore directly (for example, keystore.jks), the same files must be available in the same location on each of the brokers.

  • Only PKCS #8 keys are supported for the private key specified in ssl.keystore.key. If the key is encrypted, the key password must be specified using ssl.key.password.

  • If the files cannot be copied to the brokers directly, be sure to list them in PEM format.

    • If using kafka-cluster-links, add \n to indicate new lines in the encrypted certificates. The example below is modified; it doesn’t include full keys, but should be enough to show the use of \n:

      security.protocol=SSL
      ssl.keystore.type=PEM
      ssl.truststore.type=PEM
      ssl.key.password=clientpass
      ssl.endpoint.identification.algorithm=
      ssl.keystore.key=-----BEGIN ENCRYPTED PRIVATE KEY-----\nMIIE6TAbBgkqhkiG9w0BBQMwDgQI4d+aGvbJsIcCAggABIIEyB+qVnCcV9BbBz97\njmG1Xm6p2HIbO4tgMTld/Kmfx/4RHECdnhjiP3plH4yxZnaHBbiRHlno1d2xL93n\n4pqQkfhfJhBIqzoO3A==\n-----END ENCRYPTED PRIVATE KEY-----
      ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE-----\nMIIETjCCAzagAwIBAgIUX64Yeyvwae61we2rC4gniQUcrdIwDQYJKoZIhvcNAQEL\nBQAwZTEYMBYGA1UEAwwPVGVzdENvbmZsdWVudENBMQwwCgYDVQQLDANFQUExEDAO\nrPhy0+XGDZbC4PWYi3FogFTKKjKzjtO3ZP5nXt6zvF9/nCn8RpljKJH4brIIlhPM\n3Us=\n-----END CERTIFICATE-----
      ssl.truststore.certificates=-----BEGIN CERTIFICATE-----\nMIIDRjCCAi4CCQCW7jXMNbE1XzANBgkqhkiG9w0BAQsFADBlMRgwFgYDVQQDDA9U\nZXN0Q29uZmx1ZW50Q0ExDDAKBgNVBAsMA0VBQTEQMA4GA1UECgwHRnJlZW1hbjEP\n4Q/DVCHHUvJwHxd/5Bc08s56FYHFetoB1d4=\n-----END CERTIFICATE-----
      
    • If using confluent kafka link, the command doesn’t interpret \n in the same way. Instead, replace literal \n with a single space.

      ssl.endpoint.identification.algorithm=
      security.protocol=SSL
      ssl.keystore.type=PEM
      ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE-----\
      MIIDeTCCAmGgAwIBAgIJAKU/BVTP/yyUMA0GCSqGSIb3DQEBCwUAMHMxCzAJBgNV\
      BAYTAlVTMQswCQYDVQQIDAJNVDEQMA4GA1UEBwwHQm96ZW1hbjEPMA0GA1UECgwG\
      RmlndXJlMQ8wDQYDVQQLDAZGaWd1cmUxIzAhBgNVBAMMGmthZmthLXJvb3QudGVz\
      ...
      k1PdbjufUewo7KQf8nde1IefAbSARG6Fu4oY2g4=\
      -----END CERTIFICATE-----
      ssl.keystore.key=-----BEGIN ENCRYPTED PRIVATE KEY-----\
      MIIE5DAcBgoqhkiG9w0BDAEBMA4ECIYZ0EPRimP2AgIIAASCBMJLvS+Mtm9HzU4O\
      fu3EuTu7LyjH4KEyzSYsuKnhYMDbFxHD1V/dSIr3N8ZqDjz5Xr3TvsN3pVwa5BFh\
      Tv8NhhzEWB6jQtf7xo0cGPlL2VVO95D2aAvLBTQegWxqOXrhGIzqffyw/59uyJi9\
      ...
      WsrgdacPOGc4pC+bFXUOySXUBrvI47rPLn3tHHWnGfKEOKA0zMRPHjMUWmooKhKl\
      VKz1+zPeZ1s=\
      -----END ENCRYPTED PRIVATE KEY-----
      ssl.key.password=<REDACTED>
      ssl.truststore.type=PEM
      ssl.truststore.certificates=-----BEGIN CERTIFICATE-----\
      MIIDYjCCAkoCCQCw8cwWkAxRaDANBgkqhkiG9w0BAQsFADBzMQswCQYDVQQGEwJV\
      ...
      CIi0IXotll17POA+hgHeWBxhKg2ULmPxR7UgHgCcnajTNMmbbaBfOSa6xe1qOJvU\
      CzFNJ0Yz\
      -----END CERTIFICATE-----
      

      Alternatively, you can use an escaped newline; that is, a backslash (\), and then an actual newline.
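
The encodings described above can be produced mechanically from a PEM file. The following Python sketch is illustrative only: the function name pem_to_property_value and its style values are hypothetical and are not part of any Confluent tool. It joins the lines of the PEM text with a literal \n (for kafka-cluster-links), with a single space (for confluent kafka link), or with a backslash followed by an actual newline.

```python
def pem_to_property_value(pem_text: str, style: str = "kafka-cluster-links") -> str:
    """Flatten PEM text into a value suitable for a properties file.

    The style names are hypothetical labels chosen for this sketch:
      "kafka-cluster-links" -> lines joined by the two characters backslash + n
      "confluent-cli"       -> lines joined by a single space
      "escaped-newline"     -> lines joined by a backslash and a real newline
    """
    # Trim each line and drop blank ones so only PEM content remains.
    lines = [line.strip() for line in pem_text.splitlines() if line.strip()]
    separators = {
        "kafka-cluster-links": "\\n",
        "confluent-cli": " ",
        "escaped-newline": "\\\n",
    }
    if style not in separators:
        raise ValueError(f"unknown style: {style!r}")
    return separators[style].join(lines)


if __name__ == "__main__":
    # Placeholder body, not a real certificate.
    sample = "-----BEGIN CERTIFICATE-----\nPLACEHOLDERBASE64\n-----END CERTIFICATE-----\n"
    print("ssl.truststore.certificates=" + pem_to_property_value(sample))
```

Whichever style you pick, compare the generated value against the examples above before using it in a link configuration.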

Authorization (ACLs)

In deployments where ACLs are enabled, additional ACLs must be added in both the source and destination clusters. For details on creating ACLs, see Authorization using Access Control Lists (ACLs). For a full list of associated operations, resources, and APIs, see the subtopic, Operations.

Caution

  • ACL migration (ACL sync), previously available in Confluent Platform 6.0.0 through 6.2.x, was removed in Confluent Platform 7.0.0 due to a security vulnerability, then re-introduced in Confluent Platform 7.1.0 with the vulnerability resolved. If you are using ACL migration in your pre-7.1.0 deployments, you should disable it by setting acl.sync.enable=false on your cluster links, or preferably, upgrade to 7.1.x.

  • Starting with Confluent Platform 7.1.0, ACL syncing is handled by more efficient and less error-prone methods. If you are using Cluster Linking on Confluent Platform 7.1.0 or later, you may notice that the cluster link occasionally appends data to ACLs on the primary cluster. When Cluster Linking is active with acl.sync enabled, a link ID list is added to ACLs to establish provenance within Cluster Linking. This provenance assists the cluster link when making decisions about replicating or removing ACLs between clusters, which is a more reliable method than that used in earlier versions. This information is set when the cluster link’s ACL sync creates the ACL on the cluster. You can view all ACLs created by a cluster link with the following command:

    kafka-acls --bootstrap-server <destination cluster> --list --link-id <link id>
    

ACLs for brokers on destination cluster

If offset migration is not enabled, no additional permissions are required for the brokers.

If offset migration is enabled, additional ACLs are required for the brokers in the destination cluster.

Operation   Resource         API
READ        Topic            APIs used for consumer offset migration
READ        Group            APIs used for consumer offset migration
ALTER       Topic (Mirror)   AlterTopicMirrors

Migrating ACLs from Source to Destination Cluster

Cluster Linking allows you to connect clusters together and replicate data across Kafka clusters using the brokers for topic sharing, cluster migration, and hybrid architectures. For all scenarios, Cluster Linking provides the option to migrate the ACLs on the source to the destination. You can migrate ACLs across the clusters using the link, with fine-grained control as to which ACLs to migrate.

When you create the cluster link, you can specify which ACLs to migrate to the destination (mirror) by resource, principal, and host. You can migrate any number of specific ACLs needed for the destination cluster, or migrate all ACLs.

Caution

The --link-id option for kafka-acls, available starting with Confluent Platform 7.1.0, is experimental and should not be used in production deployments. In particular, do not use --link-id to create ACLs. If an ACL with --link-id is created on the source cluster, it is marked for management by the link ID, and will not be synced to the destination, irrespective of acl.sync.filters. Currently, Confluent Platform does not validate that link IDs supplied to kafka-acls actually exist on the cluster.

Prerequisites

  • An authorizer, such as the Confluent Server Authorizer or AclAuthorizer from Apache Kafka®, must be configured.
  • Both ZooKeeper-based and centralized ACLs can be synced. If you want to sync ZooKeeper-based ACLs, set this configuration: confluent.authorizer.access.rule.providers=ZK_ACL. If you want centralized ACLs, set this configuration: confluent.authorizer.access.rule.providers=CONFLUENT. (To learn more, see Configure the Confluent Server Authorizer.)
  • You must have ACL Authorization configured on the clusters and cluster link, as described in Authorization using Access Control Lists (ACLs) and Authorization (ACLs).
  • You must have the appropriate authorizations: DESCRIBE Cluster ACLs (DescribeAcls API) on the source cluster and ALTER Cluster ACLs (CreateAcls/DeleteAcls APIs) on the destination cluster.

Configurations for periodic ACL migration

You can specify ACL migration for a cluster link.

The recommended method is to define the ACLs in a file, acl.filters.json, and pass the file name as an argument to the --acl-filters-json-file flag on CLI commands.

You can configure ACLs at the time you create the cluster link, or as an update to an existing configuration.

acl.sync.ms is a property that specifies how often to refresh ACLs. The default is 5 seconds (5000 ms).

An additional required parameter is acl.filters.json. This is a JSON string that gives the list of ACLs to migrate. The default is "". You can populate this string by passing a JSON file that specifies the ACLs.

Here is an example of setting up a cluster link with ACL migration when you create the cluster link. Note that in this example, the link configuration (including ACL migration properties) is defined in a file, link-configs.txt, rather than specified directly on the command line.

./bin/kafka-cluster-links --bootstrap-server destinationCluster:9092 \
 --command-config dest-credentials.txt --create --link example-link \
 --config-file link-configs.txt --acl-filters-json-file acls.filters.json --consumer-group-filters-json-file consumer-groups.json

The link configurations themselves are defined in link-configs.txt, which is passed as the argument to --config-file. Here is an example of what you might have in link-configs.txt:

bootstrap.servers=sourceCluster:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="sourceClusterUserName" password="sourceClusterPassword";
acl.sync.enable=true
acl.sync.ms=1000
consumer.offset.sync.enable=true
consumer.offset.sync.ms=1000

The following sections provide examples of how you might define the actual ACLs in acls.filters.json.

Caution

After running commands that call configuration files, delete any files that contain security credentials so that the credentials are not left on the file system. Given the kafka-cluster-links command example above, you would delete the dest-credentials.txt and link-configs.txt files.
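
One way to follow this advice is to create the files only for the duration of the command. The Python sketch below is an assumption about how you might script this and is not a Confluent-provided utility; the helper name temporary_config_file is hypothetical. It writes the properties to a temporary file and removes that file when the block exits, even if an error occurs inside the block.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def temporary_config_file(contents: str):
    """Yield the path of a temporary file holding `contents`, then delete it."""
    # mkstemp creates the file so that only the creating user can access it
    # (mode 0600 on POSIX systems).
    fd, path = tempfile.mkstemp(suffix=".txt", text=True)
    try:
        with os.fdopen(fd, "w") as handle:
            handle.write(contents)
        yield path
    finally:
        # Runs on normal exit and on exceptions, so the file is not left behind.
        if os.path.exists(path):
            os.remove(path)


if __name__ == "__main__":
    link_configs = "bootstrap.servers=sourceCluster:9092\nacl.sync.enable=true\n"
    with temporary_config_file(link_configs) as path:
        # Invoke kafka-cluster-links here (for example with subprocess.run),
        # passing `path` as the value of --config-file.
        print("config written to", path)
    print("still on disk:", os.path.exists(path))
```

Note that removing a file does not overwrite its contents on disk; treat this as a convenience for cleanup rather than a secure-erase mechanism.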

Examples

The following examples show how to configure the JSON for various types of ACL migration, with granularity on a topic, a resource, or a mixed set.

Tip

  • You must use IP addresses, not host names.
  • If you do not specify a host, the ACLs will default to wildcard * to provide access from all hosts.

Migrate all ACLs from source to destination cluster

To migrate all ACLs from the source to the destination cluster, provide the following for acl.filters.json.

{
  "aclFilters": [
    {
      "resourceFilter": {
        "resourceType": "any",
        "patternType": "any"
      },
      "accessFilter": {
        "operation": "any",
        "permissionType": "any"
      }
    }
  ]
}

Each field in the JSON has the following options:

  • resourceType: Can be any or one of those specified in Resources in Authorization using Access Control Lists (ACLs).
  • patternType: Can be any, literal, prefixed, or match.
  • name: Name of the resource. If left empty, this defaults to the * wildcard, which matches all names of the specified resourceType.
  • principal: Name of the principal. If left empty, this defaults to the * wildcard, which matches all principals with the specified operation and permissionType.
  • operation: Can be any or one of those specified in Operations under the “Operation” column.
  • permissionType: Can be any, allow, or deny.
  • host: Host from which operations can originate. If left empty, this defaults to the * wildcard, which matches all hosts with the specified operation and permissionType.
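
The field options above can be captured in a small helper that builds filter entries and rejects values outside the listed sets. This is a minimal Python sketch, not part of any Confluent tooling; the function names acl_filter and acl_filters_json are hypothetical. It validates only patternType and permissionType, because the allowed values for resourceType and operation are defined in the referenced ACL documentation rather than here.

```python
import json

# Allowed values taken from the field list above.
PATTERN_TYPES = {"any", "literal", "prefixed", "match"}
PERMISSION_TYPES = {"any", "allow", "deny"}


def acl_filter(resource_type="any", pattern_type="any", name=None,
               principal=None, host=None, operation="any",
               permission_type="any"):
    """Build one entry of the "aclFilters" list, omitting fields left unset."""
    if pattern_type not in PATTERN_TYPES:
        raise ValueError(f"patternType must be one of {sorted(PATTERN_TYPES)}")
    if permission_type not in PERMISSION_TYPES:
        raise ValueError(f"permissionType must be one of {sorted(PERMISSION_TYPES)}")
    resource = {"resourceType": resource_type, "patternType": pattern_type}
    if name:
        resource["name"] = name
    access = {"operation": operation, "permissionType": permission_type}
    if principal:
        access["principal"] = principal
    if host:
        access["host"] = host
    return {"resourceFilter": resource, "accessFilter": access}


def acl_filters_json(*filters):
    """Serialize filter entries into a document with an "aclFilters" list."""
    return json.dumps({"aclFilters": list(filters)}, indent=2)


if __name__ == "__main__":
    # With all defaults, this produces a filter that matches every ACL.
    print(acl_filters_json(acl_filter()))
```

Omitted name, principal, and host fields are simply left out of the output, which relies on the wildcard defaults described in the list.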

Migrate all ACLs specific to a topic

To migrate all ACLs specific to a single topic (in this case topic pineapple), provide the following for acl.filters.json.

{
  "aclFilters": [
    {
      "resourceFilter": {
        "resourceType": "topic",
        "patternType": "literal",
        "name": "pineapple"
      },
      "accessFilter": {
        "operation": "any",
        "permissionType": "any"
      }
    }
  ]
}

Migrate ACLs specific to a principal

To migrate all ACLs specific to a principal, Alice, provide the following for acl.filters.json.

{
  "aclFilters": [
    {
      "resourceFilter": {
        "resourceType": "any",
        "patternType": "any"
      },
      "accessFilter": {
        "principal": "User:Alice",
        "operation": "any",
        "permissionType": "any"
      }
    }
  ]
}

Migrate a mix of ACLs

You can also do more complex ACL migrations with a specific ACL JSON. Assume you only want to migrate the following ACLs on the source cluster:

  1. Alice is allowed to read from any topic from host goodHostIPAddress.
  2. Alice is allowed to write to any topic from host goodHostIPAddress.
  3. Alice is allowed to create any topic from host goodHostIPAddress.
  4. Bob is allowed to read from any topic prefixed with coffee from host anotherGoodHostIPAddress.
  5. Bob is allowed to create any topic prefixed with breakfast.bar from anotherGoodHostIPAddress.
  6. Bob is allowed to create any topic with the name breakfast.bar from anotherGoodHostIPAddress.
  7. Mallory is denied all operations on any topic from any host.
  8. Admin is allowed all operations on any topic from host adminHostIPAddress.
  9. Trent is allowed to read from any topic from host greatHostIPAddress.
  10. Eve is denied read access to any topic from any host.

To migrate the ACLs specified above, you would specify the following ACL filters in the JSON file for acl.filters.json.

{
  "aclFilters": [
    // filter for 1
    {
      "resourceFilter": {
        "resourceType": "topic",
        "patternType": "any"
      },
      "accessFilter": {
        "principal": "User:Alice",
        "host": "goodHostIPAddress",
        "operation": "read",
        "permissionType": "allow"
      }
    },
    // filter for 2
    {
      "resourceFilter": {
        "resourceType": "topic",
        "patternType": "any"
      },
      "accessFilter": {
        "principal": "User:Alice",
        "host": "goodHostIPAddress",
        "operation": "write",
        "permissionType": "allow"
      }
    },
    // filter for 3
    {
      "resourceFilter": {
        "resourceType": "topic",
        "patternType": "any"
      },
      "accessFilter": {
        "principal": "User:Alice",
        "host": "goodHostIPAddress",
        "operation": "create",
        "permissionType": "allow"
      }
    },
    // filter for 4
    {
      "resourceFilter": {
        "resourceType": "topic",
        "name": "coffee",
        "patternType": "prefixed"
      },
      "accessFilter": {
        "principal": "User:Bob",
        "host": "anotherGoodHostIPAddress",
        "operation": "read",
        "permissionType": "allow"
      }
    },
    // filter for 5 and 6
    {
      "resourceFilter": {
        "resourceType": "topic",
        "name": "breakfast.bar",
        // match serves as a catch-all for all the names of
        // a topic this principal is authorized to access
        "patternType": "match"
      },
      "accessFilter": {
        "principal": "User:Bob",
        "host": "anotherGoodHostIPAddress",
        "operation": "create",
        "permissionType": "allow"
      }
    },
    // filter for 7
    {
      "resourceFilter": {
        "resourceType": "any",
        "patternType": "any"
      },
      "accessFilter": {
        "principal": "User:Mallory",
        "operation": "any",
        "permissionType": "deny"
      }
    },
    // filter for 8
    {
      "resourceFilter": {
        "resourceType": "any",
        "patternType": "any"
      },
      "accessFilter": {
        "principal": "User:Admin",
        "host": "adminHostIPAddress",
        "operation": "any",
        "permissionType": "allow"
      }
    },
    // filter for 9
    {
      "resourceFilter": {
        "resourceType": "topic",
        "patternType": "any"
      },
      "accessFilter": {
        "principal": "User:Trent",
        "host": "greatHostIPAddress",
        "operation": "read",
        "permissionType": "allow"
      }
    },
    // filter for 10
    {
      "resourceFilter": {
        "resourceType": "topic",
        "patternType": "any"
      },
      "accessFilter": {
        "principal": "User:Eve",
        "operation": "read",
        "permissionType": "deny"
      }
    }
  ]
}
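
The // lines in the example above are annotations for the reader, and strict JSON parsers reject them. Whether a given version of the CLI tolerates such comments is not stated here, so it may be safer to remove them before passing the file to a command. The following Python sketch (function names strip_line_comments and load_acl_filters are hypothetical) drops whole-line // comments and checks that the result parses and has the expected shape.

```python
import json


def strip_line_comments(text: str) -> str:
    """Remove lines whose first non-space characters are //."""
    kept = [line for line in text.splitlines()
            if not line.lstrip().startswith("//")]
    return "\n".join(kept)


def load_acl_filters(text: str) -> list:
    """Parse annotated filter text and return the "aclFilters" list."""
    document = json.loads(strip_line_comments(text))
    filters = document["aclFilters"]
    for entry in filters:
        # Every filter in the examples above carries both sections.
        if "resourceFilter" not in entry or "accessFilter" not in entry:
            raise ValueError("each filter needs resourceFilter and accessFilter")
    return filters


if __name__ == "__main__":
    annotated = """
    {
      "aclFilters": [
        // filter for 10
        {
          "resourceFilter": {"resourceType": "topic", "patternType": "any"},
          "accessFilter": {"principal": "User:Eve", "operation": "read",
                           "permissionType": "deny"}
        }
      ]
    }
    """
    print(json.dumps({"aclFilters": load_acl_filters(annotated)}, indent=2))
```

This handles only comments that occupy a whole line, which is how they appear in the example; a trailing comment on the same line as a value would need a real JSON-with-comments parser.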