Connect Self-Managed Kafka Streams to Confluent Cloud
You can connect a self-managed Kafka Streams application to your Apache Kafka® cluster in Confluent Cloud.
Prerequisites
Use the Confluent CLI to log in to your Confluent Cloud cluster, and run the confluent kafka cluster list command to get the Kafka cluster ID.

confluent kafka cluster list
Your output should resemble:
  Current |     ID     |    Name    | Type  |  Cloud   |  Region  | Availability | Status
----------+------------+------------+-------+----------+----------+--------------+---------
  *       | lkc-a123b  | my-cluster | BASIC | azure    | westus2  | single-zone  | UP
Run the confluent kafka cluster describe command to get the endpoint for your Confluent Cloud cluster.

confluent kafka cluster describe lkc-a123b
Your output should resemble:
+----------------------+---------------------------------------------------------+
| Current              | true                                                    |
| ID                   | lkc-a123b                                               |
| Name                 | wikiedits_cluster                                       |
| Type                 | BASIC                                                   |
| Ingress Limit (MB/s) | 250                                                     |
| Egress Limit (MB/s)  | 750                                                     |
| Storage              | 5 TB                                                    |
| Cloud                | azure                                                   |
| Region               | westus2                                                 |
| Availability         | single-zone                                             |
| Status               | UP                                                      |
| Endpoint             | SASL_SSL://pkc-41973.westus2.azure.confluent.cloud:9092 |
| REST Endpoint        | https://pkc-41973.westus2.azure.confluent.cloud:443     |
| Topic Count          | 30                                                      |
+----------------------+---------------------------------------------------------+
Save the Endpoint value, which you’ll use in a later step.

Create a service account named my-streams-app. You must include a description.

confluent iam service-account create my-streams-app --description "My Streams API and secrets service account."
Your output should resemble:
+-------------+--------------------------------+
| ID          | sa-ab01cd                      |
| Name        | my-streams-app                 |
| Description | My Streams API and secrets    |
|             | service account.               |
+-------------+--------------------------------+
Save the service account ID, which you’ll use in later steps.
Create an API key and secret for service account sa-ab01cd. Be sure to replace the service account ID and Kafka cluster ID values shown here with your own:

confluent api-key create --service-account sa-ab01cd --resource lkc-a123b
Your output should resemble:
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.

+---------+------------------------------------------------------------------+
| API Key | ABCXQHYDZXMMUDEF                                                 |
| Secret  | aBCde3s54+4Xv36YKPLDKy2aklGr6x/ShUrEX5D1Te4AzRlphFlr6eghmPX81HTF |
+---------+------------------------------------------------------------------+
Important
Save the API key and secret. You require this information to configure your client applications. Be aware that this is the only time that you can access and view the key and secret.
To connect your Kafka Streams application to Confluent Cloud, update its existing configuration with the properties described here.
Create a java.util.Properties instance.

Configure your Streams application. Kafka and Kafka Streams configuration options must be set in the java.util.Properties instance before using Streams. In this example, you must configure the Confluent Cloud broker endpoints (StreamsConfig.BOOTSTRAP_SERVERS_CONFIG) and the SASL config (SASL_JAAS_CONFIG).

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();

// Comma-separated list of the Confluent Cloud broker endpoints. For example:
// r0.great-app.confluent.aws.prod.cloud:9092,r1.great-app.confluent.aws.prod.cloud:9093,r2.great-app.confluent.aws.prod.cloud:9094
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-endpoint1, broker-endpoint2, broker-endpoint3>");

props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + "username=\"<api-key>\" password=\"<api-secret>\";");

// Recommended performance/resilience settings
props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), Integer.MAX_VALUE);
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), Long.MAX_VALUE);

// Any further settings
props.put(... , ...);
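With these properties in place, building and starting the application follows the standard Kafka Streams pattern. Note that Kafka Streams also requires an application.id, which the block above does not set. The following is a minimal sketch; the my-streams-app ID and the topic names my-input-topic and my-output-topic are placeholders for illustration:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

// Every Streams application needs a unique application.id; it also prefixes
// the internal topics, consumer groups, and transactional IDs covered by
// the ACL section below.
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");

// A trivial topology that copies records from an input topic to an output topic.
StreamsBuilder builder = new StreamsBuilder();
builder.stream("my-input-topic", Consumed.with(Serdes.String(), Serdes.String()))
       .to("my-output-topic", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Stop the application cleanly on JVM shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));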
(Optional) Add configs for Confluent Cloud Schema Registry to your streams application per the example in java_streams.delta on GitHub at ccloud/examples/template_delta_configs.
// Confluent Schema Registry for Java
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("schema.registry.basic.auth.user.info", "<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>");
props.put("schema.registry.url", "https://<SCHEMA_REGISTRY_ENDPOINT>");
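If your application reads or writes Avro, the Schema Registry settings must also reach the Serdes themselves. The following is a minimal sketch, assuming the Confluent SpecificAvroSerde from the kafka-streams-avro-serde artifact; MyRecord stands in for a hypothetical Avro-generated class:

import java.util.HashMap;
import java.util.Map;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

// Serdes that you construct yourself must be configured explicitly;
// Streams does not pass these properties to them automatically.
Map<String, String> srConfig = new HashMap<>();
srConfig.put("schema.registry.url", "https://<SCHEMA_REGISTRY_ENDPOINT>");
srConfig.put("basic.auth.credentials.source", "USER_INFO");
srConfig.put("schema.registry.basic.auth.user.info", "<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>");

SpecificAvroSerde<MyRecord> valueSerde = new SpecificAvroSerde<>();
valueSerde.configure(srConfig, false); // false = value Serde, not key Serde

You can then pass valueSerde to Consumed.with or Produced.with in your topology.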
- For more information, see Configuring a Streams Application.
- To view a working example of hybrid Apache Kafka® clusters from self-hosted to Confluent Cloud, see cp-demo.
- For example configs for all Confluent Platform components and clients connecting to Confluent Cloud, see template examples for components.
- To look at all the code used in the Confluent Cloud demo, see the Confluent Cloud demo examples.
Create ACLs for Kafka Streams to access Confluent Cloud
If your Kafka cluster in Confluent Cloud has ACLs enabled, your Kafka Streams application must be granted access to specific resources on the Kafka cluster. Use the following kafka-acls commands to create the ACLs that Kafka Streams needs to operate on the specified topics.
Tip
Use the --dry-run option to preview all of the ACLs that the command sets, without actually setting them.
In the following commands, replace <service-account-id> with the ID of the service account that you created previously.
# Read access for input topic(s)
kafka-acls --bootstrap-server <ccloud-bootstrap-server> \
--command-config <cloud.properties> --add --allow-principal User:<service-account-id> \
--operation read --topic <input-topic>
# Write access for output topic(s)
kafka-acls --bootstrap-server <ccloud-bootstrap-server> \
--command-config <cloud.properties> --add --allow-principal User:<service-account-id> \
--operation write --topic <output-topic>
# Cluster access for idempotent write (needed for EOS)
kafka-acls --bootstrap-server <ccloud-bootstrap-server> \
--command-config <cloud.properties> --add --allow-principal User:<service-account-id> \
--operation idempotentWrite --cluster
# Full access to internal topics, consumer groups, and transactional ids
# for this application, prefixed by the Streams <application.id>
kafka-acls --bootstrap-server <ccloud-bootstrap-server> \
--command-config <cloud.properties> --add --allow-principal User:<service-account-id> \
--operation All --resource-pattern-type prefixed --topic <application.id> \
--group <application.id> --transactional-id <application.id>
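As an alternative to the kafka-acls tool, you can create the same bindings programmatically with the Kafka AdminClient. The following is a minimal sketch, not taken from the commands above verbatim: it creates only the read ACL for the input topic, and the write, idempotentWrite, and prefixed ACLs follow the same pattern. The placeholders match the commands above, and adminProps is assumed to carry the same bootstrap and SASL_SSL settings shown in the Streams configuration section:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

// adminProps must contain bootstrap.servers plus the SASL_SSL settings
// shown in the Streams configuration section above.
Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", "<ccloud-bootstrap-server>");

try (Admin admin = Admin.create(adminProps)) {
    // Read access on the input topic for the service account principal,
    // equivalent to the first kafka-acls command above.
    AclBinding readInput = new AclBinding(
        new ResourcePattern(ResourceType.TOPIC, "<input-topic>", PatternType.LITERAL),
        new AccessControlEntry("User:<service-account-id>", "*",
            AclOperation.READ, AclPermissionType.ALLOW));

    // all().get() blocks until the broker acknowledges the new ACL;
    // it throws InterruptedException/ExecutionException on failure.
    admin.createAcls(Collections.singletonList(readInput)).all().get();
}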