Manage Security for Cluster Linking on Confluent Platform¶
All security configurations used to connect to the source cluster can be configured on the cluster link when the link is created. Each link is associated with exactly one link credential that will be used for authentication of connections to the source cluster using that link. Different cluster links on the same cluster may use different security credentials. The link credential must be granted appropriate permissions on the source cluster.
Authentication¶
The following example shows how to configure SASL_SSL with GSSAPI as the SASL mechanism for the cluster link to talk to the source cluster. You can set these configurations using a config file, as described in the section on how to set properties on a cluster link.
security.protocol=SASL_SSL
ssl.truststore.location=/path/to/truststore.p12
ssl.truststore.password=truststore-password
ssl.truststore.type=PKCS12
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/path/to/link.keytab" \
principal="clusterlink1@EXAMPLE.COM";
Cluster Linking configurations should include client-side TLS/SSL and SASL/GSSAPI configuration options for connections to the source cluster in this scenario.
If you reference a keystore or truststore directly (for example, `keystore.jks`), the same files must be available in the same location on each of the brokers.
For details on creating TLS/SSL key and trust stores, see Encrypt and Authenticate with TLS in Confluent Platform. For details on SASL/GSSAPI, see Configure GSSAPI in Confluent Platform. All brokers on the destination cluster must be configured with `password.encoder.secret` for encrypting sensitive link configurations when security is enabled.
If the link is source-initiated (for example, from Confluent Platform 7.1+ to Confluent Cloud), then `password.encoder.secret` must also be set on the source cluster. This parameter must be set as a new password. To learn more, see Update password configurations dynamically.
To configure cluster links to use other SASL mechanisms, include client-side security configurations for that mechanism. See Authenticate with SASL using JAAS in Confluent Platform for other supported mechanisms. To use mutual TLS authentication as the security protocol, a key store should also be configured for the link. See Encrypt and Authenticate with TLS in Confluent Platform for details.
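As an illustration, a link that authenticates with SASL/SCRAM-SHA-512 over TLS might use a configuration like the following sketch; the truststore path, username, and password are placeholders:

```properties
security.protocol=SASL_SSL
ssl.truststore.location=/path/to/truststore.p12
ssl.truststore.password=truststore-password
ssl.truststore.type=PKCS12
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="link-user" \
  password="link-password";
```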
Note
The cluster links use source credentials configured on the link to communicate with the source cluster. These credentials must be valid in order for the link to function.
Mutual TLS (mTLS)¶
Cluster Linking can use mutual TLS (mTLS) for some, but not all, data exchanges:
- Confluent Cloud to Confluent Cloud does not use mTLS; it uses TLS and SASL as described in Cluster Linking on Confluent Cloud.
- Data coming into Confluent Cloud from OSS Kafka can be configured to use mTLS.
- Data coming into Confluent Cloud from Confluent Platform can be configured to use either mTLS or source-initiated cluster links with TLS+SASL. mTLS can be used as the Confluent Platform side of a source-initiated cluster link.
Follow these guidelines when configuring mTLS for Confluent Platform to Confluent Platform cluster links:
To learn more about PEM files and possible configurations, see the Apache Kafka KIP that introduced PEM file support.
- If you reference a keystore or truststore directly (for example, `keystore.jks`), the same files must be available in the same location on each of the brokers.
- Only PKCS #8 keys are supported for the private key specified in `ssl.keystore.key`. If the key is encrypted, the key password must be specified using `ssl.key.password`.
- If the files cannot be copied to the brokers directly, be sure to list them in PEM format.
- If using `kafka-cluster-links`, add `\n` to indicate new lines in the encrypted certificates. The example below is abbreviated; it doesn't include full keys, but should be enough to show the use of `\n`:
security.protocol=SSL
ssl.keystore.type=PEM
ssl.truststore.type=PEM
ssl.key.password=clientpass
ssl.endpoint.identification.algorithm=
ssl.keystore.key=-----BEGIN ENCRYPTED PRIVATE KEY-----\nMIIE6TAbBgkqhkiG9w0BBQMwDgQI4d+aGvbJsIcCAggABIIEyB+qVnCcV9BbBz97\njmG1Xm6p2HIbO4tgMTld/Kmfx/4RHECdnhjiP3plH4yxZnaHBbiRHlno1d2xL93n\n4pqQkfhfJhBIqzoO3A==\n-----END ENCRYPTED PRIVATE KEY-----
ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE-----\nMIIETjCCAzagAwIBAgIUX64Yeyvwae61we2rC4gniQUcrdIwDQYJKoZIhvcNAQEL\nBQAwZTEYMBYGA1UEAwwPVGVzdENvbmZsdWVudENBMQwwCgYDVQQLDANFQUExEDAO\nrPhy0+XGDZbC4PWYi3FogFTKKjKzjtO3ZP5nXt6zvF9/nCn8RpljKJH4brIIlhPM\n3Us=\n-----END CERTIFICATE-----
ssl.truststore.certificates=-----BEGIN CERTIFICATE-----\nMIIDRjCCAi4CCQCW7jXMNbE1XzANBgkqhkiG9w0BAQsFADBlMRgwFgYDVQQDDA9U\nZXN0Q29uZmx1ZW50Q0ExDDAKBgNVBAsMA0VBQTEQMA4GA1UECgwHRnJlZW1hbjEP\n4Q/DVCHHUvJwHxd/5Bc08s56FYHFetoB1d4=\n-----END CERTIFICATE-----
- If using `confluent kafka link`, the command doesn't interpret `\n` in the same way. Instead, replace each literal `\n` with a single space:
ssl.endpoint.identification.algorithm=
security.protocol=SSL
ssl.keystore.type=PEM
ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE-----\
MIIDeTCCAmGgAwIBAgIJAKU/BVTP/yyUMA0GCSqGSIb3DQEBCwUAMHMxCzAJBgNV\
BAYTAlVTMQswCQYDVQQIDAJNVDEQMA4GA1UEBwwHQm96ZW1hbjEPMA0GA1UECgwG\
RmlndXJlMQ8wDQYDVQQLDAZGaWd1cmUxIzAhBgNVBAMMGmthZmthLXJvb3QudGVz\
...
k1PdbjufUewo7KQf8nde1IefAbSARG6Fu4oY2g4=\
-----END CERTIFICATE-----
ssl.keystore.key=-----BEGIN ENCRYPTED PRIVATE KEY-----\
MIIE5DAcBgoqhkiG9w0BDAEBMA4ECIYZ0EPRimP2AgIIAASCBMJLvS+Mtm9HzU4O\
fu3EuTu7LyjH4KEyzSYsuKnhYMDbFxHD1V/dSIr3N8ZqDjz5Xr3TvsN3pVwa5BFh\
Tv8NhhzEWB6jQtf7xo0cGPlL2VVO95D2aAvLBTQegWxqOXrhGIzqffyw/59uyJi9\
...
WsrgdacPOGc4pC+bFXUOySXUBrvI47rPLn3tHHWnGfKEOKA0zMRPHjMUWmooKhKl\
VKz1+zPeZ1s=\
-----END ENCRYPTED PRIVATE KEY-----
ssl.key.password=<REDACTED>
ssl.truststore.type=PEM
ssl.truststore.certificates=-----BEGIN CERTIFICATE-----\
MIIDYjCCAkoCCQCw8cwWkAxRaDANBgkqhkiG9w0BAQsFADBzMQswCQYDVQQGEwJV\
...
CIi0IXotll17POA+hgHeWBxhKg2ULmPxR7UgHgCcnajTNMmbbaBfOSa6xe1qOJvU\
CzFNJ0Yz\
-----END CERTIFICATE-----
Alternatively, you can use an escaped newline; that is, a backslash (`\`) followed by an actual newline.
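As a hedged illustration of the PKCS #8 requirement above, the following sketch uses `openssl` to generate a key and ensure it is in PKCS #8 format; the file names are illustrative placeholders:

```shell
# Generate an RSA private key (file names here are illustrative).
openssl genpkey -algorithm RSA -pkeyopt rsa_keygen_bits:2048 -out link-key.pem

# Convert it to PKCS #8, the only key format supported by ssl.keystore.key.
# -nocrypt writes an unencrypted key for brevity; for an encrypted key,
# use a cipher option such as "-v2 aes-256-cbc" and set ssl.key.password.
openssl pkcs8 -topk8 -nocrypt -in link-key.pem -out link-key-pkcs8.pem
```

The resulting `link-key-pkcs8.pem` contents (with `\n` or escaped newlines, as described above) would then be supplied as the value of `ssl.keystore.key`.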
Authorization (ACLs)¶
In deployments where ACLs are enabled, additional ACLs must be added in both the source and destination clusters. For details on creating ACLs, see Authorization using Access Control Lists (ACLs) in Confluent Platform. For a full list of associated operations, resources, and APIs, see the subtopic, Operations.
Caution
ACL migration (ACL sync), previously available in Confluent Platform 6.0.0 through 6.2.x, was removed in Confluent Platform 7.0.0 due to a security vulnerability, then reintroduced in Confluent Platform 7.1.0 with the vulnerability resolved. If you are using ACL migration in your pre-7.1.0 deployments, you should disable it by setting `acl.sync.enable=false` on your cluster links, or preferably, upgrade to 7.1.x.
Starting with Confluent Platform 7.1.0, ACL syncing is handled by more efficient and less error-prone methods. If you are using Cluster Linking on Confluent Platform 7.1.0 or later, you may notice that the cluster link occasionally appends data to ACLs on the primary cluster. When Cluster Linking is active with `acl.sync` enabled, a link ID list is added to ACLs to establish provenance within Cluster Linking. This provenance helps the cluster link decide whether to replicate or remove ACLs between clusters, a more reliable method than the one used in earlier versions. This information is set when the cluster link's ACL sync creates the ACL on the cluster. You can view all ACLs created by a cluster link with the following command:
kafka-acls --bootstrap-server <destination-cluster> --list --link-id <link-id>
ACLs for user-issued cluster link and mirror commands¶
The following destination cluster ACLs are available for the user executing cluster link or mirror commands.
Operation | Resource | API |
---|---|---|
ALTER | Cluster | CreateClusterLinks |
ALTER | Cluster | DeleteClusterLinks |
ALTERCONFIGS | Cluster | AlterConfigs |
DESCRIBE | Cluster | ListClusterLinks |
ALTER | Cluster | CreateAcls, DeleteAcls, CreateTopics (to create mirrored topics) |
CREATE | Topic | CreateTopics (to create mirrored topics) |
ALTER | Topic | AlterTopicMirrors |
Tip
If you want to make sure that particular topics cannot be mirrored by the cluster link under any circumstances, then do not give authorization (through RBAC or ACLs) for the cluster link’s principal on those topics.
ACLs required to list and describe mirror topics¶
The `list mirrors` command lists the mirror topics on a cluster and/or a cluster link. You can call this command through the REST API with `GET /kafka/v3/clusters/<cluster-id>/links/<link-name>/mirrors`, or from the Confluent Platform CLI with `kafka-mirrors --list`.
The `describe mirror` command gives information about a specific mirror topic. You can call this command through the REST API with `GET /kafka/v3/clusters/<cluster-id>/links/<link-name>/mirrors/<mirror-topic>`, or from the Confluent Platform CLI with `kafka-mirrors --describe --topics <mirror-topic>`.
To list and describe mirror topics, you need one of the following ACLs:
- The DESCRIBE ACL on a Cluster resource, which provides access to both list and describe for all topics on the cluster.
- The DESCRIBE ACL on specific Topic(s), which provides access to both list and describe for the specified topics only.
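For example, either of the following `kafka-acls` commands would grant the access described above. This is a sketch: the bootstrap server, admin config file, principal, and topic name are all illustrative placeholders.

```shell
# Cluster-wide: allows listing and describing all topics on the cluster.
kafka-acls --bootstrap-server destinationCluster:9092 \
  --command-config admin.properties \
  --add --allow-principal User:mirror-viewer \
  --operation Describe --cluster

# Topic-scoped: allows listing and describing only the named topic.
kafka-acls --bootstrap-server destinationCluster:9092 \
  --command-config admin.properties \
  --add --allow-principal User:mirror-viewer \
  --operation Describe --topic orders
```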
ACLs for brokers on destination cluster¶
If offset migration is not enabled, no additional permissions are required for the brokers.
If offset migration is enabled, additional ACLs are required for the brokers in the destination cluster.
Operation | Resource | API |
---|---|---|
READ | Topic | APIs used for consumer offset migration |
READ | Group | APIs used for consumer offset migration |
ALTER | Topic (Mirror) | AlterTopicMirrors |
ACLs for link on source cluster¶
Source cluster ACLs are required for the link credential as follows.
Task | Operation | Resource | API |
---|---|---|---|
Mirroring | READ | Topic | Fetch |
Mirroring | DESCRIBE_CONFIGS | Topic | DescribeConfigs |
Consumer offset migration | DESCRIBE | Topic | ListOffsets |
Consumer offset migration | DESCRIBE | Group | ListGroups |
Consumer offset migration | DESCRIBE_CONFIGS | Cluster | DescribeConfigs [1] |
[1] If `link.mode` is bi-directional.
ACLs for source-initiated links¶
ACLs required by the cluster link principal on the Destination Cluster¶
For a source-initiated cluster link, the cluster link's principal requires the `Cluster: Alter` ACL on the destination cluster. `Cluster: Alter` is required on the destination cluster to ensure that the user who created the cluster link is authorized to create a cluster link to this destination cluster.
ACLs required by the cluster link principal on the Source Cluster¶
The cluster link's source cluster principal requires the `Cluster: Alter` ACL in addition to the normal ACLs required by a cluster link on a source cluster. `Cluster: Alter` is required on the source cluster because the cluster link needs to create reverse connections on the cluster, which is considered a privileged action.
If both auto-create mirror topics and prefixing are enabled, `Describe:Cluster` is also required on the source cluster.
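As a sketch, granting the `Cluster: Alter` ACL to a hypothetical link principal might look like the following; the bootstrap server, admin config file, and principal name are placeholders:

```shell
# Allow the cluster link principal to perform Alter on the cluster resource.
kafka-acls --bootstrap-server sourceCluster:9092 \
  --command-config source-admin.properties \
  --add --allow-principal User:cluster-link \
  --operation Alter --cluster
```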
ACLs required by the user principal on the Source Cluster¶
To perform actions to a source-initiated cluster link on the source cluster, the user’s principal requires the same ACLs as if they were creating a destination-initiated cluster link on that cluster:
Operation | Resource | API |
---|---|---|
ALTER | Cluster | CreateClusterLinks |
ALTER | Cluster | DeleteClusterLinks |
ALTERCONFIGS | Cluster | AlterConfigs |
DESCRIBE | Cluster | ListClusterLinks |
Migrating ACLs from Source to Destination Cluster¶
Cluster Linking allows you to connect clusters together and replicate data across Kafka clusters using the brokers for topic sharing, cluster migration, and hybrid architectures. For all scenarios, Cluster Linking provides the option to migrate the ACLs on the source to the destination. You can migrate ACLs across the clusters using the link, with fine-grained control as to which ACLs to migrate.
When you create the cluster link, you can specify which ACLs to migrate to the destination (mirror) by resource, principal, and host. You can migrate any number of specific ACLs needed for the destination cluster, or migrate all ACLs.
Tip
You can also migrate consumer groups from the source to the destination cluster.
Caution
The `--link-id` option for `kafka-acls`, available starting with Confluent Platform 7.1.0, is experimental and should not be used in production deployments.
In particular, do not use `--link-id` to create ACLs. If an ACL with `--link-id` is created on the source cluster, it is marked for management by that link ID and will not be synced to the destination, irrespective of `acl.sync.filters`. Currently, Confluent Platform does not validate that link IDs created with `kafka-acls` actually exist on the cluster.
Prerequisites¶
- An authorizer, such as the Configure Confluent Server Authorizer in Confluent Platform or AclAuthorizer from Apache Kafka®, must be configured.
- Both ZooKeeper-based and centralized ACLs can be synced. To sync ZooKeeper-based ACLs, set `confluent.authorizer.access.rule.providers=ZK_ACL`. To sync centralized ACLs, set `confluent.authorizer.access.rule.providers=CONFLUENT`. (To learn more, see Configure the Confluent Server Authorizer.)
- You must have ACL Authorization configured on the clusters and cluster link, as described in Authorization using Access Control Lists (ACLs) in Confluent Platform and Authorization (ACLs).
- You must have the appropriate authorizations: DESCRIBE Cluster ACLs (`DescribeAcls` API) on the source cluster, and ALTER Cluster ACLs (`CreateAcls`/`DeleteAcls` APIs) on the destination cluster.
Configurations for periodic ACL migration¶
You can specify ACL migration for a cluster link.
The recommended method is to define the ACLs in a file, `acl.filters.json`, and pass the file name as an argument to the `--acl-filters-json-file` flag on CLI commands.
You can configure ACLs at the time you create the cluster link, or as an update to an existing configuration.
`acl.sync.ms` is a property that specifies how often to refresh ACLs. The default is 5 seconds (5000 ms).
An additional required parameter is `acl.filters.json`. This is a JSON string that gives the list of ACLs to migrate. The default is `""`. You can populate this string by passing a JSON file that specifies the ACLs.
Here is an example of setting up a cluster link with ACL migration when you create the cluster link. Note that in this example, the link configuration (including ACL migration properties) is defined in a file, `link-configs.txt`, rather than specified directly on the command line.
./bin/kafka-cluster-links --bootstrap-server destinationCluster:9092 \
--command-config dest-credentials.txt --create --link example-link \
--config-file link-configs.txt --acl-filters-json-file acls.filters.json --consumer-group-filters-json-file consumer-groups.json
The link configurations themselves go in `link-configs.txt`, which is passed as the argument to `--config-file`. Here is an example of what you might have in `link-configs.txt`:
bootstrap.servers=sourceCluster:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="sourceClusterUserName" password="sourceClusterPassword";
acl.sync.enable=true
acl.sync.ms=1000
consumer.offset.sync.enable=true
consumer.offset.sync.ms=1000
The following sections provide examples of how you might define the actual ACLs in `acls.filters.json`.
Caution
After running commands that reference configuration files, delete any files that contain security credentials so that the credentials are not left on the file system. Given the `kafka-cluster-links` command example above, you would delete the `dest-credentials.txt` and `link-configs.txt` files.
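A minimal sketch of that cleanup, using the file names from the example above (`shred -u` may be preferable to `rm` on local file systems where it is available, since it overwrites the contents before unlinking):

```shell
# Remove files that contain credentials once the cluster link is created.
rm -f dest-credentials.txt link-configs.txt
```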
Examples¶
The following examples show how to configure the JSON for various types of ACL migration, with granularity on a topic, a resource, or a mixed set.
Tip
- You must use IP addresses, not host names.
- If you do not specify a host, the ACLs will default to wildcard * to provide access from all hosts.
Migrate all ACLs from source to destination cluster¶
To migrate all ACLs from the source to the destination cluster, provide the following for `acl.filters.json`:
{
"aclFilters": [
{
"resourceFilter": {
"resourceType": "any",
"patternType": "any"
},
"accessFilter": {
"operation": "any",
"permissionType": "any"
}
}
]
}
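One way to produce this file and sanity-check it before passing it to `--acl-filters-json-file` is sketched below; it assumes `python3` is on the path and uses it only to validate the JSON:

```shell
# Write the catch-all migration filter to acl.filters.json.
cat > acl.filters.json <<'EOF'
{
  "aclFilters": [
    {
      "resourceFilter": { "resourceType": "any", "patternType": "any" },
      "accessFilter": { "operation": "any", "permissionType": "any" }
    }
  ]
}
EOF

# Fail fast on malformed JSON before creating the link.
python3 -m json.tool acl.filters.json > /dev/null
```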
Each field in the JSON has the following options:
- `resourceType`: Can either be `any` or one of those specified in Resources in Authorization using Access Control Lists (ACLs) in Confluent Platform.
- `patternType`: Can either be `any`, `literal`, `prefixed`, or `match`.
- `name`: Name of the resource. If not set (left empty), defaults to matching all names of the specified `resourceType`. Asterisks do not act as wildcards in this context, but as explicit values; if you set `name` to `*`, it will match resources named `*`.
- `principal`: Name of the principal. If not set (left empty), defaults to matching all principals with the specified `operation` and `permissionType`. Asterisks do not act as wildcards in this context, but as explicit values; if you set `principal` to `*`, it will match principals named `*`.
- `permissionType`: Can be `any`, `allow`, or `deny`.
- `host`: Host from which operations can originate. If not set (left empty), defaults to matching all hosts with the specified `operation` and `permissionType`. Asterisks do not act as wildcards in this context, but as explicit values; if you set `host` to `*`, it will match hosts named `*`.
Migrate all ACLs specific to a topic¶
To migrate all ACLs specific to a single topic (in this case, topic `pineapple`), provide the following for `acl.filters.json`:
{
"aclFilters": [
{
"resourceFilter": {
"resourceType": "topic",
"patternType": "literal",
"name": "pineapple"
},
"accessFilter": {
"operation": "any",
"permissionType": "any"
}
}
]
}
Migrate ACLs specific to a principal¶
To migrate all ACLs specific to a principal, Alice, provide the following for `acl.filters.json`:
{
"aclFilters": [
{
"resourceFilter": {
"resourceType": "any",
"patternType": "any"
},
"accessFilter": {
"principal": "User:Alice",
"operation": "any",
"permissionType": "any"
}
}
]
}
Migrate a mix of ACLs¶
You can also do more complex ACL migrations with a specific ACL JSON. Assume you only want to migrate the following ACLs on the source cluster:
1. Alice is allowed to read from any topic from host `goodHostIPAddress`.
2. Alice is allowed to write to any topic from host `goodHostIPAddress`.
3. Alice is allowed to create any topic from host `goodHostIPAddress`.
4. Bob is allowed to read from any topic prefixed with `coffee` from host `anotherGoodHostIPAddress`.
5. Bob is allowed to create any topic prefixed with `breakfast.bar` from `anotherGoodHostIPAddress`.
6. Bob is allowed to create any topic with the name `breakfast.bar` from `anotherGoodHostIPAddress`.
7. Mallory is denied all operations on any topic from any host.
8. Admin is allowed all operations on any topic from host `adminHostIPAddress`.
9. Trent is allowed to read from any topic from host `greatHostIPAddress`.
10. Eve is denied read access to any topic from any host.
To migrate the ACLs specified above, you would specify the following ACL filters in the JSON file for `acl.filters.json`:
{
"aclFilters": [
// filter for 1
{
"resourceFilter": {
"resourceType": "topic",
"patternType": "any"
},
"accessFilter": {
"principal": "User:Alice",
"host": "goodHostIPAddress",
"operation": "read",
"permissionType": "allow"
}
},
// filter for 2
{
"resourceFilter": {
"resourceType": "topic",
"patternType": "any"
},
"accessFilter": {
"principal": "User:Alice",
"host": "goodHostIPAddress",
"operation": "write",
"permissionType": "allow"
}
},
// filter for 3
{
"resourceFilter": {
"resourceType": "topic",
"patternType": "any"
},
"accessFilter": {
"principal": "User:Alice",
"host": "goodHostIPAddress",
"operation": "create",
"permissionType": "allow"
}
},
// filter for 4
{
"resourceFilter": {
"resourceType": "topic",
"name": "coffee",
"patternType": "prefixed"
},
"accessFilter": {
"principal": "User:Bob",
"host": "anotherGoodHostIPAddress",
"operation": "read",
"permissionType": "allow"
}
},
// filter for 5 and 6
{
"resourceFilter": {
"resourceType": "topic",
"name": "breakfast.bar",
// match serves as a catch-all for all the names of
// a topic this principal is authorized to access
"patternType": "match"
},
"accessFilter": {
"principal": "User:Bob",
"host": "anotherGoodHostIPAddress",
"operation": "create",
"permissionType": "allow"
}
},
// filter for 7
{
"resourceFilter": {
"resourceType": "any",
"patternType": "any"
},
"accessFilter": {
"principal": "User:Mallory",
"operation": "any",
"permissionType": "deny"
}
},
// filter for 8
{
"resourceFilter": {
"resourceType": "any",
"patternType": "any"
},
"accessFilter": {
"principal": "User:Admin",
"host": "adminHostIPAddress",
"operation": "any",
"permissionType": "allow"
}
},
// filter for 9
{
"resourceFilter": {
"resourceType": "topic",
"patternType": "any"
},
"accessFilter": {
"principal": "User:Trent",
"host": "greatHostIPAddress",
"operation": "read",
"permissionType": "allow"
}
},
// filter for 10
{
"resourceFilter": {
"resourceType": "topic",
"patternType": "any"
},
"accessFilter": {
"principal": "User:Eve",
"operation": "read",
"permissionType": "deny"
}
}
]
}