Manage Security for Cluster Linking on Confluent Platform¶
All security configurations used to connect to the source cluster can be configured on the cluster link when the link is created. Each link is associated with exactly one link credential that will be used for authentication of connections to the source cluster using that link. Different cluster links on the same cluster may use different security credentials. The link credential must be granted appropriate permissions on the source cluster.
Authentication¶
The following example shows how to configure SASL_SSL with GSSAPI as the SASL
mechanism for the cluster link to talk to the source cluster. You can set
these configurations using a `config-file`, as described in the section on
how to set properties on a cluster link.
security.protocol=SASL_SSL
ssl.truststore.location=/path/to/truststore.p12
ssl.truststore.password=truststore-password
ssl.truststore.type=PKCS12
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/path/to/link.keytab" \
principal="clusterlink1@EXAMPLE.COM";
Cluster Linking configurations should include client-side SSL and SASL/GSSAPI configuration options for connections to the source cluster in this scenario.
If you reference a keystore/truststore directly (for example, `keystore.jks`),
the same files must be available in the same location on each of the brokers.
For details on creating SSL key and trust stores, see Encrypt and Authenticate with TLS. For details on
SASL/GSSAPI, see Configuring GSSAPI. All brokers at the destination cluster must be configured with
`password.encoder.secret` for encrypting sensitive link configurations when security is enabled.
If the link is source-initiated (for example, from CP 7.1+ to Confluent Cloud), then `password.encoder.secret`
must also be set on the source cluster. This parameter must be set as a new password. To learn more, see Update password configurations dynamically.
To configure cluster links to use other SASL mechanisms, include client-side security configurations for that mechanism. See Authentication with SASL using JAAS for other supported mechanisms. To use two-way SSL authentication with SSL as the security protocol, a key store should also be configured for the link. See Encrypt and Authenticate with TLS for details.
Note
The cluster links use source credentials configured on the link to communicate with the source cluster. These credentials must be valid in order for the link to function.
Mutual TLS (mTLS)¶
Cluster Linking can use mTLS (two-way verification) for some, but not all, data exchanges:
- Confluent Cloud to Confluent Cloud does not use mTLS; it uses TLS and SASL as described in Cluster Linking on Confluent Cloud.
- Data coming into Confluent Cloud from OSS Kafka can be configured to use mTLS.
- Data coming into Confluent Cloud from Confluent Platform can be configured to use either mTLS or source-initiated cluster links with TLS+SASL. mTLS can be used as the Confluent Platform side of a source-initiated cluster link.
Follow these guidelines when configuring mTLS for Confluent Platform to Confluent Platform cluster links:
- To learn more about PEM files and possible configurations, see the Apache Kafka KIP that introduced PEM file support.
- If you reference a keystore/truststore directly (for example, `keystore.jks`), the same files must be available in the same location on each of the brokers.
- If the files cannot be copied to the brokers directly, be sure to list them in PEM format.
- If using `kafka-cluster-links`, add `\n` to indicate new lines in the encrypted certificates. The example below is modified; it doesn’t include full keys, but should be enough to show the use of `\n`:
security.protocol=SSL
ssl.keystore.type=PEM
ssl.truststore.type=PEM
ssl.key.password=clientpass
ssl.endpoint.identification.algorithm=
ssl.keystore.key=-----BEGIN ENCRYPTED PRIVATE KEY-----\nMIIE6TAbBgkqhkiG9w0BBQMwDgQI4d+aGvbJsIcCAggABIIEyB+qVnCcV9BbBz97\njmG1Xm6p2HIbO4tgMTld/Kmfx/4RHECdnhjiP3plH4yxZnaHBbiRHlno1d2xL93n\n4pqQkfhfJhBIqzoO3A==\n-----END ENCRYPTED PRIVATE KEY-----
ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE-----\nMIIETjCCAzagAwIBAgIUX64Yeyvwae61we2rC4gniQUcrdIwDQYJKoZIhvcNAQEL\nBQAwZTEYMBYGA1UEAwwPVGVzdENvbmZsdWVudENBMQwwCgYDVQQLDANFQUExEDAO\nrPhy0+XGDZbC4PWYi3FogFTKKjKzjtO3ZP5nXt6zvF9/nCn8RpljKJH4brIIlhPM\n3Us=\n-----END CERTIFICATE-----
ssl.truststore.certificates=-----BEGIN CERTIFICATE-----\nMIIDRjCCAi4CCQCW7jXMNbE1XzANBgkqhkiG9w0BAQsFADBlMRgwFgYDVQQDDA9U\nZXN0Q29uZmx1ZW50Q0ExDDAKBgNVBAsMA0VBQTEQMA4GA1UECgwHRnJlZW1hbjEP\n4Q/DVCHHUvJwHxd/5Bc08s56FYHFetoB1d4=\n-----END CERTIFICATE-----
- If using `confluent kafka link`, the command doesn’t interpret `\n` in the same way. Instead, replace each literal `\n` with a single space. Alternatively, you can use an escaped newline; that is, a backslash (`\`) followed by an actual newline, as in this example:
ssl.endpoint.identification.algorithm=
security.protocol=SSL
ssl.keystore.type=PEM
ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE-----\
MIIDeTCCAmGgAwIBAgIJAKU/BVTP/yyUMA0GCSqGSIb3DQEBCwUAMHMxCzAJBgNV\
BAYTAlVTMQswCQYDVQQIDAJNVDEQMA4GA1UEBwwHQm96ZW1hbjEPMA0GA1UECgwG\
RmlndXJlMQ8wDQYDVQQLDAZGaWd1cmUxIzAhBgNVBAMMGmthZmthLXJvb3QudGVz\
...
k1PdbjufUewo7KQf8nde1IefAbSARG6Fu4oY2g4=\
-----END CERTIFICATE-----
ssl.keystore.key=-----BEGIN ENCRYPTED PRIVATE KEY-----\
MIIE5DAcBgoqhkiG9w0BDAEBMA4ECIYZ0EPRimP2AgIIAASCBMJLvS+Mtm9HzU4O\
fu3EuTu7LyjH4KEyzSYsuKnhYMDbFxHD1V/dSIr3N8ZqDjz5Xr3TvsN3pVwa5BFh\
Tv8NhhzEWB6jQtf7xo0cGPlL2VVO95D2aAvLBTQegWxqOXrhGIzqffyw/59uyJi9\
...
WsrgdacPOGc4pC+bFXUOySXUBrvI47rPLn3tHHWnGfKEOKA0zMRPHjMUWmooKhKl\
VKz1+zPeZ1s=\
-----END ENCRYPTED PRIVATE KEY-----
ssl.key.password=<REDACTED>
ssl.truststore.type=PEM
ssl.truststore.certificates=-----BEGIN CERTIFICATE-----\
MIIDYjCCAkoCCQCw8cwWkAxRaDANBgkqhkiG9w0BAQsFADBzMQswCQYDVQQGEwJV\
...
CIi0IXotll17POA+hgHeWBxhKg2ULmPxR7UgHgCcnajTNMmbbaBfOSa6xe1qOJvU\
CzFNJ0Yz\
-----END CERTIFICATE-----
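Collapsing a multi-line PEM file into the single-line forms described above is easy to get wrong by hand. The following Python sketch shows one possible way to produce both the literal `\n` form and the single-space form. The helper names (`pem_to_escaped`, `pem_to_spaced`) and the shortened sample PEM body are hypothetical and are not part of any Confluent tool.

```python
def _pem_lines(pem_text: str) -> list[str]:
    """Return the non-empty lines of a PEM document, stripped of whitespace."""
    return [line.strip() for line in pem_text.splitlines() if line.strip()]


def pem_to_escaped(pem_text: str) -> str:
    """Join PEM lines with a literal backslash-n (the kafka-cluster-links form)."""
    return "\\n".join(_pem_lines(pem_text))


def pem_to_spaced(pem_text: str) -> str:
    """Join PEM lines with single spaces (the confluent kafka link form)."""
    return " ".join(_pem_lines(pem_text))


# Placeholder PEM body for illustration only; not a real certificate.
sample = """-----BEGIN CERTIFICATE-----
AAAA
BBBB
-----END CERTIFICATE-----
"""

print("ssl.truststore.certificates=" + pem_to_escaped(sample))
print("ssl.truststore.certificates=" + pem_to_spaced(sample))
```

The output lines can be pasted into the link configuration in place of the hand-edited values; verify the result against your own tooling before relying on it.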
Authorization (ACLs)¶
In deployments where ACLs are enabled, additional ACLs must be added in both the source and destination clusters. For details on creating ACLs, see Authorization using Access Control Lists (ACLs). For a full list of associated operations, resources, and APIs, see the subtopic, Operations.
Caution
ACL migration (ACL sync), previously available in Confluent Platform 6.0.0 through 6.2.x, was removed in Confluent Platform 7.0.0 due to a security vulnerability,
then re-introduced in Confluent Platform 7.1.0 with the vulnerability resolved. If you are using ACL migration in your pre-7.1.0 deployments,
you should disable it by setting `acl.sync.enable=false` on your cluster links, or preferably, upgrade to 7.1.x.
ACLs for User-Issued Cluster Link and Mirror Commands¶
The following destination cluster ACLs are available for the user executing cluster link or mirror commands.
Operation | Resource | API |
---|---|---|
ALTER | Cluster | CreateClusterLinks |
ALTER | Cluster | DeleteClusterLinks |
ALTERCONFIGS | Cluster | AlterConfigs |
DESCRIBE | Cluster | ListClusterLinks |
ALTER | Cluster | CreateAcls, DeleteAcls, CreateTopics (to create mirrored topics) |
CREATE | Topic | CreateTopics (to create mirrored topics) |
ALTER | Topic | AlterTopicMirrors |
Tip
If you want to make sure that particular topics cannot be mirrored by the cluster link under any circumstances, then do not give authorization (through RBAC or ACLs) for the cluster link’s principal on those topics.
ACLs required to list and describe mirror topics¶
The `list mirrors` command lists the mirror topics on a cluster and/or a cluster link.
You can call this command through the REST API with `GET /kafka/v3/clusters/<cluster-id>/links/<link-name>/mirrors`
or from the Confluent Platform CLI with `kafka-mirrors --list`.
The `describe mirror` command gives information about a specific mirror topic.
You can call this command through the REST API with `GET /kafka/v3/clusters/<cluster-id>/links/<link-name>/mirrors/<mirror-topic>`
or from the Confluent Platform CLI with `kafka-mirrors --describe --topics <mirror-topic>`.
To list and describe mirror topics, you need one of the following ACLs:
- The DESCRIBE ACL on a Cluster resource, which provides access to both list and describe for all topics on the cluster.
- The DESCRIBE ACL on specific Topic(s), which provides access to both list and describe for the specified topics only.
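As a small illustration of the two REST paths given above, the following Python sketch fills in the path placeholders and percent-encodes each segment. The function names and the sample cluster ID, link name, and topic are hypothetical; the sketch only builds the path and does not contact a server or handle authentication.

```python
from urllib.parse import quote


def list_mirrors_path(cluster_id: str, link_name: str) -> str:
    """Path for listing the mirror topics on a cluster link."""
    return (
        f"/kafka/v3/clusters/{quote(cluster_id, safe='')}"
        f"/links/{quote(link_name, safe='')}/mirrors"
    )


def describe_mirror_path(cluster_id: str, link_name: str, mirror_topic: str) -> str:
    """Path for describing one mirror topic on a cluster link."""
    return f"{list_mirrors_path(cluster_id, link_name)}/{quote(mirror_topic, safe='')}"


# Hypothetical identifiers, for illustration only.
print(list_mirrors_path("cluster-1", "example-link"))
print(describe_mirror_path("cluster-1", "example-link", "pineapple"))
```

Whether a request to these paths succeeds depends on the DESCRIBE ACLs listed above for the calling principal.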
ACLs for Brokers on Destination Cluster¶
If offset migration is not enabled, no additional permissions are required for the brokers.
If offset migration is enabled, additional ACLs are required for the brokers in the destination cluster.
Operation | Resource | API |
---|---|---|
READ | Topic | APIs used for consumer offset migration |
READ | Group | APIs used for consumer offset migration |
ALTER | Topic (Mirror) | AlterTopicMirrors |
ACLs for Link on Source Cluster¶
Source cluster ACLs are required for the link credential as follows.
Task | Operation | Resource | API |
---|---|---|---|
Mirroring | READ | Topic | Fetch |
Mirroring | DESCRIBE_CONFIGS | Topic | DescribeConfigs |
Consumer offset migration | DESCRIBE | Topic | ListOffsets |
Consumer offset migration | DESCRIBE | Group | ListGroups |
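One way to turn the source cluster ACLs in the table above into commands is sketched below in Python. It prints `kafka-acls` invocations rather than running them. The principal `User:clusterlink1`, the file `source-credentials.txt`, and the use of the `*` wildcard for every topic and group are assumptions made for illustration; in practice you would likely scope the ACLs to the topics and groups the link actually mirrors.

```python
import shlex

# (task, kafka-acls operation, resource flag), following the table above.
SOURCE_ACLS = [
    ("Mirroring", "Read", "--topic"),
    ("Mirroring", "DescribeConfigs", "--topic"),
    ("Consumer offset migration", "Describe", "--topic"),
    ("Consumer offset migration", "Describe", "--group"),
]


def acl_commands(bootstrap, command_config, principal, offset_migration=True):
    """Build kafka-acls command lines for the link principal on the source cluster."""
    commands = []
    for task, operation, resource_flag in SOURCE_ACLS:
        if task == "Consumer offset migration" and not offset_migration:
            continue  # these ACLs are only needed when offsets are migrated
        args = [
            "kafka-acls",
            "--bootstrap-server", bootstrap,
            "--command-config", command_config,
            "--add",
            "--allow-principal", principal,
            "--operation", operation,
            resource_flag, "*",  # assumption: wildcard resource name
        ]
        commands.append(shlex.join(args))
    return commands


# Hypothetical values, for illustration only.
for command in acl_commands("sourceCluster:9092", "source-credentials.txt", "User:clusterlink1"):
    print(command)
```

Generating the commands as text keeps the sketch side-effect free, so the output can be reviewed before anything is applied to a cluster.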
ACLs for Source-Initiated Links¶
ACLs required by the cluster link principal on the Destination Cluster¶
For a source-initiated cluster link, the cluster link’s principal requires the
`Cluster: Alter` ACL on the Destination cluster.
`Cluster: Alter` is required on the Destination cluster to ensure that the user who
created the cluster link is authorized to create a cluster link to this destination cluster.
ACLs required by the cluster link principal on the Source Cluster¶
The cluster link’s source cluster principal requires the `Cluster: Alter` ACL
in addition to the normal ACLs required by a cluster link on a source cluster.
`Cluster: Alter` is required on the Source cluster because the cluster link needs to
create reverse connections on the cluster, which is considered a privileged action.
If both `auto-create mirror topics` and `prefixing` are enabled, `Describe:Cluster`
is required on the source cluster.
ACLs required by the user principal on the Source Cluster¶
To perform actions on a source-initiated cluster link on the source cluster, the user’s principal requires the same ACLs as if they were creating a destination-initiated cluster link on that cluster:
Operation | Resource | API |
---|---|---|
ALTER | Cluster | CreateClusterLinks |
ALTER | Cluster | DeleteClusterLinks |
ALTERCONFIGS | Cluster | AlterConfigs |
DESCRIBE | Cluster | ListClusterLinks |
Migrating ACLs from Source to Destination Cluster¶
Cluster Linking allows you to connect clusters together and replicate data across Kafka clusters using the brokers for topic sharing, cluster migration, and hybrid architectures. For all scenarios, Cluster Linking provides the option to migrate the ACLs on the source to the destination. You can migrate ACLs across the clusters using the link, with fine-grained control as to which ACLs to migrate.
When you create the cluster link, you can specify which ACLs to migrate to the destination (mirror) by resource, principal, and host. You can migrate any number of specific ACLs needed for the destination cluster, or migrate all ACLs.
Tip
You can also migrate consumer groups from the source to the destination cluster.
Caution
The `--link-id` option for `kafka-acls`, available starting with Confluent Platform 7.1.0, is experimental and should not be used in production deployments.
In particular, do not use `--link-id` to create ACLs. If an ACL with `--link-id`
is created on the source cluster, it is marked for management by the link ID,
and will not be synced to the destination, irrespective of `acl.sync.filters`.
Currently, Confluent Platform does not validate that link IDs passed to `kafka-acls` exist on the cluster.
Prerequisites¶
- An authorizer, such as the Confluent Server Authorizer or AclAuthorizer from Apache Kafka®, must be configured.
- Both ZooKeeper-based and centralized ACLs can be synced. If you want to sync ZooKeeper-based ACLs, set this configuration: `confluent.authorizer.access.rule.providers=ZK_ACL`. If you want centralized ACLs, set this configuration: `confluent.authorizer.access.rule.providers=CONFLUENT`. (To learn more, see Configure the Confluent Server Authorizer.)
- You must have ACL Authorization configured on the clusters and cluster link, as described in Authorization using Access Control Lists (ACLs) and Authorization (ACLs).
- You must have the appropriate authorizations: DESCRIBE Cluster ACLs (`DescribeAcls` API) on the source cluster and ALTER Cluster ACLs (`CreateAcls`/`DeleteAcls` APIs) on the destination cluster.
Configurations for Periodic ACL Migration¶
You can specify ACL migration for a cluster link.
The recommended method is to define the ACLs in a file, `acl.filters.json`,
and pass the file name as an argument to the `--acl-filters-json-file` flag of the CLI commands.
You can configure ACLs at the time you create the cluster link, or as an update to an existing configuration.
`acl.sync.ms` is a property that specifies how often to refresh ACLs. The default is 5 seconds (5000 ms).
An additional required parameter is `acl.filters.json`. This is a JSON string
that gives the list of ACLs to migrate. The default is `""`. You can populate
this string by passing a JSON file that specifies the ACLs.
Here is an example of setting up a cluster link with ACL migration when you create the cluster link.
Note that in this example, the link configuration (including ACL migration properties) is defined in
a file, `link-configs.txt`, rather than specified directly on the command line.
./bin/kafka-cluster-links --bootstrap-server destinationCluster:9092 \
--command-config dest-credentials.txt --create --link example-link \
--config-file link-configs.txt --acl-filters-json-file acls.filters.json --consumer-group-filters-json-file consumer-groups.json
The link configurations themselves go in `link-configs.txt`, which is passed as the argument to `--config-file`.
Here is an example of what you might have in `link-configs.txt`:
bootstrap.servers=sourceCluster:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="sourceClusterUserName" password="sourceClusterPassword";
acl.sync.enable=true
acl.sync.ms=1000
consumer.offset.sync.enable=true
consumer.offset.sync.ms=1000
The following sections provide examples of how you might define the actual ACLs in `acls.filters.json`.
Caution
After running commands that call configuration files, delete any files that contain security credentials
so that the credentials are not left on the file system. Given the `kafka-cluster-links`
command example above, you would delete the `dest-credentials.txt` and `link-configs.txt` files.
Examples¶
The following examples show how to configure the JSON for various types of ACL migration, with granularity on a topic, a resource, or a mixed set.
Tip
- You must use IP addresses, not host names.
- If you do not specify a host, the ACLs will default to wildcard * to provide access from all hosts.
Migrate all ACLs from source to destination cluster¶
To migrate all ACLs from the source to the destination cluster, provide the following for `acl.filters.json`.
{
  "aclFilters": [
    {
      "resourceFilter": {
        "resourceType": "any",
        "patternType": "any"
      },
      "accessFilter": {
        "operation": "any",
        "permissionType": "any"
      }
    }
  ]
}
Each field in the JSON has the following options:
- `resourceType`: Can be either `any` or one of those specified in Resources in Authorization using Access Control Lists (ACLs).
- `patternType`: Can be `any`, `literal`, `prefixed`, or `match`.
- `name`: Name of the resource. If left empty, this defaults to the * wildcard, which matches all names of the specified `resourceType`.
- `principal`: Name of the principal. If left empty, this defaults to the * wildcard, which matches all principals with the specified `operation` and `permissionType`.
- `operation`: Can be either `any` or one of those specified in Operations under the “Operation” column.
- `permissionType`: Can be `any`, `allow`, or `deny`.
- `host`: Host from which operations can come. If left empty, this defaults to the * wildcard, which matches all hosts with the specified `operation` and `permissionType`.
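To make the field layout concrete, here is a minimal Python sketch that assembles a filter document from the fields just described. The helper names `acl_filter` and `filters_document` are hypothetical. Optional fields (`name`, `principal`, `host`) are simply omitted when not supplied, relying on the wildcard defaults described above.

```python
import json


def acl_filter(resource_type="any", pattern_type="any", name=None,
               principal=None, host=None, operation="any", permission_type="any"):
    """Build one entry of the aclFilters list; unset optional fields are left out."""
    resource = {"resourceType": resource_type, "patternType": pattern_type}
    if name is not None:
        resource["name"] = name
    access = {}
    if principal is not None:
        access["principal"] = principal
    if host is not None:
        access["host"] = host
    access["operation"] = operation
    access["permissionType"] = permission_type
    return {"resourceFilter": resource, "accessFilter": access}


def filters_document(*filters):
    """Serialize the filters into the JSON structure used for ACL migration."""
    return json.dumps({"aclFilters": list(filters)}, indent=2)


# With no arguments, the single filter matches every ACL on the source cluster.
print(filters_document(acl_filter()))
```

Calling `acl_filter(resource_type="topic", pattern_type="literal", name="pineapple")` instead would restrict the filter to ACLs on a single topic.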
Migrate all ACLs specific to a topic¶
To migrate all ACLs specific to a single topic (in this case topic `pineapple`), provide the following for `acl.filters.json`.
{
  "aclFilters": [
    {
      "resourceFilter": {
        "resourceType": "topic",
        "patternType": "literal",
        "name": "pineapple"
      },
      "accessFilter": {
        "operation": "any",
        "permissionType": "any"
      }
    }
  ]
}
Migrate ACLs specific to a principal¶
To migrate all ACLs specific to a principal, Alice, provide the following for `acl.filters.json`.
{
  "aclFilters": [
    {
      "resourceFilter": {
        "resourceType": "any",
        "patternType": "any"
      },
      "accessFilter": {
        "principal": "User:Alice",
        "operation": "any",
        "permissionType": "any"
      }
    }
  ]
}
Migrate a mix of ACLs¶
You can also do more complex ACL migrations with a specific ACL JSON. Assume you only want to migrate the following ACLs on the source cluster:
1. Alice is allowed to read from any topic from host `goodHostIPAddress`.
2. Alice is allowed to write to any topic from host `goodHostIPAddress`.
3. Alice is allowed to create any topic from host `goodHostIPAddress`.
4. Bob is allowed to read from any topic prefixed with `coffee` from host `anotherGoodHostIPAddress`.
5. Bob is allowed to create any topic prefixed with `breakfast.bar` from `anotherGoodHostIPAddress`.
6. Bob is allowed to create any topic with the name `breakfast.bar` from `anotherGoodHostIPAddress`.
7. Mallory is denied all operations on any topic from any host.
8. Admin is allowed all operations on any topic from host `adminHostIPAddress`.
9. Trent is allowed to read from any topic from host `greatHostIPAddress`.
10. Eve is denied read access to any topic from any host.
To migrate the ACLs specified above, you would specify the following ACL filters in the JSON file for `acl.filters.json`.
{
  "aclFilters": [
    // filter for 1
    {
      "resourceFilter": {
        "resourceType": "topic",
        "patternType": "any"
      },
      "accessFilter": {
        "principal": "User:Alice",
        "host": "goodHostIPAddress",
        "operation": "read",
        "permissionType": "allow"
      }
    },
    // filter for 2
    {
      "resourceFilter": {
        "resourceType": "topic",
        "patternType": "any"
      },
      "accessFilter": {
        "principal": "User:Alice",
        "host": "goodHostIPAddress",
        "operation": "write",
        "permissionType": "allow"
      }
    },
    // filter for 3
    {
      "resourceFilter": {
        "resourceType": "topic",
        "patternType": "any"
      },
      "accessFilter": {
        "principal": "User:Alice",
        "host": "goodHostIPAddress",
        "operation": "create",
        "permissionType": "allow"
      }
    },
    // filter for 4
    {
      "resourceFilter": {
        "resourceType": "topic",
        "name": "coffee",
        "patternType": "prefixed"
      },
      "accessFilter": {
        "principal": "User:Bob",
        "host": "anotherGoodHostIPAddress",
        "operation": "read",
        "permissionType": "allow"
      }
    },
    // filter for 5 and 6
    {
      "resourceFilter": {
        "resourceType": "topic",
        "name": "breakfast.bar",
        // match serves as a catch-all for all the names of
        // a topic this principal is authorized to access
        "patternType": "match"
      },
      "accessFilter": {
        "principal": "User:Bob",
        "host": "anotherGoodHostIPAddress",
        "operation": "create",
        "permissionType": "allow"
      }
    },
    // filter for 7
    {
      "resourceFilter": {
        "resourceType": "any",
        "patternType": "any"
      },
      "accessFilter": {
        "principal": "User:Mallory",
        "operation": "any",
        "permissionType": "deny"
      }
    },
    // filter for 8
    {
      "resourceFilter": {
        "resourceType": "any",
        "patternType": "any"
      },
      "accessFilter": {
        "principal": "User:Admin",
        "host": "adminHostIPAddress",
        "operation": "any",
        "permissionType": "allow"
      }
    },
    // filter for 9
    {
      "resourceFilter": {
        "resourceType": "topic",
        "patternType": "any"
      },
      "accessFilter": {
        "principal": "User:Trent",
        "host": "greatHostIPAddress",
        "operation": "read",
        "permissionType": "allow"
      }
    },
    // filter for 10
    {
      "resourceFilter": {
        "resourceType": "topic",
        "patternType": "any"
      },
      "accessFilter": {
        "principal": "User:Eve",
        "operation": "read",
        "permissionType": "deny"
      }
    }
  ]
}
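Standard JSON has no comment syntax, so the `//` annotations in the example above would be rejected by a strict JSON parser if the block were copied verbatim into a file. The following Python sketch shows one way to strip whole-line `//` comments and run a basic sanity check before using such a file. The function name `load_acl_filters` is hypothetical, the check covers only `patternType` and `permissionType`, and treating the allowed values as case-sensitive is an assumption.

```python
import json

PATTERN_TYPES = {"any", "literal", "prefixed", "match"}
PERMISSION_TYPES = {"any", "allow", "deny"}


def load_acl_filters(text: str) -> dict:
    """Parse an annotated filter document after dropping whole-line // comments."""
    kept = [line for line in text.splitlines() if not line.lstrip().startswith("//")]
    document = json.loads("\n".join(kept))
    for index, entry in enumerate(document["aclFilters"], start=1):
        pattern = entry["resourceFilter"]["patternType"]
        permission = entry["accessFilter"]["permissionType"]
        if pattern not in PATTERN_TYPES:
            raise ValueError(f"filter {index}: unknown patternType {pattern!r}")
        if permission not in PERMISSION_TYPES:
            raise ValueError(f"filter {index}: unknown permissionType {permission!r}")
    return document


# A shortened, annotated document in the style of the example above.
annotated = """{
  "aclFilters": [
    // filter for 10
    {
      "resourceFilter": {"resourceType": "topic", "patternType": "any"},
      "accessFilter": {"principal": "User:Eve", "operation": "read", "permissionType": "deny"}
    }
  ]
}"""

clean = load_acl_filters(annotated)
print(json.dumps(clean, indent=2))  # comment-free JSON, ready to write to a file
```

Only lines that begin with `//` are removed, so a `//` that appears inside a string value is left untouched.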