Connect Self-Managed Kafka Streams to Confluent Cloud

You can connect a self-managed Kafka Streams application to your Apache Kafka® cluster in Confluent Cloud.

Prerequisites

  1. Use the Confluent CLI to log in to Confluent Cloud, and run the confluent kafka cluster list command to get your Kafka cluster ID.

    confluent kafka cluster list
    

    Your output should resemble:

      Current |     ID     |    Name    | Type  | Cloud    |  Region  | Availability | Status
    ----------+------------+------------+-------+----------+----------+--------------+---------
      *       | lkc-a123b  | my-cluster | BASIC | azure    | westus2  | single-zone  | UP
    
  2. Run the confluent kafka cluster describe command to get the endpoint for your Confluent Cloud cluster.

    confluent kafka cluster describe lkc-a123b
    

    Your output should resemble:

     +----------------------+---------------------------------------------------------+
     | Current              | true                                                    |
     | ID                   | lkc-a123b                                               |
     | Name                 | wikiedits_cluster                                       |
     | Type                 | BASIC                                                   |
     | Ingress Limit (MB/s) |                                                     250 |
     | Egress Limit (MB/s)  |                                                     750 |
     | Storage              | 5 TB                                                    |
     | Cloud                | azure                                                   |
     | Region               | westus2                                                 |
     | Availability         | single-zone                                             |
     | Status               | UP                                                      |
     | Endpoint             | SASL_SSL://pkc-41973.westus2.azure.confluent.cloud:9092 |
     | REST Endpoint        | https://pkc-41973.westus2.azure.confluent.cloud:443     |
     | Topic Count          |                                                      30 |
     +----------------------+---------------------------------------------------------+
    

    Save the Endpoint value, which you’ll use in a later step.

  3. Create a service account named my-streams-app. You must include a description.

    confluent iam service-account create my-streams-app --description "My Streams API and secrets service account."
    

    Your output should resemble:

    +-------------+--------------------------------+
    | ID          | sa-ab01cd                      |
    | Name        | my-streams-app                 |
    | Description | My Streams API and secrets     |
    |             | service account.               |
    +-------------+--------------------------------+
    

    Save the service account ID, which you’ll use in later steps.

  4. Create an API key and secret for service account sa-ab01cd. Be sure to replace the service account ID and Kafka cluster ID values shown here with your own:

    confluent api-key create --service-account sa-ab01cd --resource lkc-a123b
    

    Your output should resemble:

    It may take a couple of minutes for the API key to be ready.
    Save the API key and secret. The secret is not retrievable later.
    +---------+------------------------------------------------------------------+
    | API Key | ABCXQHYDZXMMUDEF                                                 |
    | Secret  | aBCde3s54+4Xv36YKPLDKy2aklGr6x/ShUrEX5D1Te4AzRlphFlr6eghmPX81HTF |
    +---------+------------------------------------------------------------------+
    

    Important

    Save the API key and secret. You need them to configure your client applications, and this is the only time that you can access and view the secret. A sketch for loading the saved credentials from a local file follows this list.

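Rather than hard-coding the key and secret in your application source, you can keep them in a local client configuration file and load it at startup. The following is a minimal sketch; the file name ccloud.properties and the loader class are assumptions for illustration, not a Confluent convention.

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    public class CloudConfigLoader {

        // Loads client properties (bootstrap servers, SASL credentials, and so on)
        // from a local file so that secrets stay out of source control.
        public static Properties loadConfig(String configFile) throws IOException {
            Properties props = new Properties();
            try (InputStream input = new FileInputStream(configFile)) {
                props.load(input);
            }
            return props;
        }

        public static void main(String[] args) throws IOException {
            // "ccloud.properties" is a hypothetical file that holds the API key and
            // secret from the previous step in standard Kafka client format.
            Properties props = loadConfig("ccloud.properties");
            System.out.println("Loaded " + props.size() + " client properties");
        }
    }

You can merge the loaded properties into the java.util.Properties instance described in the next section.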
To connect Streams to Confluent Cloud, update your existing Streams configs with the properties described here.

  1. Create a java.util.Properties instance.

  2. Configure your Streams application. Kafka and Kafka Streams configuration options must be set in the java.util.Properties instance before you use Streams. In this example, you must configure the Confluent Cloud broker endpoints (StreamsConfig.BOOTSTRAP_SERVERS_CONFIG) and the SASL settings (SaslConfigs.SASL_JAAS_CONFIG). A minimal end-to-end sketch that uses these properties follows this list.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.streams.StreamsConfig;
    
    Properties props = new Properties();
    // Comma-separated list of the Confluent Cloud broker endpoints. For example:
    // r0.great-app.confluent.aws.prod.cloud:9092,r1.great-app.confluent.aws.prod.cloud:9093,r2.great-app.confluent.aws.prod.cloud:9094
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-endpoint1, broker-endpoint2, broker-endpoint3>");
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
    props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required " +
        "username=\"<api-key>\" password=\"<api-secret>\";");
    
    // Recommended performance/resilience settings
    // Use the maximum int/long values so the producer blocks and retries for as long as possible.
    props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), Integer.MAX_VALUE);
    props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), Long.MAX_VALUE);
    
    // Any further settings
    props.put(... , ...);
    
  3. (Optional) Add configs for Confluent Cloud Schema Registry to your Streams application, per the example in java_streams.delta on GitHub at ccloud/examples/template_delta_configs.

    // Confluent Schema Registry for Java
    props.put("basic.auth.credentials.source", "USER_INFO");
    props.put("schema.registry.basic.auth.user.info", "<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>");
    props.put("schema.registry.url", "https://<SCHEMA_REGISTRY_ENDPOINT>");
    

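With these properties in place, the application itself needs only a topology and a KafkaStreams instance. The following is a minimal end-to-end sketch, not a complete application; the application ID my-streams-app and the topic names input-topic and output-topic are assumptions for illustration.

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;

    public class CloudStreamsApp {

        public static void main(String[] args) {
            // Start from the Confluent Cloud connection properties shown above,
            // then add the application-level settings.
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            // ... plus the bootstrap servers, replication factor, and SASL settings
            // from the previous step.

            // A trivial topology: read one topic, upper-case the values, write another.
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> input = builder.stream("input-topic");
            input.mapValues(value -> value.toUpperCase()).to("output-topic");

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();

            // Close the application cleanly on shutdown.
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }

The application.id value doubles as the prefix for the internal topics, consumer group, and transactional IDs that the ACL commands in the next section reference.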
Create ACLs for Kafka Streams to access Confluent Cloud

If your Kafka cluster in Confluent Cloud has ACLs enabled, your Kafka Streams application must be granted access to specific resources on the Kafka cluster. Use the following kafka-acls commands, which ship with Apache Kafka®, to create the necessary ACLs in the Kafka cluster and allow Kafka Streams to operate on the specified topics.

Tip

Use the --dry-run option to preview all of the ACLs that the command sets, without actually setting them.

In the following commands, replace <service-account-id> with the ID of the service account that you created previously.
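
The <cloud.properties> file passed to --command-config is a standard Kafka client properties file that carries the connection and credential settings for the tool. A minimal sketch, using the API key and secret created earlier (the file name is a placeholder):

    # Client settings for Kafka CLI tools (assumed file name: cloud.properties)
    bootstrap.servers=<ccloud-bootstrap-server>
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";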

# Read access for input topic(s)
kafka-acls --bootstrap-server <ccloud-bootstrap-server> \
  --command-config <cloud.properties> --add --allow-principal User:<service-account-id> \
  --operation read --topic <input-topic>

# Write access for output topic(s)
kafka-acls --bootstrap-server <ccloud-bootstrap-server> \
  --command-config <cloud.properties> --add --allow-principal User:<service-account-id> \
  --operation write --topic <output-topic>

# Cluster access for idempotent write (needed for EOS)
kafka-acls --bootstrap-server <ccloud-bootstrap-server> \
  --command-config <cloud.properties> --add --allow-principal User:<service-account-id> \
  --operation idempotentWrite --cluster

# Full access to internal topics, consumer groups, and transactional ids
# for this application, prefixed by the Streams <application.id>
kafka-acls --bootstrap-server <ccloud-bootstrap-server> \
  --command-config <cloud.properties> --add --allow-principal User:<service-account-id> \
  --operation All --resource-pattern-type prefixed --topic <application.id> \
  --group <application.id> --transactional-id <application.id>
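
If you prefer to manage these ACLs from Java rather than the CLI, Kafka's AdminClient exposes the same operations. The following is a minimal sketch that creates only the prefixed topic ACL from the last command above; it assumes the cloud.properties file sketched earlier and the sa-ab01cd service account. The group and transactional-id ACLs would be created the same way with ResourceType.GROUP and ResourceType.TRANSACTIONAL_ID.

    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.common.acl.AccessControlEntry;
    import org.apache.kafka.common.acl.AclBinding;
    import org.apache.kafka.common.acl.AclOperation;
    import org.apache.kafka.common.acl.AclPermissionType;
    import org.apache.kafka.common.resource.PatternType;
    import org.apache.kafka.common.resource.ResourcePattern;
    import org.apache.kafka.common.resource.ResourceType;

    public class CreateStreamsAcls {

        public static void main(String[] args) throws Exception {
            // Load the same client settings used with --command-config above.
            Properties props = new Properties();
            try (InputStream input = new FileInputStream("cloud.properties")) {
                props.load(input);
            }

            try (Admin admin = Admin.create(props)) {
                // Allow the service account all operations on topics whose names
                // are prefixed with the Streams application.id.
                AccessControlEntry allowAll = new AccessControlEntry(
                    "User:sa-ab01cd", "*", AclOperation.ALL, AclPermissionType.ALLOW);
                AclBinding topicAcl = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "<application.id>", PatternType.PREFIXED),
                    allowAll);

                admin.createAcls(Collections.singletonList(topicAcl)).all().get();
            }
        }
    }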