Connect Local ksqlDB to Confluent Cloud¶
You can connect ksqlDB to your Apache Kafka® cluster in Confluent Cloud.
The ksqlDB servers must be configured to use Confluent Cloud. The ksqlDB CLI does not require configuration.
Prerequisites
Use the Confluent CLI to log in to your Confluent Cloud cluster, and run the `confluent kafka cluster list` command to get the Kafka cluster ID.

```bash
confluent kafka cluster list
```

Your output should resemble:

```
     Id     |       Name        |     Type     | Cloud |  Region  | Availability | Status
------------+-------------------+--------------+-------+----------+--------------+--------
  lkc-a123b | ksqldb-quickstart | BASIC_LEGACY | gcp   | us-west2 | multi-zone   | UP
```
Run the `confluent kafka cluster describe` command to get the endpoint for your Confluent Cloud cluster.

```bash
confluent kafka cluster describe lkc-a123b
```

Your output should resemble:

```
+--------------+--------------------------------------------------------+
| Id           | lkc-a123b                                              |
| Name         | ksqldb-quickstart                                      |
| Type         | BASIC                                                  |
| Ingress      | 100                                                    |
| Egress       | 100                                                    |
| Storage      | 5000                                                   |
| Cloud        | gcp                                                    |
| Availability | single-zone                                            |
| Region       | us-west2                                               |
| Status       | UP                                                     |
| Endpoint     | SASL_SSL://pkc-4s987.us-west2.gcp.confluent.cloud:9092 |
| ApiEndpoint  | https://pkac-42kz6.us-west2.gcp.confluent.cloud        |
+--------------+--------------------------------------------------------+
```
Save the `Endpoint` value, which you'll use in a later step.

Create a service account named `my-ksqldb-app`. You must include a description.

```bash
confluent iam service-account create my-ksqldb-app --description "My ksqlDB API and secrets service account."
```
Your output should resemble:
```
+-------------+--------------------------------+
| Id          | 123456                         |
| Resource ID | sa-efg123                      |
| Name        | my-ksqldb-app                  |
| Description | My ksqlDB API and secrets      |
|             | service account.               |
+-------------+--------------------------------+
```
Save the service account ID, which you’ll use in later steps.
Create an API key and secret for service account `123456`. Be sure to replace the service account ID and Kafka cluster ID values shown here with your own:

```bash
confluent api-key create --service-account 123456 --resource lkc-a123b
```
Your output should resemble:
It may take a couple of minutes for the API key to be ready. Save the API key and secret; the secret is not retrievable later.

```
+---------+------------------------------------------------------------------+
| API key | ABCXQHYDZXMMUDEF                                                 |
| Secret  | aBCde3s54+4Xv36YKPLDKy2aklGr6x/ShUrEX5D1Te4AzRlphFlr6eghmPX81HTF |
+---------+------------------------------------------------------------------+
```
Important
Save the API key and secret. You require this information to configure your client applications. Be aware that this is the only time that you can access and view the API key and secret.
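Because the secret is shown only this once, it helps to persist it immediately in a file that only your user can read. The path and variable names in this sketch are illustrative, not a Confluent convention; the values are the example key and secret from the output above.

```shell
# Store the API key and secret in a file readable only by your user.
# The path and CCLOUD_* variable names below are examples, not a Confluent standard.
mkdir -p "$HOME/.confluent"
KEY_FILE="$HOME/.confluent/ksqldb-api-key"

cat > "$KEY_FILE" <<'EOF'
CCLOUD_API_KEY=ABCXQHYDZXMMUDEF
CCLOUD_API_SECRET=aBCde3s54+4Xv36YKPLDKy2aklGr6x/ShUrEX5D1Te4AzRlphFlr6eghmPX81HTF
EOF
chmod 600 "$KEY_FILE"

# Later, load the values into the current shell before configuring ksqlDB:
. "$KEY_FILE"
```

After sourcing the file, `$CCLOUD_API_KEY` and `$CCLOUD_API_SECRET` are available for the configuration step below.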
Customize your `/etc/ksqldb/ksql-server.properties` file.

Tip
To use ksqlDB with Confluent Cloud, you must configure the ksqlDB server. The ksqlDB CLI does not require any additional configuration.
The following example shows the minimum configuration required to use ksqlDB with Confluent Cloud. You should also review the Recommended ksqlDB production settings. Replace `<api-key>` and `<api-secret>` with the API key and secret that you generated previously.

```properties
# For bootstrap.servers, assign the Endpoint value from the
# "confluent kafka cluster describe" command, for example,
# pkc-4s987.us-west2.gcp.confluent.cloud:9092.
bootstrap.servers=<broker-endpoint>

ksql.internal.topic.replicas=3
ksql.streams.replication.factor=3
ksql.logging.processing.topic.replication.factor=3

listeners=http://0.0.0.0:8088

security.protocol=SASL_SSL
sasl.mechanism=PLAIN

# Replace <api-key> and <api-secret> with your API key and secret.
sasl.jaas.config=\
    org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" \
    password="<api-secret>";
```
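If you manage the properties file with a script, you can render it from environment variables instead of editing placeholders by hand. This is a minimal sketch, not part of the Confluent tooling; the variable names, example endpoint, and output path are illustrative, and the key and secret are the example values from earlier in this guide.

```shell
# Illustrative: render the minimum ksqlDB Server config from environment
# variables. Variable names, the endpoint, and the output path are examples.
BOOTSTRAP_SERVERS="pkc-4s987.us-west2.gcp.confluent.cloud:9092"
API_KEY="ABCXQHYDZXMMUDEF"
API_SECRET="aBCde3s54+4Xv36YKPLDKy2aklGr6x/ShUrEX5D1Te4AzRlphFlr6eghmPX81HTF"

# The unquoted EOF lets the shell expand the ${...} references.
cat > ksql-server.properties <<EOF
bootstrap.servers=${BOOTSTRAP_SERVERS}
ksql.internal.topic.replicas=3
ksql.streams.replication.factor=3
ksql.logging.processing.topic.replication.factor=3
listeners=http://0.0.0.0:8088
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${API_KEY}" password="${API_SECRET}";
EOF
```

The rendered `sasl.jaas.config` is written on a single line, which is equivalent to the backslash-continued form shown above.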
(Optional) Add configs for Confluent Cloud Schema Registry per the example in ksql-server-ccloud.delta on GitHub at ccloud/examples/template_delta_configs.
```properties
# Confluent Schema Registry configuration for ksqlDB Server
ksql.schema.registry.basic.auth.credentials.source=USER_INFO
ksql.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
ksql.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>
```
Restart the ksqlDB server. The steps to restart depend on your environment.

For more information, see the ksqlDB Configuration Parameter Reference.
Create ACLs for ksqlDB to access Confluent Cloud¶
If your Kafka cluster in Confluent Cloud has ACLs enabled, your ksqlDB application must be granted access to specific resources on the Kafka cluster. Use the following Confluent CLI commands to create the necessary ACLs in the Kafka cluster.
```bash
confluent kafka acl create --allow --service-account <id> --operations "DESCRIBE,DESCRIBE_CONFIGS" --cluster-scope
confluent kafka acl create --allow --service-account <id> --operations "WRITE,DESCRIBE" --transactional-id <ksqldb-service-id>
confluent kafka acl create --allow --service-account <id> --operations "CREATE,DESCRIBE,ALTER,DESCRIBE_CONFIGS,ALTER_CONFIGS,READ,WRITE,DELETE" --topic <ksqldb-service-id> --prefix
confluent kafka acl create --allow --service-account <id> --operations "CREATE,DESCRIBE,ALTER,DESCRIBE_CONFIGS,ALTER_CONFIGS,READ,WRITE,DELETE" --topic _confluent-ksql-<ksqldb-service-id> --prefix
confluent kafka acl create --allow --service-account <id> --operations "CREATE,DESCRIBE,ALTER,DESCRIBE_CONFIGS,ALTER_CONFIGS,READ,WRITE,DELETE" --consumer-group _confluent-ksql-<ksqldb-service-id> --prefix
```
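If you script these grants, a dry run that prints each fully substituted command can catch ID typos before anything is applied. The helper below only echoes the commands; the example service account ID and ksqlDB service ID are assumptions, and this wrapper is a sketch rather than Confluent tooling.

```shell
# Dry-run sketch: print the ACL commands with example IDs substituted.
# SERVICE_ACCOUNT_ID and KSQLDB_SERVICE_ID are placeholders for your own values.
SERVICE_ACCOUNT_ID="123456"
KSQLDB_SERVICE_ID="default_"

# Echo (rather than run) one `confluent kafka acl create` invocation.
acl() {
  echo "confluent kafka acl create --allow --service-account ${SERVICE_ACCOUNT_ID} $*"
}

acl --operations DESCRIBE,DESCRIBE_CONFIGS --cluster-scope
acl --operations WRITE,DESCRIBE --transactional-id "${KSQLDB_SERVICE_ID}"
acl --operations CREATE,DESCRIBE,ALTER,DESCRIBE_CONFIGS,ALTER_CONFIGS,READ,WRITE,DELETE --topic "${KSQLDB_SERVICE_ID}" --prefix
acl --operations CREATE,DESCRIBE,ALTER,DESCRIBE_CONFIGS,ALTER_CONFIGS,READ,WRITE,DELETE --topic "_confluent-ksql-${KSQLDB_SERVICE_ID}" --prefix
acl --operations CREATE,DESCRIBE,ALTER,DESCRIBE_CONFIGS,ALTER_CONFIGS,READ,WRITE,DELETE --consumer-group "_confluent-ksql-${KSQLDB_SERVICE_ID}" --prefix
```

Once the printed commands look right, replace the `echo` in `acl()` with a direct call to `confluent` to apply them.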
Create ACLs for ksqlDB to access a specific topic in Confluent Cloud¶
In addition to assigning ACLs to bring up a ksqlDB application and communicate with Confluent Cloud, you need to specify ACLs that enable ksqlDB users to access specific topics.
In the following commands, replace `<id>` with the service account ID that you created previously.
Assign ACLs to SELECT FROM a stream or table¶
Run the following command to enable read access for SELECT FROM STREAM/TABLE statements on the stream or table’s underlying topics.
```bash
confluent kafka acl create --allow --service-account <id> --operations read --topic <topic>
```
Assign ACLs for writing to a topic¶
Run the following command to enable write access to a topic.
```bash
confluent kafka acl create --allow --service-account <id> --operations write --topic <topic>
```
Assign ACLs for creating a topic¶
When you write a ksqlDB statement that creates a Kafka topic, like CREATE STREAM, CREATE STREAM AS SELECT, CREATE TABLE, or CREATE TABLE AS SELECT, ksqlDB needs access to create the topic, in addition to access for reading or writing to the topic.
To grant CREATE and READ access for a CREATE STREAM statement, like `CREATE STREAM FOO (...) WITH (KAFKA_TOPIC='FOO', ...);`, run the following commands. The commands for CREATE TABLE are similar.

```bash
confluent kafka acl create --allow --service-account <id> --operations create --topic 'FOO'
confluent kafka acl create --allow --service-account <id> --operations read --topic 'FOO'
confluent kafka acl create --allow --service-account <id> --operations create --cluster-scope
```
To grant CREATE and WRITE access for a CREATE STREAM AS SELECT statement, like `CREATE STREAM BAR WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM FOO;`, run the following commands. The commands for CREATE TABLE AS SELECT are similar.

```bash
confluent kafka acl create --allow --service-account <id> --operations create --topic 'BAR'
confluent kafka acl create --allow --service-account <id> --operations write --topic 'BAR'
confluent kafka acl create --allow --service-account <id> --operations create --cluster-scope
```
Assign ACLs for full access to all topics¶
Run the following command to enable full access to all topics.
```bash
confluent kafka acl create --allow --service-account <id> --operations read,write --topic '*'
```
Assign ACLs for full access to prefixed topics¶
Run the following command to enable full access to all topics with names that start with the specified prefix.
```bash
confluent kafka acl create --allow --service-account <id> --operations read,write --topic 'prefix' --prefix
```
Docker environment¶
You can run a mix of fully managed services in Confluent Cloud and self-managed components in Docker. For a Docker environment that connects any Confluent Platform component to Confluent Cloud, see cp-all-in-one-cloud.