Connecting ksqlDB to Confluent Cloud

You can connect ksqlDB to your Apache Kafka® cluster in Confluent Cloud.

The ksqlDB servers must be configured to use Confluent Cloud. The ksqlDB CLI does not require configuration.

Prerequisites

  1. Customize your /etc/ksqldb/ksql-server.properties properties file.

    Tip

    To use ksqlDB with Confluent Cloud, you must configure the ksqlDB server. The ksqlDB CLI does not require any additional configuration.

    The following example shows the minimum configuration required to use ksqlDB with Confluent Cloud. You should also review the Recommended ksqlDB production settings.

    # a comma-separated list of the ccloud broker endpoints.
    # eg. r0.great-app.confluent.aws.prod.cloud:9092,r1.great-app.confluent.aws.prod.cloud:9093,r2.great-app.confluent.aws.prod.cloud:9094
    bootstrap.servers=<broker-endpoint1, broker-endpoint2, broker-endpoint3>
    ksql.internal.topic.replicas=3
    ksql.streams.replication.factor=3
    ksql.logging.processing.topic.replication.factor=3
    listeners=http://0.0.0.0:8088
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=\
        org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="<api-key>" \
        password="<api-secret>";
    
  2. (Optional) Add configs for Confluent Cloud Schema Registry per the example in ksql-server-ccloud.delta on GitHub at ccloud/examples/template_delta_configs.

    # Confluent Schema Registry configuration for ksqlDB Server
    ksql.schema.registry.basic.auth.credentials.source=USER_INFO
    ksql.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
    ksql.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>
    
  3. Restart the ksqlDB server. The steps to restart are dependent on your environment.

For more information, see Quick Start for Apache Kafka using Confluent Cloud and ksqlDB Configuration Parameter Reference.

Create ACLs for ksqlDB to access Confluent Cloud

If your Kafka cluster in Confluent Cloud has ACLs enabled, your ksqlDB cluster must be granted access to specific resources on the Kafka cluster. Use the following Confluent Cloud CLI command to create the necessary ACLs in the Kafka cluster to allow ksqlDB to operate on the specified topics.

If you provision ksqlDB by using the UI, you don’t need to run the ccloud ksql app configure-acls command.

Tip

Use the --dry-run option to preview all of the ACLs that the command sets, without actually setting them.

Run the following command to give your ksqlDB cluster access to your Kafka cluster running in Confluent Cloud.

ccloud ksql app configure-acls <ksql-cluster-id>

Create ACLs for ksqlDB to access a specific topic in Confluent Cloud

In addition to assigning ACLs to bring up a ksqlDB cluster and communicate with Confluent Cloud, you need to specify ACLs that enable ksqlDB users to access specific topics.

Get the ksqlDB service account ID

To assign ACLs for specific topics, you need the service account ID for the ksqlDB cluster. Use the --dry-run option in the Confluent Cloud CLI to get the service account ID.

Note

The service account ID is different from the ksqlDB cluster ID.

Run the following command to get the service account ID.

ccloud ksql app configure-acls <ksql-cluster-id> <test-topic> --dry-run

Your output should resemble:

  ServiceAccountId | Permission |    Operation     | Resource |             Name             |   Type
+------------------+------------+------------------+----------+------------------------------+----------+
  User:10248       | ALLOW      | DESCRIBE         | CLUSTER  | kafka-cluster                | LITERAL
  User:10248       | ALLOW      | DESCRIBE_CONFIGS | CLUSTER  | kafka-cluster                | LITERAL
  User:10248       | ALLOW      | CREATE           | TOPIC    | pksqlc-lgwpn                 | PREFIXED
  ...

The service account ID appears in the first column. In this example, the ID is 10248.

Assign ACLs to SELECT FROM a stream or table

Run the following command to enable read access for SELECT FROM STREAM/TABLE statements on the stream or table’s underlying topics.

ccloud kafka acl create --allow --service-account <id> --operation READ --topic <topic>

Assign ACLs for writing to a topic

Run the following command to enable write access to a topic.

ccloud kafka acl create --allow --service-account <id> --operation WRITE --topic <topic>

Assign ACLs for full access to all topics

Run the following command to enable full access to all topics.

ccloud kafka acl create --allow --service-account <id> --operation READ --operation WRITE --topic '*'

Assign ACLs for full access to prefixed topics

Run the following command to enable full access to all topics with names that start with the specified prefix.

ccloud kafka acl create --allow --service-account <id> --operation READ --operation WRITE --topic 'prefix' --prefix