Connecting ksqlDB to Confluent Cloud

You can connect ksqlDB to your Apache Kafka® cluster in Confluent Cloud.

The ksqlDB servers must be configured to use Confluent Cloud. The ksqlDB CLI does not require configuration.

Prerequisites

  1. Use the Confluent Cloud CLI to log in to your Confluent Cloud cluster, and run the ccloud kafka cluster list command to get the Kafka cluster ID.

    ccloud kafka cluster list
    

    Your output should resemble:

          Id      |       Name        |     Type     | Provider |  Region  | Availability | Status
    +-------------+-------------------+--------------+----------+----------+--------------+--------+
        lkc-2ra82 | ksqldb-quickstart | BASIC_LEGACY | gcp      | us-west2 | multi-zone   | UP
    
  2. Run the ccloud kafka cluster describe command to get the endpoint for your Confluent Cloud cluster.

    ccloud kafka cluster describe lkc-2ra82
    

    Your output should resemble:

     +--------------+--------------------------------------------------------+
     | Id           | lkc-2ra82                                              |
     | Name         | ksqldb-quickstart                                      |
     | Type         | BASIC_LEGACY                                           |
     | Ingress      |                                                    100 |
     | Egress       |                                                    100 |
     | Storage      |                                                   5000 |
     | Provider     | gcp                                                    |
     | Availability | multi-zone                                             |
     | Region       | us-west2                                               |
     | Status       | UP                                                     |
     | Endpoint     | SASL_SSL://pkc-4s087.us-west2.gcp.confluent.cloud:9092 |
     | ApiEndpoint  | https://pkac-42kz2.us-west2.gcp.confluent.cloud        |
     +--------------+--------------------------------------------------------+
    

    Note the Endpoint value, which you’ll use in the next step.

  3. Customize your /etc/ksqldb/ksql-server.properties properties file.

    Tip

    To use ksqlDB with Confluent Cloud, you must configure the ksqlDB server. The ksqlDB CLI does not require any additional configuration.

    The following example shows the minimum configuration required to use ksqlDB with Confluent Cloud. You should also review the Recommended ksqlDB production settings.

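    # For bootstrap.servers, assign the Endpoint value from the "ccloud kafka cluster describe" command,
    # e.g. pkc-4s087.us-west2.gcp.confluent.cloud:9092
    bootstrap.servers=<broker-endpoint>
    # Replication factor for ksqlDB's internal topics, Kafka Streams topics, and the processing log topic.
    ksql.internal.topic.replicas=3
    ksql.streams.replication.factor=3
    ksql.logging.processing.topic.replication.factor=3
    # ksqlDB REST API listener. This value binds every network interface over plain HTTP.
    listeners=http://0.0.0.0:8088
    # Connect to Confluent Cloud over TLS and authenticate with SASL/PLAIN using the API key and secret.
    # The secret is stored here in clear text, so restrict read access to this file.
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=\
        org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="<api-key>" \
        password="<api-secret>";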
    
  4. (Optional) Add configs for Confluent Cloud Schema Registry per the example in ksql-server-ccloud.delta on GitHub at ccloud/examples/template_delta_configs.

    # Confluent Schema Registry configuration for ksqlDB Server
    ksql.schema.registry.basic.auth.credentials.source=USER_INFO
    ksql.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
    ksql.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>
    
  5. Restart the ksqlDB server. The steps to restart depend on your environment.
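
A check you can run on the properties file from step 3 before restarting the server. This is an added example, not code from Confluent or from the original page; the function names and the set of checks are assumptions chosen to match the minimum configuration shown above.

```shell
#!/bin/sh
# Added example (not Confluent's code): lint the ksql-server.properties from step 3.

# Print one key=value per line: drop comments and blanks, join "\" continuations.
flatten_props() {
  awk '
    { sub(/^[ \t]+/, "") }
    !cont && (/^#/ || /^$/) { next }
    /\\$/ { sub(/\\$/, ""); buf = buf $0; cont = 1; next }
    { print buf $0; buf = ""; cont = 0 }
  ' "$1"
}

# prop FILE KEY: print the value of KEY, or nothing when it is unset.
prop() {
  flatten_props "$1" |
    awk -v k="$2=" 'index($0, k) == 1 { print substr($0, length(k) + 1); exit }'
}

# Print one finding per line; return 1 when there is at least one finding.
check_ksql_props() {
  f=$1
  {
    [ "$(prop "$f" security.protocol)" = SASL_SSL ] || echo "security.protocol is not SASL_SSL"
    [ "$(prop "$f" sasl.mechanism)" = PLAIN ] || echo "sasl.mechanism is not PLAIN"
    case "$(prop "$f" bootstrap.servers)" in
      "" | *"<"*) echo "bootstrap.servers is unset or still a placeholder" ;;
      *://*) echo "bootstrap.servers has a scheme; the page's example is bare host:port" ;;
    esac
    for k in ksql.internal.topic.replicas ksql.streams.replication.factor \
             ksql.logging.processing.topic.replication.factor; do
      [ "$(prop "$f" "$k")" = 3 ] || echo "$k is not 3"
    done
    case "$(prop "$f" sasl.jaas.config)" in
      *'"<'*) echo "sasl.jaas.config still contains placeholder credentials" ;;
      *PlainLoginModule*'username="'*'password="'*) ;;
      *) echo "sasl.jaas.config lacks a PlainLoginModule username/password" ;;
    esac
    case "$(prop "$f" listeners)" in
      http://0.0.0.0:*) echo "listeners serves plain HTTP on every interface" ;;
    esac
    # The file stores the API secret in clear text: allow no access for "other".
    [ "$(ls -ld "$f" | cut -c8-10)" = "---" ] || echo "file is accessible to other users"
  } | awk '{ print; n++ } END { exit (n > 0) }'
}

# Usage: sh check-ksql-props.sh /etc/ksqldb/ksql-server.properties
[ $# -eq 0 ] || check_ksql_props "$1"
```

Run against the unedited template above, it reports the placeholder endpoint, the placeholder credentials, and the all-interfaces HTTP listener.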

For more information, see Quick Start for Apache Kafka using Confluent Cloud and ksqlDB Configuration Parameter Reference.

Create ACLs for ksqlDB to access Confluent Cloud

If your Kafka cluster in Confluent Cloud has ACLs enabled, your ksqlDB application must be granted access to specific resources on the Kafka cluster. Use the following Confluent Cloud CLI command to create the necessary ACLs in the Kafka cluster to allow ksqlDB to operate on the specified topics.

If you provision ksqlDB by using the UI, you don’t need to run the ccloud ksql app configure-acls command.

Tip

Use the --dry-run option to preview all of the ACLs that the command sets, without actually setting them.

Run the following command to give your ksqlDB application access to your Kafka cluster running in Confluent Cloud.

ccloud ksql app configure-acls <ksql-cluster-id>

Create ACLs for ksqlDB to access a specific topic in Confluent Cloud

In addition to assigning ACLs to bring up a ksqlDB application and communicate with Confluent Cloud, you need to specify ACLs that enable ksqlDB users to access specific topics.

Get the ksqlDB service account ID

To assign ACLs for specific topics, you need the service account ID for the ksqlDB application. Use the --dry-run option in the Confluent Cloud CLI to get the service account ID.

Note

The service account ID is different from the ksqlDB application ID.

Run the following command to get the service account ID.

ccloud ksql app configure-acls <ksql-cluster-id> <test-topic> --dry-run

Your output should resemble:

  ServiceAccountId | Permission |    Operation     | Resource |             Name             |   Type
+------------------+------------+------------------+----------+------------------------------+----------+
  User:10248       | ALLOW      | DESCRIBE         | CLUSTER  | kafka-cluster                | LITERAL
  User:10248       | ALLOW      | DESCRIBE_CONFIGS | CLUSTER  | kafka-cluster                | LITERAL
  User:10248       | ALLOW      | CREATE           | TOPIC    | pksqlc-lgwpn                 | PREFIXED
  ...

The service account ID appears in the first column. In this example, the ID is 10248.
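
The same output can be read by a script instead of by eye. The following is an added example, not part of the original page or of the Confluent CLI: it extracts the service account ID from the dry-run table and lists the topic rules that cover more than one literal topic name. The function names are hypothetical, and the sample input is constructed from the three rows shown above.

```shell
#!/bin/sh
# Added example (not Confluent's code): read the --dry-run ACL preview.

# Constructed sample: the three rows from the output shown above.
sample_dry_run() {
  cat <<'EOF'
  ServiceAccountId | Permission |    Operation     | Resource |             Name             |   Type
+------------------+------------+------------------+----------+------------------------------+----------+
  User:10248       | ALLOW      | DESCRIBE         | CLUSTER  | kafka-cluster                | LITERAL
  User:10248       | ALLOW      | DESCRIBE_CONFIGS | CLUSTER  | kafka-cluster                | LITERAL
  User:10248       | ALLOW      | CREATE           | TOPIC    | pksqlc-lgwpn                 | PREFIXED
  ...
EOF
}

# Print each distinct numeric service account ID found in the first column.
service_account_ids() {
  awk -F'|' 'NF == 6 {
    id = $1; gsub(/[ \t]/, "", id)
    if (id ~ /^User:[0-9]+$/) { sub(/^User:/, "", id); if (!seen[id]++) print id }
  }'
}

# Print ALLOW rules on topics that cover more than one literal topic name:
# PREFIXED rules, and a "*" name (assumed here to be how a wildcard is listed).
broad_topic_rules() {
  awk -F'|' 'NF == 6 {
    for (i = 1; i <= NF; i++) gsub(/^[ \t]+|[ \t]+$/, "", $i)
    if ($2 == "ALLOW" && $4 == "TOPIC" && ($5 == "*" || $6 == "PREFIXED"))
      print $1, $3, $5, $6
  }'
}

# Demo on the constructed sample; pipe real --dry-run output into the same functions.
sample_dry_run | service_account_ids   # 10248
sample_dry_run | broad_topic_rules     # User:10248 CREATE pksqlc-lgwpn PREFIXED
```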

Assign ACLs to SELECT FROM a stream or table

Run the following command to enable read access for SELECT FROM STREAM/TABLE statements on the stream or table’s underlying topics.

ccloud kafka acl create --allow --service-account <id> --operation READ --topic <topic>

Assign ACLs for writing to a topic

Run the following command to enable write access to a topic.

ccloud kafka acl create --allow --service-account <id> --operation WRITE --topic <topic>

Assign ACLs for creating a topic

When you write a ksqlDB statement that creates a Kafka topic, like CREATE STREAM, CREATE STREAM AS SELECT, CREATE TABLE, or CREATE TABLE AS SELECT, ksqlDB needs access to create the topic, in addition to access for reading or writing to the topic.

To grant CREATE and READ access for a CREATE STREAM statement, like CREATE STREAM FOO (...) WITH (KAFKA_TOPIC='FOO', ...);, run the following commands. The commands for CREATE TABLE are similar.

ccloud kafka acl create --allow --service-account <id> --operation CREATE --topic 'FOO'
ccloud kafka acl create --allow --service-account <id> --operation READ --topic 'FOO'
ccloud kafka acl create --allow --service-account <id> --operation CREATE --cluster-scope

To grant CREATE and WRITE access for a CREATE STREAM AS SELECT statement, like CREATE STREAM BAR WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM FOO;, run the following commands. The commands for CREATE TABLE AS SELECT are similar.

ccloud kafka acl create --allow --service-account <id> --operation CREATE --topic 'BAR'
ccloud kafka acl create --allow --service-account <id> --operation WRITE --topic 'BAR'
ccloud kafka acl create --allow --service-account <id> --operation CREATE --cluster-scope

Assign ACLs for full access to all topics

Run the following command to enable full access (READ and WRITE) to all topics.

ccloud kafka acl create --allow --service-account <id> --operation READ --operation WRITE --topic '*'

Assign ACLs for full access to prefixed topics

Run the following command to enable full access (READ and WRITE) to all topics with names that start with the specified prefix.

ccloud kafka acl create --allow --service-account <id> --operation READ --operation WRITE --topic 'prefix' --prefix