Connect Local ksqlDB to Confluent Cloud

You can connect ksqlDB to your Apache Kafka® cluster in Confluent Cloud.

The ksqlDB servers must be configured to use Confluent Cloud. The ksqlDB CLI does not require configuration.

Prerequisites

  1. Use the Confluent CLI to log in to your Confluent Cloud cluster, and run the confluent kafka cluster list command to get the Kafka cluster ID.

    confluent kafka cluster list
    

    Your output should resemble:

          Id      |       Name        |     Type     | Cloud    |  Region  | Availability | Status
    +-------------+-------------------+--------------+----------+----------+--------------+--------+
        lkc-a123b | ksqldb-quickstart | BASIC_LEGACY | gcp      | us-west2 | multi-zone   | UP
    
  2. Run the confluent kafka cluster describe command to get the endpoint for your Confluent Cloud cluster.

    confluent kafka cluster describe lkc-a123b
    

    Your output should resemble:

     +--------------+--------------------------------------------------------+
     | Id           | lkc-a123b                                              |
     | Name         | ksqldb-quickstart                                      |
     | Type         | BASIC                                                  |
     | Ingress      |                                                    100 |
     | Egress       |                                                    100 |
     | Storage      |                                                   5000 |
     | Cloud        | gcp                                                    |
     | Availability | single-zone                                            |
     | Region       | us-west2                                               |
     | Status       | UP                                                     |
     | Endpoint     | SASL_SSL://pkc-4s987.us-west2.gcp.confluent.cloud:9092 |
     | ApiEndpoint  | https://pkac-42kz6.us-west2.gcp.confluent.cloud        |
     +--------------+--------------------------------------------------------+
    

    Save the Endpoint value, which you’ll use in a later step.

  3. Create a service account named my-ksqldb-app. You must include a description.

    confluent iam service-account create my-ksqldb-app --description "My ksqlDB API and secrets service account."
    

    Your output should resemble:

    +-------------+--------------------------------+
    | Id          |                         123456 |
    | Resource ID | sa-efg123                      |
    | Name        | my-ksqldb-app                  |
    | Description | My ksqlDB API and secrets      |
    |             | service account.               |
    +-------------+--------------------------------+
    

    Save the service account ID (the Id value, 123456 in this example), which you’ll use in later steps.

  4. Create an API key and secret for service account 123456. Be sure to replace the service account ID and Kafka cluster ID values shown here with your own:

    confluent api-key create --service-account 123456 --resource lkc-a123b
    

    Your output should resemble:

    It may take a couple of minutes for the API key to be ready.
    Save the API key and secret. The secret is not retrievable later.
    +---------+------------------------------------------------------------------+
    | API key | ABCXQHYDZXMMUDEF                                                 |
    | Secret  | aBCde3s54+4Xv36YKPLDKy2aklGr6x/ShUrEX5D1Te4AzRlphFlr6eghmPX81HTF |
    +---------+------------------------------------------------------------------+
    

    Important

    Save the API key and secret. You need both values to configure your client applications, and this is the only time that you can view them.

  5. Customize your /etc/ksqldb/ksql-server.properties file.

    Tip

    To use ksqlDB with Confluent Cloud, you must configure the ksqlDB server. The ksqlDB CLI does not require any additional configuration.

    The following example shows the minimum configuration required to use ksqlDB with Confluent Cloud. You should also review the Recommended ksqlDB production settings. Replace <broker-endpoint> with the Endpoint value that you saved previously, and replace <api-key> and <api-secret> with the API key and secret that you generated previously.

    # For bootstrap.servers, assign the Endpoint value from the "confluent kafka cluster describe" command,
    # without the SASL_SSL:// prefix. For example: pkc-4s987.us-west2.gcp.confluent.cloud:9092
    bootstrap.servers=<broker-endpoint>
    ksql.internal.topic.replicas=3
    ksql.streams.replication.factor=3
    ksql.logging.processing.topic.replication.factor=3
    listeners=http://0.0.0.0:8088
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    # Replace <api-key> and <api-secret> with your API key and secret.
    sasl.jaas.config=\
        org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="<api-key>" \
        password="<api-secret>";
    
  6. (Optional) Add the Confluent Cloud Schema Registry settings, following the example in ksql-server-ccloud.delta on GitHub at ccloud/examples/template_delta_configs.

    # Confluent Schema Registry configuration for ksqlDB Server
    ksql.schema.registry.basic.auth.credentials.source=USER_INFO
    ksql.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
    ksql.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>
    
  7. Restart the ksqlDB server. The steps to restart depend on your environment.
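
As a rough illustration of step 5, the following shell sketch derives the bootstrap.servers value from the saved Endpoint by dropping the SASL_SSL:// prefix, then writes the minimum configuration to a file. The variable names (ENDPOINT, API_KEY, API_SECRET, OUT_FILE) and the output path are assumptions made for this example, and the sample values are the placeholders from the steps above, not real credentials.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Hypothetical inputs: substitute the values you saved in steps 2 and 4.
ENDPOINT='SASL_SSL://pkc-4s987.us-west2.gcp.confluent.cloud:9092'
API_KEY='<api-key>'
API_SECRET='<api-secret>'

# Hypothetical output path; review the file before copying it to
# /etc/ksqldb/ksql-server.properties.
OUT_FILE="${TMPDIR:-/tmp}/ksql-server.properties.generated"

# bootstrap.servers takes host:port, so remove everything up to and
# including "://". The protocol is set separately in security.protocol.
BOOTSTRAP_SERVERS="${ENDPOINT#*://}"

# Write the same settings as the minimum configuration shown in step 5.
cat > "$OUT_FILE" <<EOF
bootstrap.servers=${BOOTSTRAP_SERVERS}
ksql.internal.topic.replicas=3
ksql.streams.replication.factor=3
ksql.logging.processing.topic.replication.factor=3
listeners=http://0.0.0.0:8088
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${API_KEY}" password="${API_SECRET}";
EOF

echo "Wrote ${OUT_FILE} with bootstrap.servers=${BOOTSTRAP_SERVERS}"
```

Because the generated file contains the API secret, restrict its permissions and avoid committing it to source control.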

For more information, see the ksqlDB Configuration Parameter Reference.
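
After the restart in step 7, one way to confirm that the server is answering requests is to call its REST /info endpoint. The helper below is a minimal sketch: the function name ksqldb_server_is_up is hypothetical, and the default URL assumes that the check runs on the same host as the server and that the listener uses port 8088, as in the example configuration.

```shell
#!/usr/bin/env bash

# Hypothetical helper: returns 0 when the ksqlDB server answers on /info,
# and a non-zero status when the request fails.
# Argument 1 (optional): base URL of the server, assumed to default to
# http://localhost:8088.
ksqldb_server_is_up() {
  local url="${1:-http://localhost:8088}"
  curl --fail --silent --show-error "${url}/info" > /dev/null
}

# Usage:
#   ksqldb_server_is_up && echo "ksqlDB server is reachable"
```

A successful response shows only that the REST listener is up. It does not prove that the credentials for Confluent Cloud are valid, so also check the server log for authentication errors.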

Create ACLs for ksqlDB to access Confluent Cloud

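If your Kafka cluster in Confluent Cloud has ACLs enabled, your ksqlDB application must be granted access to specific resources on the Kafka cluster. Use the following Confluent CLI commands to create the necessary ACLs in the Kafka cluster. Replace <id> with the service account ID that you created previously, and replace <ksqldb-service-id> with the service ID of your ksqlDB application (the ksql.service.id setting).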

confluent kafka acl create --allow --service-account <id> --operations "DESCRIBE,DESCRIBE_CONFIGS" --cluster-scope
confluent kafka acl create --allow --service-account <id> --operations "WRITE,DESCRIBE" --transactional-id <ksqldb-service-id>
confluent kafka acl create --allow --service-account <id> --operations "CREATE,DESCRIBE,ALTER,DESCRIBE_CONFIGS,ALTER_CONFIGS,READ,WRITE,DELETE" --topic <ksqldb-service-id> --prefix
confluent kafka acl create --allow --service-account <id> --operations "CREATE,DESCRIBE,ALTER,DESCRIBE_CONFIGS,ALTER_CONFIGS,READ,WRITE,DELETE" --topic _confluent-ksql-<ksqldb-service-id> --prefix
confluent kafka acl create --allow --service-account <id> --operations "CREATE,DESCRIBE,ALTER,DESCRIBE_CONFIGS,ALTER_CONFIGS,READ,WRITE,DELETE" --consumer-group _confluent-ksql-<ksqldb-service-id> --prefix
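
To make the placeholder substitution concrete, the following sketch prints the five commands above with example values filled in, without running them. The service account ID and the service ID my_ksqldb_app_ are hypothetical values, and the variable and function names are assumptions made for this example.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Hypothetical example values; replace them with your own.
SERVICE_ACCOUNT_ID="123456"
KSQLDB_SERVICE_ID="my_ksqldb_app_"

# Operations granted on the ksqlDB-owned topics and consumer groups,
# copied from the commands above.
ALL_OPS="CREATE,DESCRIBE,ALTER,DESCRIBE_CONFIGS,ALTER_CONFIGS,READ,WRITE,DELETE"

# Internal topics and consumer groups share this prefix.
INTERNAL_PREFIX="_confluent-ksql-${KSQLDB_SERVICE_ID}"

# Print (do not run) the ACL commands so they can be reviewed first.
print_ksqldb_acl_commands() {
  local base="confluent kafka acl create --allow --service-account ${SERVICE_ACCOUNT_ID}"
  echo "${base} --operations \"DESCRIBE,DESCRIBE_CONFIGS\" --cluster-scope"
  echo "${base} --operations \"WRITE,DESCRIBE\" --transactional-id ${KSQLDB_SERVICE_ID}"
  echo "${base} --operations \"${ALL_OPS}\" --topic ${KSQLDB_SERVICE_ID} --prefix"
  echo "${base} --operations \"${ALL_OPS}\" --topic ${INTERNAL_PREFIX} --prefix"
  echo "${base} --operations \"${ALL_OPS}\" --consumer-group ${INTERNAL_PREFIX} --prefix"
}

print_ksqldb_acl_commands
```

Printing the commands first makes it easier to check the prefixes before any ACL is created.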

Create ACLs for ksqlDB to access a specific topic in Confluent Cloud

In addition to assigning ACLs to bring up a ksqlDB application and communicate with Confluent Cloud, you need to specify ACLs that enable ksqlDB users to access specific topics.

In the following commands, replace <id> with the service account ID that you created previously.

Assign ACLs to SELECT FROM a stream or table

Run the following command to enable read access for SELECT FROM STREAM/TABLE statements on the stream or table’s underlying topics.

confluent kafka acl create --allow --service-account <id> --operations read --topic <topic>

Assign ACLs for writing to a topic

Run the following command to enable write access to a topic.

confluent kafka acl create --allow --service-account <id> --operations write --topic <topic>

Assign ACLs for creating a topic

When you write a ksqlDB statement that creates a Kafka topic, like CREATE STREAM, CREATE STREAM AS SELECT, CREATE TABLE, or CREATE TABLE AS SELECT, ksqlDB needs access to create the topic, in addition to access for reading or writing to the topic.

To grant CREATE and READ access for a CREATE STREAM statement, like CREATE STREAM FOO (...) WITH (KAFKA_TOPIC='FOO', ...);, run the following commands. The commands for CREATE TABLE are similar.

confluent kafka acl create --allow --service-account <id> --operations create --topic 'FOO'
confluent kafka acl create --allow --service-account <id> --operations read --topic 'FOO'
confluent kafka acl create --allow --service-account <id> --operations create --cluster-scope

To grant CREATE and WRITE access for a CREATE STREAM AS SELECT statement, like CREATE STREAM BAR WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM FOO;, run the following commands. The commands for CREATE TABLE AS SELECT are similar.

confluent kafka acl create --allow --service-account <id> --operations create --topic 'BAR'
confluent kafka acl create --allow --service-account <id> --operations write --topic 'BAR'
confluent kafka acl create --allow --service-account <id> --operations create --cluster-scope
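
A CREATE STREAM AS SELECT statement also reads from its source topic, so in practice the read grant from the earlier section is combined with the create and write grants shown here. The sketch below prints that combined set of commands for one source topic and one sink topic. The function name print_csas_acl_commands and its argument order are assumptions made for this example, and the commands are printed rather than run.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Print (do not run) the ACL commands for a statement like
#   CREATE STREAM <sink> WITH (KAFKA_TOPIC='<sink>') AS SELECT * FROM <source>;
# Arguments: service account ID, source topic, sink topic.
print_csas_acl_commands() {
  local id="$1" source_topic="$2" sink_topic="$3"
  local base="confluent kafka acl create --allow --service-account ${id}"
  # Read access to the topic that backs the source stream.
  echo "${base} --operations read --topic '${source_topic}'"
  # Create and write access to the topic that backs the new stream.
  echo "${base} --operations create --topic '${sink_topic}'"
  echo "${base} --operations write --topic '${sink_topic}'"
  # Cluster-level create permission, as in the commands above.
  echo "${base} --operations create --cluster-scope"
}

# Example values from this section: source topic FOO, sink topic BAR.
print_csas_acl_commands 123456 FOO BAR
```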

Assign ACLs for full access to all topics

Run the following command to enable read and write access to all topics.

confluent kafka acl create --allow --service-account <id> --operations read,write --topic '*'

Assign ACLs for full access to prefixed topics

Run the following command to enable read and write access to all topics with names that start with the specified prefix.

confluent kafka acl create --allow --service-account <id> --operations read,write --topic 'prefix' --prefix
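
To show which topics a prefixed ACL covers, the sketch below applies the same starts-with rule to a few topic names. It only illustrates the matching rule and does not call Confluent Cloud. The function name topic_matches_prefix and the topic names are hypothetical.

```shell
#!/usr/bin/env bash

# Returns 0 when the topic name starts with the given prefix, which is the
# rule a prefixed ACL uses to select topics; returns 1 otherwise.
topic_matches_prefix() {
  local topic="$1" prefix="$2"
  case "$topic" in
    "$prefix"*) return 0 ;;
    *) return 1 ;;
  esac
}

# Hypothetical topic names checked against the prefix "orders".
for topic in orders orders-enriched pageviews; do
  if topic_matches_prefix "$topic" "orders"; then
    echo "${topic}: covered by prefix 'orders'"
  else
    echo "${topic}: not covered by prefix 'orders'"
  fi
done
```

A short prefix can cover more topics than intended, so choose a prefix that is specific to the application.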

Docker environment

You can run a mix of fully managed services in Confluent Cloud and self-managed components in Docker. For a Docker environment that connects any Confluent Platform component to Confluent Cloud, see cp-all-in-one-cloud.