Manage Pipelines With the CLI for Stream Designer on Confluent Cloud¶
You can use the Confluent CLI to create and manage Stream Designer pipelines. Use the following commands to perform operations on your pipelines.
- Prerequisites for using CLI pipeline commands
- Create a new pipeline
- View existing pipelines
- Grant or revoke activation privileges to a pipeline
- Activate a pipeline
- Update an existing pipeline description
- Describe an existing pipeline
- Save an existing pipeline to a file
- Deactivate a running pipeline
- Delete a pipeline
- Create or update a pipeline from source code
Tip
You can use the Pipelines REST API to manage your pipelines. For more information, see Manage Pipelines With the REST API for Stream Designer on Confluent Cloud.
Prerequisites for using CLI pipeline commands¶
To prepare for managing pipelines in Confluent Cloud, follow these steps.
- Request authorization from an OrganizationAdmin to create and modify pipelines in your Confluent Cloud environment.
- Run the confluent update command to get the latest version of the CLI.
- Log in to Confluent Cloud by using the confluent login command.
- Specify the Kafka cluster where the pipeline runs by using the confluent kafka cluster use command.
- Get an identifier for the ksqlDB cluster by using the confluent ksql cluster list command.
Get the Kafka cluster ID¶
You must specify the Kafka cluster that hosts your pipeline. Run the following commands to get the cluster ID and assign it as the active cluster.
List the Kafka clusters in your environment.
confluent kafka cluster list
Your output should resemble:
      Id     |     Name     | Type  | Cloud |  Region  | Availability | Status
  -----------+--------------+-------+-------+----------+--------------+---------
  lkc-9z8y7x | test-cluster | BASIC | gcp   | us-west4 | single-zone  | UP
For convenience, assign the ID value to an environment variable. In this example, the ID is lkc-9z8y7x.

CLUSTER_ID=<kafka-cluster-id>
Specify the Kafka cluster to use for your pipeline.
confluent kafka cluster use ${CLUSTER_ID}
Your output should resemble:
Set Kafka cluster "lkc-9z8y7x" as the active cluster for environment "env-e5d4c3".
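If you script this step, the cluster ID can be parsed from the list output instead of copied by hand. The following is a sketch only: a captured sample row stands in for live `confluent kafka cluster list` output so the parsing step is runnable anywhere.

```shell
# Sketch: extract the cluster ID from a list row programmatically.
# The sample row below is a stand-in for live `confluent kafka cluster list` output.
sample_row='  lkc-9z8y7x | test-cluster | BASIC | gcp | us-west4 | single-zone | UP'
CLUSTER_ID=$(echo "$sample_row" | awk -F'|' '{gsub(/ /, "", $1); print $1}')
echo "$CLUSTER_ID"   # lkc-9z8y7x
```

If your CLI version supports structured output (`-o json`), a JSON-aware tool like jq is an alternative to text parsing.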
Get the ksqlDB cluster ID¶
You need the ID for the ksqlDB cluster that runs your pipeline’s logic. Run the following command to get the ksqlDB cluster ID.
List the ksqlDB clusters in your environment.
confluent ksql cluster list
Your output should resemble:
       ID       |       Name       | Topic Prefix | Kafka Cluster | ...
  --------------+------------------+--------------+---------------+-----
  lksqlc-3c2b1a | ksqlDB_cluster_0 | pksqlc-b0b03 | lkc-9z8y7x    | ...
For convenience, assign the ID value to an environment variable. In this example, the ID is lksqlc-3c2b1a.

KSQLDB_ID=<ksqldb-cluster-id>
Create a new pipeline¶
Run the confluent pipeline create command to create a new pipeline in the specified ksqlDB cluster. When the pipeline is created, it doesn’t have any configuration for logic or data sources and sinks. You specify these details in the Stream Designer UI in Confluent Cloud Console. Also, you can create or update a pipeline by specifying a source code file.
Run the following command to create a pipeline in the ksqlDB cluster that has the specified ID. Ensure that the KSQLDB_ID environment variable is assigned the ID of your ksqlDB cluster, for example, lksqlc-3c2b1a.

confluent pipeline create --name "test-pipeline" \
  --description "first CLI pipeline" \
  --ksql-cluster ${KSQLDB_ID}
Your output should resemble:
+----------------------+--------------------------------+
| ID                   | pipe-1a2b3c                    |
| Name                 | test-pipeline                  |
| Description          | first CLI pipeline             |
| KSQL Cluster         | lksqlc-3c2b1a                  |
| Secret Names         | []                             |
| Activation Privilege | false                          |
| State                | draft                          |
| Created At           | 2022-10-03 20:03:54.531015508  |
|                      | +0000 UTC                      |
| Updated At           | 2022-10-03 20:03:54.531015508  |
|                      | +0000 UTC                      |
+----------------------+--------------------------------+
Note the pipeline ID, which you need for other CLI commands. For convenience, assign the ID value to an environment variable. In this example, the pipeline ID is pipe-1a2b3c.

PIPE_ID=<pipeline-id>
In the Confluent Cloud Console, you can view the pipeline that you created.
Open Confluent Cloud Console, and in the navigation menu, click Stream Designer to open the Pipelines page.
Use Stream Designer to build your pipeline. For more information, see Quick Start for Stream Designer on Confluent Cloud.
Tip
You can use the Confluent CLI to update the pipeline with more SQL statements, and you can use the Pipelines REST API to manage the pipeline programmatically.
View existing pipelines¶
In Confluent CLI, run the confluent pipeline list command to view the pipelines that you’ve created.
confluent pipeline list
Your output should resemble:
ID | Name | Description | KSQL Cluster | State
--------------+---------------+--------------------+---------------+--------
pipe-1a2b3c | test-pipeline | first CLI pipeline | lksqlc-3c2b1a | draft
Grant or revoke activation privileges to a pipeline¶
Before you can activate a pipeline, you must grant it activation privileges. Granting a pipeline privileges enables it to provision connectors, queries, topics, and other resources.
- Grant privileges by setting the --activation-privilege option to true.
- Revoke privileges by setting the --activation-privilege option to false.
Run the following command to grant activation privileges to a pipeline. Ensure that the PIPE_ID environment variable is assigned the ID of your pipeline, for example, pipe-1a2b3c.
confluent pipeline update ${PIPE_ID} \
--activation-privilege=true
Your output should resemble:
+----------------------+--------------------------------+
| ID | pipe-1a2b3c |
| Name | test-pipeline |
| Description | first CLI pipeline |
| KSQL Cluster | lksqlc-3c2b1a |
| Secret Names | [] |
| Activation Privilege | true |
| State | draft |
| Created At | 2022-10-03 20:03:54.531015508 |
| | +0000 UTC |
| Updated At | 2022-10-03 20:03:54.531015508 |
| | +0000 UTC |
+----------------------+--------------------------------+
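Revoking follows the same pattern with the flag set to false. Because the command needs a live Confluent Cloud pipeline, the sketch below only composes the command string with a placeholder pipeline ID rather than executing it.

```shell
# Revoke sketch: the same update command with the privilege flag set to false.
PIPE_ID="pipe-1a2b3c"   # placeholder pipeline ID
revoke_cmd="confluent pipeline update ${PIPE_ID} --activation-privilege=false"
echo "${revoke_cmd}"
```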
Activate a pipeline¶
When your pipeline is ready to activate, use the confluent pipeline activate command to deploy it.
Activation means that topics are created, connectors are deployed, and ksqlDB persistent queries are created.
Note
You must grant activation privileges to the pipeline before you can activate it.
Run the following command to activate a pipeline. Ensure that the PIPE_ID environment variable is assigned the ID of your pipeline, for example, pipe-1a2b3c.

confluent pipeline activate ${PIPE_ID}
Your output should resemble:
+----------------------+--------------------------------+
| ID                   | pipe-1a2b3c                    |
| Name                 | test-pipeline                  |
| Description          | first CLI pipeline             |
| KSQL Cluster         | lksqlc-3c2b1a                  |
| Secret Names         | []                             |
| Activation Privilege | true                           |
| State                | activating                     |
| Created At           | 2022-10-03 20:03:54.531015508  |
|                      | +0000 UTC                      |
| Updated At           | 2022-10-03 20:33:29.961081381  |
|                      | +0000 UTC                      |
+----------------------+--------------------------------+
After a few minutes, run the confluent pipeline list command to check on your pipeline’s activation status. If there were no activation errors, the pipeline state is active:

ID          | Name          | Description        | KSQL Cluster  | State
------------+---------------+--------------------+---------------+---------
pipe-1a2b3c | test-pipeline | first CLI pipeline | lksqlc-3c2b1a | active
Update an existing pipeline description¶
Use the confluent pipeline update command to update the name or description of a pipeline. You can update the name and description separately or in the same command.
Run the following command to update a pipeline’s name and description. Ensure that the PIPE_ID environment variable is assigned the ID of your pipeline, for example, pipe-1a2b3c.

confluent pipeline update ${PIPE_ID} \
  --name "<name>" \
  --description "<pipeline-description>"
Your output should resemble:
+----------------------+--------------------------------+
| ID                   | pipe-1a2b3c                    |
| Name                 | test-pipeline-2                |
| Description          | Renamed pipeline               |
| KSQL Cluster         | lksqlc-3c2b1a                  |
| Secret Names         | []                             |
| Activation Privilege | true                           |
| State                | active                         |
| Created At           | 2022-10-03 20:03:54.531015508  |
|                      | +0000 UTC                      |
| Updated At           | 2022-10-03 21:02:31.806605354  |
|                      | +0000 UTC                      |
+----------------------+--------------------------------+
Run the confluent pipeline list command to confirm your changes.

Your output should resemble:

ID          | Name            | Description      | KSQL Cluster  | State
------------+-----------------+------------------+---------------+---------
pipe-1a2b3c | test-pipeline-2 | Renamed pipeline | lksqlc-3c2b1a | active
Describe an existing pipeline¶
Use the confluent pipeline describe command to view details of a pipeline.
Run the following command to describe a pipeline. Ensure that the PIPE_ID environment variable is assigned the ID of your pipeline, for example, pipe-1a2b3c.

confluent pipeline describe ${PIPE_ID}
Your output should resemble:
+----------------------+--------------------------------+
| ID | pipe-1a2b3c |
| Name | test-pipeline-2 |
| Description | Renamed pipeline |
| KSQL Cluster | lksqlc-3c2b1a |
| Secret Names | [] |
| Activation Privilege | true |
| State | active |
| Created At | 2022-10-03 20:03:54.531015508 |
| | +0000 UTC |
| Updated At | 2022-10-03 21:02:31.806605354 |
| | +0000 UTC |
+----------------------+--------------------------------+
Save an existing pipeline to a file¶
Use the confluent pipeline save command to save a pipeline’s source code to a local file.
Run the following command to save a pipeline. Ensure that the PIPE_ID environment variable is assigned the ID of your pipeline, for example, pipe-1a2b3c.
confluent pipeline save ${PIPE_ID}
Your output should resemble:
Saved source code for pipeline "pipe-1a2b3c" at "pipe-1a2b3c.sql".
Use the --sql-file option to specify the filename for the saved pipeline.
Note

If the saved SQL file is empty, update the pipeline; after an update, Stream Designer starts returning the SQL code.
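As a sketch of the save-and-inspect flow: the filename `my-pipeline.sql` is an arbitrary choice, and because no live pipeline is available here, a stand-in line is written locally in place of the actual `confluent pipeline save ${PIPE_ID} --sql-file my-pipeline.sql` call.

```shell
# Stand-in for: confluent pipeline save ${PIPE_ID} --sql-file my-pipeline.sql
printf 'CREATE SOURCE CONNECTOR "pageviews-source" WITH (...);\n' > my-pipeline.sql
# Inspect the first line of the saved pipeline source code.
head -n 1 my-pipeline.sql
```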
Deactivate a running pipeline¶
Use the confluent pipeline deactivate command to deactivate a running pipeline. Specify the topics to be retained when the pipeline deactivates by using the --retained-topics flag.

Note

All imported topics must be marked as retained before deactivating. Specify them in the --retained-topics list.
Run the following command to deactivate a pipeline. Ensure that the PIPE_ID environment variable is assigned the ID of your pipeline, for example, pipe-1a2b3c.

confluent pipeline deactivate ${PIPE_ID} \
  --retained-topics <topic-1>,<topic-2>,…
Your output should resemble:
+----------------------+-------------------------------+
| ID                   | pipe-1a2b3c                   |
| Name                 | test-pipeline-2               |
| Description          | Renamed pipeline              |
| KSQL Cluster         | lksqlc-3c2b1a                 |
| Secret Names         | []                            |
| Activation Privilege | true                          |
| State                | deactivating                  |
| Created At           | 2023-01-05 20:20:01.326947814 |
|                      | +0000 UTC                     |
| Updated At           | 2023-01-13 19:59:11.244569085 |
|                      | +0000 UTC                     |
+----------------------+-------------------------------+
Run the confluent pipeline list command to confirm the pipeline is deactivated and back in the draft state.

Your output should resemble:

ID          | Name            | Description      | KSQL Cluster  | State
------------+-----------------+------------------+---------------+--------
pipe-1a2b3c | test-pipeline-2 | Renamed pipeline | lksqlc-3c2b1a | draft
Delete a pipeline¶
Use the confluent pipeline delete command to remove a pipeline from Confluent Cloud.
Note
Deactivate the pipeline before deletion. If you don’t deactivate first, Confluent Cloud resources, like connectors and topics, are not deleted.
Run the following command to delete a pipeline. Ensure that the PIPE_ID environment variable is assigned the ID of your pipeline, for example, pipe-1a2b3c.

confluent pipeline delete ${PIPE_ID}
Your output should resemble:
Requested to delete pipeline "pipe-1a2b3c".
Run the confluent pipeline describe command to confirm the pipeline is deleted.

confluent pipeline describe ${PIPE_ID}
Your output should resemble:
+----------------------+-------------------------------+
| ID                   | pipe-1a2b3c                   |
| Name                 | test-pipeline-2               |
| Description          | Renamed pipeline              |
| KSQL Cluster         | lksqlc-3c2b1a                 |
| Secret Names         | []                            |
| Activation Privilege | true                          |
| State                | deleted                       |
| Created At           | 2023-01-13 20:15:36.229050956 |
|                      | +0000 UTC                     |
| Updated At           | 2023-01-13 20:17:37.719595424 |
|                      | +0000 UTC                     |
+----------------------+-------------------------------+
Create or update a pipeline from source code¶
You can use the create and update commands to define a pipeline from a specified source code file.
If your SQL declares connectors, you need to specify a Kafka cluster API key and secret to activate them. Also, many connectors require secrets for authentication, and Stream Designer provides syntax for saving secrets in the credential store. For more information, see Manage Pipeline Secrets for Stream Designer on Confluent Cloud.
The following example shows how to create a simple pipeline that has a Datagen source connector and a corresponding topic. It shows how to pass a Kafka cluster API key and secret to Stream Designer.
Create a pipeline from a SQL file¶
The following example command shows how to create a new pipeline from a source file named pipeline-with-secret.sql.
Copy the following SQL into a file named pipeline-with-secret.sql. The code declares a pipeline that has a Datagen source connector, a Kafka topic named pageviews, and a stream named AllPageviews.

The variables for the connector’s Kafka API key and secret, named PAGEVIEWS_SOURCE_KAFKA_API_KEY and PAGEVIEWS_SOURCE_KAFKA_API_SECRET, are assigned later by the confluent pipeline create command.

CREATE SOURCE CONNECTOR "pageviews-source" WITH (
  "connector.class"='DatagenSource',
  "kafka.api.key"='${PAGEVIEWS_SOURCE_KAFKA_API_KEY}',
  "kafka.api.secret"='${PAGEVIEWS_SOURCE_KAFKA_API_SECRET}',
  "kafka.auth.mode"='KAFKA_API_KEY',
  "kafka.topic"='pageviews',
  "output.data.format"='JSON_SR',
  "quickstart"='PAGEVIEWS',
  "tasks.max"='1'
);

CREATE OR REPLACE STREAM "AllPageviews"
  (PAGEID VARCHAR, USERID VARCHAR, VIEWTIME BIGINT)
  WITH (kafka_topic='pageviews', partitions=1, key_format='KAFKA', value_format='JSON_SR');
Save the file and run the following command to create a new pipeline. Set the KAFKA_API_KEY and KAFKA_API_SECRET environment variables to a valid Kafka cluster API key and secret using your choice of techniques, including a secrets management tool like Vault or file encryption tools like sops.

Important

You can use string literals for secrets instead of environment variables, but for security reasons, never use the command with literals in any deployment processes.

confluent pipeline create \
  --ksql-cluster ${KSQLDB_ID} \
  --name pipeline-with-secret \
  --description "pipeline with secret" \
  --secret PAGEVIEWS_SOURCE_KAFKA_API_KEY=${KAFKA_API_KEY} \
  --secret PAGEVIEWS_SOURCE_KAFKA_API_SECRET=${KAFKA_API_SECRET} \
  --sql-file pipeline-with-secret.sql
Your output should resemble:
+----------------------+------------------------------------+
| ID                   | pipe-6f7g8h                        |
| Name                 | pipeline-with-secret               |
| Description          | pipeline with secret               |
| KSQL Cluster         | lksqlc-3c2b1a                      |
| Secret Names         | [PAGEVIEWS_SOURCE_KAFKA_API_KEY    |
|                      | PAGEVIEWS_SOURCE_KAFKA_API_SECRET] |
| Activation Privilege | false                              |
| State                | draft                              |
| Created At           | 2023-01-18 23:04:12.202783937      |
|                      | +0000 UTC                          |
| Updated At           | 2023-01-18 23:04:12.36618846       |
|                      | +0000 UTC                          |
+----------------------+------------------------------------+
Note the pipeline ID, which you need for other CLI commands. For convenience, assign the ID value to an environment variable. In this example, the pipeline ID is pipe-6f7g8h.

PIPE_ID=<pipeline-id>
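One way to satisfy the no-literals rule above is to keep the key and secret in a local env file (or have a tool like Vault or sops render one) and source it before running `confluent pipeline create`. This is a sketch only; the file path and the placeholder values are assumptions for illustration.

```shell
# Sketch: load the Kafka API key and secret from a local env file so the
# values never appear literally on the command line. Placeholder values only.
cat > /tmp/kafka-creds.env <<'EOF'
KAFKA_API_KEY=ABCDEFGH12345678
KAFKA_API_SECRET=example-secret-value
EOF
set -a                      # export every variable the sourced file defines
. /tmp/kafka-creds.env
set +a
echo "key loaded: ${KAFKA_API_KEY:+yes}"   # prints "key loaded: yes"
```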
Update a pipeline from a SQL file¶
Use the update command to update a pipeline with a specified source code file.
Edit the pipeline-with-secret.sql file and append the following SQL. The code adds a Datagen source connector for users data, a Kafka topic named users, and a table named AllUsers.

The variables for the connector’s Kafka API key and secret, named USERS_SOURCE_KAFKA_API_KEY and USERS_SOURCE_KAFKA_API_SECRET, are assigned later by the confluent pipeline update command.

CREATE SOURCE CONNECTOR "users-source" WITH (
  "connector.class"='DatagenSource',
  "kafka.api.key"='${USERS_SOURCE_KAFKA_API_KEY}',
  "kafka.api.secret"='${USERS_SOURCE_KAFKA_API_SECRET}',
  "kafka.auth.mode"='KAFKA_API_KEY',
  "kafka.topic"='users',
  "output.data.format"='JSON_SR',
  "quickstart"='USERS',
  "tasks.max"='1'
);

CREATE OR REPLACE TABLE "AllUsers"
  (GENDER STRING, ID STRING PRIMARY KEY, REGIONID STRING, REGISTERTIME BIGINT, USERID STRING)
  WITH (kafka_topic='users', partitions=1, key_format='KAFKA', value_format='JSON_SR');
Save the file and run the following command to update the pipeline from the modified pipeline-with-secret.sql source file. You can use the same Kafka cluster API key and secret you used for the pageviews-source connector.

Note

The --secret parameter is required only when a secret needs to be added or changed.

confluent pipeline update ${PIPE_ID} \
  --description "updated pipeline with secrets" \
  --secret USERS_SOURCE_KAFKA_API_KEY=${KAFKA_API_KEY} \
  --secret USERS_SOURCE_KAFKA_API_SECRET=${KAFKA_API_SECRET} \
  --sql-file pipeline-with-secret.sql
Your output should resemble:
+----------------------+--------------------------------+
| ID                   | pipe-6f7g8h                    |
| Name                 | pipeline-with-secret           |
| Description          | updated pipeline with secrets  |
| KSQL Cluster         | lksqlc-3c2b1a                  |
| Secret Names         | [USERS_SOURCE_KAFKA_API_KEY    |
|                      | USERS_SOURCE_KAFKA_API_SECRET] |
| Activation Privilege | false                          |
| State                | draft                          |
| Created At           | 2023-01-18 23:04:12.202783937  |
|                      | +0000 UTC                      |
| Updated At           | 2023-01-18 23:04:13.3617648    |
|                      | +0000 UTC                      |
+----------------------+--------------------------------+
Run the confluent pipeline describe command to confirm the pipeline is updated with the new secrets.

confluent pipeline describe ${PIPE_ID}
Your output should resemble:
+----------------------+-----------------------------------+
| ID                   | pipe-6f7g8h                       |
| Name                 | pipeline-with-secret              |
| Description          | updated pipeline with secrets     |
| KSQL Cluster         | lksqlc-3c2b1a                     |
| Secret Names         | [PAGEVIEWS_SOURCE_KAFKA_API_KEY   |
|                      | PAGEVIEWS_SOURCE_KAFKA_API_SECRET |
|                      | USERS_SOURCE_KAFKA_API_KEY        |
|                      | USERS_SOURCE_KAFKA_API_SECRET]    |
| Activation Privilege | false                             |
| State                | draft                             |
| Created At           | 2023-01-18 23:04:12.202783937     |
|                      | +0000 UTC                         |
| Updated At           | 2023-01-18 23:36:14.11857322      |
|                      | +0000 UTC                         |
+----------------------+-----------------------------------+