Manage Pipelines With the CLI for Stream Designer on Confluent Cloud

You can use the Confluent CLI to create and manage Stream Designer pipelines. Use the following commands to perform operations on your pipelines.

Tip

You can also use the Pipelines REST API to manage your pipelines. For more information, see Manage Pipelines With the REST API for Stream Designer on Confluent Cloud.

Prerequisites for using CLI pipeline commands

To prepare for managing pipelines in Confluent Cloud, follow these steps.

  1. Request authorization from an OrganizationAdmin to create and modify pipelines in your Confluent Cloud environment.
  2. Run the confluent update command to get the latest version of the CLI.
  3. Log in to Confluent Cloud by using the confluent login command.
  4. Specify the Kafka cluster where the pipeline runs by using the confluent kafka cluster use command.
  5. Get an identifier for the ksqlDB cluster by using the confluent ksql cluster list command.

Get the Kafka cluster ID

You must specify the Kafka cluster that hosts your pipeline. Run the following commands to get the cluster ID and assign it as the active cluster.

  1. List the Kafka clusters in your environment.

    confluent kafka cluster list
    

    Your output should resemble:

           Id      |     Name     | Type  | Cloud    |  Region   | Availability | Status
    ---------------+--------------+-------+----------+-----------+--------------+---------
        lkc-9z8y7x | test-cluster | BASIC | gcp      | us-west4  | single-zone  | UP
    
  2. For convenience, assign the ID value to an environment variable. In this example, the ID is lkc-9z8y7x.

    CLUSTER_ID=<kafka-cluster-id>
    
  3. Specify the Kafka cluster to use for your pipeline.

    confluent kafka cluster use ${CLUSTER_ID}
    

    Your output should resemble:

    Set Kafka cluster "lkc-9z8y7x" as the active cluster for environment "env-e5d4c3".
    
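If you script these steps, you can read the cluster ID from the table instead of copying it by hand. The following is a minimal sketch, assuming the table layout shown above (ID in the first column, name in the second). The cluster_id_by_name function is a hypothetical helper, not part of the Confluent CLI.

```shell
# Hypothetical helper: print the ID (column 1) of the row whose Name (column 2)
# equals the first argument. Reads the human-readable table from stdin.
cluster_id_by_name() {
  awk -F'|' -v name="$1" '
    {
      id = $1; n = $2
      gsub(/^[ *]+| +$/, "", id)   # trim padding, and a leading "*" marker if one is printed
      gsub(/^ +| +$/, "", n)
      if (n == name) { print id; exit }
    }'
}

# Usage (assumes a cluster named "test-cluster", as in the sample output):
#   CLUSTER_ID=$(confluent kafka cluster list | cluster_id_by_name test-cluster)
#   confluent kafka cluster use "${CLUSTER_ID}"
```

Parsing the human-readable table is fragile if the column order changes between CLI versions, so check the helper against your own output first.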

Get the ksqlDB cluster ID

You need the ID for the ksqlDB cluster that runs your pipeline’s logic. Run the following command to get the ksqlDB cluster ID.

  1. List the ksqlDB clusters in your environment.

    confluent ksql cluster list
    

    Your output should resemble:

           ID       |        Name        | Topic Prefix |   Kafka Cluster    |
    ----------------+--------------------+--------------+--------------------+ ...
      lksqlc-3c2b1a | ksqlDB_cluster_0   | pksqlc-b0b03 | lkc-9z8y7x         |
    
  2. For convenience, assign the ID value to an environment variable. In this example, the ID is lksqlc-3c2b1a.

    KSQLDB_ID=<ksqldb-cluster-id>
    
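When an environment has several ksqlDB clusters, it can help to pick the one attached to your active Kafka cluster. The sketch below assumes the column order in the sample output above, where the Kafka cluster ID is the fourth column. The ksqldb_ids_for_kafka function is a hypothetical helper.

```shell
# Hypothetical helper: print the IDs of ksqlDB clusters whose "Kafka Cluster"
# column (column 4 in the sample table) equals the first argument.
ksqldb_ids_for_kafka() {
  awk -F'|' -v kafka="$1" '
    NF >= 4 {
      id = $1; k = $4
      gsub(/^ +| +$/, "", id)   # trim padding around each cell
      gsub(/^ +| +$/, "", k)
      if (k == kafka) print id
    }'
}

# Usage (takes the first match if there are several):
#   KSQLDB_ID=$(confluent ksql cluster list | ksqldb_ids_for_kafka "${CLUSTER_ID}" | head -n 1)
```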

Create a new pipeline

Run the confluent pipeline create command to create a new pipeline in the specified ksqlDB cluster. When the pipeline is created, it doesn’t have any configuration for logic or data sources and sinks. You specify these details in the Stream Designer UI in Confluent Cloud Console. Also, you can create or update a pipeline by specifying a source code file.

  1. Run the following command to create a pipeline in the ksqlDB cluster that has the specified ID. Ensure that the KSQLDB_ID environment variable is set to the ID of your ksqlDB cluster, for example, lksqlc-3c2b1a.

    confluent pipeline create --name "test-pipeline" \
                              --description "first CLI pipeline" \
                              --ksql-cluster ${KSQLDB_ID}
    

    Your output should resemble:

    +----------------------+--------------------------------+
    | ID                   | pipe-1a2b3c                    |
    | Name                 | test-pipeline                  |
    | Description          | first CLI pipeline             |
    | KSQL Cluster         | lksqlc-3c2b1a                  |
    | Secret Names         | []                             |
    | Activation Privilege | false                          |
    | State                | draft                          |
    | Created At           | 2022-10-03 20:03:54.531015508  |
    |                      | +0000 UTC                      |
    | Updated At           | 2022-10-03 20:03:54.531015508  |
    |                      | +0000 UTC                      |
    +----------------------+--------------------------------+
    

    Note the pipeline ID, which you need for other CLI commands. For convenience, assign the ID value to an environment variable. In this example, the pipeline ID is pipe-1a2b3c.

    PIPE_ID=<pipeline-id>
    

    In the Confluent Cloud Console, you can view the pipeline that you created.

  2. Open Confluent Cloud Console, and in the navigation menu, click Stream Designer to open the Pipelines page.

    [Figure: Stream Designer showing the pipelines list in Confluent Cloud Console]
  3. Use Stream Designer to build your pipeline. For more information, see Quick Start for Stream Designer on Confluent Cloud.

    Tip

    You can use the Confluent CLI to update the pipeline with more SQL statements, and you can use the Pipelines REST API to manage the pipeline programmatically.
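
To capture the pipeline ID in a script, you can read it from the table that the create command prints. This is a minimal sketch, assuming the two-column table layout shown in the sample output. The pipeline_field function is a hypothetical helper, and it returns only the first line of values that wrap across rows, such as the timestamps.

```shell
# Hypothetical helper: print the value of one field from the two-column
# table printed by "confluent pipeline create" and "confluent pipeline describe".
pipeline_field() {
  awk -F'|' -v key="$1" '
    {
      k = $2; v = $3
      gsub(/^ +| +$/, "", k)   # trim padding around the label
      gsub(/^ +| +$/, "", v)   # trim padding around the value
      if (k == key) { print v; exit }
    }'
}

# Usage (same flags as the create command above):
#   PIPE_ID=$(confluent pipeline create --name "test-pipeline" \
#               --description "first CLI pipeline" \
#               --ksql-cluster "${KSQLDB_ID}" | pipeline_field ID)
```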

View existing pipelines

In Confluent CLI, run the confluent pipeline list command to view the pipelines that you’ve created.

confluent pipeline list

Your output should resemble:

      ID      |     Name      |    Description     | KSQL Cluster  | State
--------------+---------------+--------------------+---------------+--------
  pipe-1a2b3c | test-pipeline | first CLI pipeline | lksqlc-3c2b1a | draft

Grant or revoke activation privileges to a pipeline

Before you can activate a pipeline, you must grant it activation privileges. Granting a pipeline privileges enables it to provision connectors, queries, topics, and other resources.

  • Grant privileges by setting the --activation-privilege option to true.
  • Revoke privileges by setting the --activation-privilege option to false.

Run the following command to grant activation privileges to a pipeline. Ensure that the PIPE_ID environment variable is assigned with the ID of your pipeline, for example, pipe-1a2b3c.

confluent pipeline update ${PIPE_ID} \
  --activation-privilege=true

Your output should resemble:

+----------------------+--------------------------------+
| ID                   | pipe-1a2b3c                    |
| Name                 | test-pipeline                  |
| Description          | first CLI pipeline             |
| KSQL Cluster         | lksqlc-3c2b1a                  |
| Secret Names         | []                             |
| Activation Privilege | true                           |
| State                | draft                          |
| Created At           | 2022-10-03 20:03:54.531015508  |
|                      | +0000 UTC                      |
| Updated At           | 2022-10-03 20:03:54.531015508  |
|                      | +0000 UTC                      |
+----------------------+--------------------------------+
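
The command above grants privileges; revoking them uses the same option with false. The following sketch wraps both cases in one function. The set_activation_privilege name and the CONFLUENT override variable are assumptions made for illustration, not CLI features.

```shell
# Hypothetical wrapper around the update command shown above.
# CONFLUENT is an assumed override hook: set CONFLUENT=echo for a dry run
# that prints the arguments instead of calling the CLI.
set_activation_privilege() {
  pipe_id=$1
  case $2 in
    true|false) ;;
    *)
      echo "usage: set_activation_privilege <pipeline-id> true|false" >&2
      return 2
      ;;
  esac
  ${CONFLUENT:-confluent} pipeline update "$pipe_id" --activation-privilege="$2"
}

# Usage:
#   set_activation_privilege "${PIPE_ID}" true    # grant
#   set_activation_privilege "${PIPE_ID}" false   # revoke
```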

Activate a pipeline

When your pipeline is ready to activate, use the confluent pipeline activate command to deploy it.

Activation means that topics are created, connectors are deployed, and ksqlDB persistent queries are created.

Note

You must grant activation privileges to the pipeline before you can activate it.

  1. Run the following command to activate a pipeline. Ensure that the PIPE_ID environment variable is assigned with the ID of your pipeline, for example, pipe-1a2b3c.

    confluent pipeline activate ${PIPE_ID}
    

    Your output should resemble:

    +----------------------+--------------------------------+
    | ID                   | pipe-1a2b3c                    |
    | Name                 | test-pipeline                  |
    | Description          | first CLI pipeline             |
    | KSQL Cluster         | lksqlc-3c2b1a                  |
    | Secret Names         | []                             |
    | Activation Privilege | true                           |
    | State                | activating                     |
    | Created At           | 2022-10-03 20:03:54.531015508  |
    |                      | +0000 UTC                      |
    | Updated At           | 2022-10-03 20:33:29.961081381  |
    |                      | +0000 UTC                      |
    +----------------------+--------------------------------+
    
  2. After a few minutes, run the confluent pipeline list command to check on your pipeline’s activation status. If there were no activation errors, the pipeline state is active:

          ID      |     Name      |    Description     | KSQL Cluster  | State
    --------------+---------------+--------------------+---------------+---------
      pipe-1a2b3c | test-pipeline | first CLI pipeline | lksqlc-3c2b1a | active
    
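Instead of rerunning the list command by hand, a script can poll until the pipeline reaches the expected state. This is a sketch under stated assumptions: it parses the list table shown above, where the ID is the first column and State is the last, and the state_from_list and wait_for_state functions and the LIST_CMD variable are hypothetical names.

```shell
# Hypothetical helper: print the State (last column) of one pipeline from
# "confluent pipeline list" table output on stdin.
state_from_list() {
  awk -F'|' -v id="$1" '
    {
      first = $1; last = $NF
      gsub(/ /, "", first)
      gsub(/ /, "", last)
      if (first == id) { print last; exit }
    }'
}

# Poll until the pipeline reaches the wanted state; give up after max_tries.
# LIST_CMD is an assumed override hook for testing; it defaults to the real CLI.
wait_for_state() {
  pipe_id=$1 want=$2 max_tries=${3:-30} delay=${4:-10}
  tries=0 state=unknown
  while [ "$tries" -lt "$max_tries" ]; do
    state=$(${LIST_CMD:-confluent pipeline list} | state_from_list "$pipe_id")
    [ "$state" = "$want" ] && return 0
    tries=$((tries + 1))
    sleep "$delay"
  done
  echo "pipeline $pipe_id is in state '$state', expected '$want'" >&2
  return 1
}

# Usage: check every 10 seconds, up to 30 times.
#   confluent pipeline activate "${PIPE_ID}"
#   wait_for_state "${PIPE_ID}" active
```

The retry count and delay are arbitrary defaults; pass the third and fourth arguments to change them.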

Update an existing pipeline description

Use the confluent pipeline update command to update the name or description of a pipeline. You can update the name and description separately or in the same command.

  1. Run the following command to update a pipeline’s name and description. Ensure that the PIPE_ID environment variable is assigned with the ID of your pipeline, for example, pipe-1a2b3c.

    confluent pipeline update ${PIPE_ID} \
      --name "<name>" \
      --description "<pipeline-description>"
    

    Your output should resemble:

    +----------------------+--------------------------------+
    | ID                   | pipe-1a2b3c                    |
    | Name                 | test-pipeline-2                |
    | Description          | Renamed pipeline               |
    | KSQL Cluster         | lksqlc-3c2b1a                  |
    | Secret Names         | []                             |
    | Activation Privilege | true                           |
    | State                | active                         |
    | Created At           | 2022-10-03 20:03:54.531015508  |
    |                      | +0000 UTC                      |
    | Updated At           | 2022-10-03 21:02:31.806605354  |
    |                      | +0000 UTC                      |
    +----------------------+--------------------------------+
    
  2. Run the confluent pipeline list command to confirm your changes.

    Your output should resemble:

          ID      |      Name       |   Description    | KSQL Cluster  | State
    --------------+-----------------+------------------+---------------+---------
      pipe-1a2b3c | test-pipeline-2 | Renamed pipeline | lksqlc-3c2b1a | active
    

Describe an existing pipeline

Use the confluent pipeline describe command to view details of a pipeline.

Run the following command to describe a pipeline. Ensure that the PIPE_ID environment variable is assigned with the ID of your pipeline, for example, pipe-1a2b3c.

confluent pipeline describe ${PIPE_ID}

Your output should resemble:

+----------------------+--------------------------------+
| ID                   | pipe-1a2b3c                    |
| Name                 | test-pipeline-2                |
| Description          | Renamed pipeline               |
| KSQL Cluster         | lksqlc-3c2b1a                  |
| Secret Names         | []                             |
| Activation Privilege | true                           |
| State                | active                         |
| Created At           | 2022-10-03 20:03:54.531015508  |
|                      | +0000 UTC                      |
| Updated At           | 2022-10-03 21:02:31.806605354  |
|                      | +0000 UTC                      |
+----------------------+--------------------------------+

Save an existing pipeline to a file

Use the confluent pipeline save command to save the details of a pipeline.

Run the following command to save a pipeline. Ensure that the PIPE_ID environment variable is assigned with the ID of your pipeline, for example, pipe-1a2b3c.

confluent pipeline save ${PIPE_ID}

Your output should resemble:

Saved source code for pipeline "pipe-1a2b3c" at "pipe-1a2b3c.sql".

Use the --sql-file option to specify the filename for the saved pipeline.

Note

If the SQL file is empty, update the pipeline, and Stream Designer will start returning the SQL code.
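
The following sketch saves a pipeline to a named file with the --sql-file option and then checks that the file is not empty. The my-pipeline.sql file name and the saved_sql_ok function are hypothetical, chosen only for illustration.

```shell
# Hypothetical helper: succeed only if the saved SQL file exists and is non-empty.
saved_sql_ok() {
  if [ -s "$1" ]; then
    return 0
  fi
  echo "$1 is missing or empty; update the pipeline and save it again" >&2
  return 1
}

# Usage (the file name is an example):
#   confluent pipeline save "${PIPE_ID}" --sql-file my-pipeline.sql
#   saved_sql_ok my-pipeline.sql
```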

Deactivate a running pipeline

Use the confluent pipeline deactivate command to deactivate a running pipeline. Specify the topics to be retained when the pipeline deactivates by using the --retained-topics flag.

Note

All imported topics must be marked as retained before deactivating. Specify them in the --retained-topics list.

  1. Run the following command to deactivate a pipeline. Ensure that the PIPE_ID environment variable is assigned with the ID of your pipeline, for example, pipe-1a2b3c.

    confluent pipeline deactivate ${PIPE_ID} \
      --retained-topics <topic-1>,<topic-2>,…
    

    Your output should resemble:

    +----------------------+-------------------------------+
    | ID                   | pipe-1a2b3c                   |
    | Name                 | test-pipeline-2               |
    | Description          | Renamed pipeline              |
    | KSQL Cluster         | lksqlc-3c2b1a                 |
    | Secret Names         | []                            |
    | Activation Privilege | true                          |
    | State                | deactivating                  |
    | Created At           | 2023-01-05 20:20:01.326947814 |
    |                      | +0000 UTC                     |
    | Updated At           | 2023-01-13 19:59:11.244569085 |
    |                      | +0000 UTC                     |
    +----------------------+-------------------------------+
    
  2. Run the confluent pipeline list command to confirm the pipeline is deactivated and back in the draft state.

    Your output should resemble:

          ID      |      Name       |   Description    | KSQL Cluster  | State
    --------------+-----------------+------------------+---------------+---------
      pipe-1a2b3c | test-pipeline-2 | Renamed pipeline | lksqlc-3c2b1a | draft
    
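If your script keeps topic names as separate values, you can join them into the comma-separated list that --retained-topics takes in the command above. This is a minimal sketch; join_topics is a hypothetical helper and the topic names are examples.

```shell
# Hypothetical helper: join its arguments with commas.
# The subshell keeps the IFS change from leaking into the caller.
join_topics() {
  (IFS=,; printf '%s\n' "$*")
}

# Usage (topic names are examples):
#   confluent pipeline deactivate "${PIPE_ID}" \
#     --retained-topics "$(join_topics pageviews users)"
```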

Delete a pipeline

Use the confluent pipeline delete command to remove a pipeline from Confluent Cloud.

Note

Deactivate the pipeline before deletion. If you don’t deactivate first, Confluent Cloud resources, like connectors and topics, are not deleted.

  1. Run the following command to delete a pipeline. Ensure that the PIPE_ID environment variable is assigned with the ID of your pipeline, for example, pipe-1a2b3c.

    confluent pipeline delete ${PIPE_ID}
    

    Your output should resemble:

    Requested to delete pipeline "pipe-1a2b3c".
    
  2. Run the confluent pipeline describe command to confirm the pipeline is deleted.

    confluent pipeline describe ${PIPE_ID}
    

    Your output should resemble:

     +----------------------+-------------------------------+
     | ID                   | pipe-1a2b3c                   |
     | Name                 | test-pipeline-2               |
     | Description          | Renamed pipeline              |
     | KSQL Cluster         | lksqlc-3c2b1a                 |
     | Secret Names         | []                            |
     | Activation Privilege | true                          |
     | State                | deleted                       |
     | Created At           | 2023-01-13 20:15:36.229050956 |
     |                      | +0000 UTC                     |
     | Updated At           | 2023-01-13 20:17:37.719595424 |
     |                      | +0000 UTC                     |
     +----------------------+-------------------------------+
    
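Because resources are left behind when an active pipeline is deleted, a script can check the state before it deletes anything. The sketch below assumes the describe table layout shown in this document and treats draft as the only state that is safe to delete. The delete_if_draft function and the CONFLUENT override variable are hypothetical.

```shell
# Hypothetical guard: delete a pipeline only when it is in the draft state,
# that is, never activated or already deactivated.
# CONFLUENT is an assumed override hook for testing; it defaults to the real CLI.
delete_if_draft() {
  pipe_id=$1
  # Read the State row from the describe table.
  state=$(${CONFLUENT:-confluent} pipeline describe "$pipe_id" |
    awk -F'|' '$2 ~ /^ *State *$/ { gsub(/ /, "", $3); print $3; exit }')
  if [ "$state" != "draft" ]; then
    echo "pipeline $pipe_id is in state '$state'; deactivate it before deleting" >&2
    return 1
  fi
  ${CONFLUENT:-confluent} pipeline delete "$pipe_id"
}

# Usage:
#   delete_if_draft "${PIPE_ID}"
```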

Create or update a pipeline from source code

You can use the create and update commands to define a pipeline from a specified source code file.

If your SQL declares connectors, you need to specify a Kafka cluster API key and secret to activate them. Also, many connectors require secrets for authentication, and Stream Designer provides syntax for saving secrets in the credential store. For more information, see Manage Pipeline Secrets for Stream Designer on Confluent Cloud.

The following example shows how to create a simple pipeline that has a Datagen source connector and a corresponding topic. It shows how to pass a Kafka cluster API key and secret to Stream Designer.

Create a pipeline from a SQL file

The following example command shows how to create a new pipeline from a source file named pipeline-with-secret.sql.

  1. Copy the following SQL into a file named pipeline-with-secret.sql. The code declares a pipeline that has a Datagen source connector, a Kafka topic named pageviews, and a stream named AllPageviews.

    The variables for the connector’s Kafka API key and secret, named PAGEVIEWS_SOURCE_KAFKA_API_KEY and PAGEVIEWS_SOURCE_KAFKA_API_SECRET, are assigned later by the confluent pipeline create command.

    CREATE SOURCE CONNECTOR "pageviews-source" WITH (
      "connector.class"='DatagenSource',
      "kafka.api.key"='${PAGEVIEWS_SOURCE_KAFKA_API_KEY}',
      "kafka.api.secret"='${PAGEVIEWS_SOURCE_KAFKA_API_SECRET}',
      "kafka.auth.mode"='KAFKA_API_KEY',
      "kafka.topic"='pageviews',
      "output.data.format"='JSON_SR',
      "quickstart"='PAGEVIEWS',
      "tasks.max"='1'
    );
    
    CREATE OR REPLACE STREAM "AllPageviews" (PAGEID VARCHAR, USERID VARCHAR, VIEWTIME BIGINT)
      WITH (kafka_topic='pageviews', partitions=1, key_format='KAFKA', value_format='JSON_SR');
    
  2. Save the file and run the following command to create a new pipeline. Before you run it, set the KAFKA_API_KEY and KAFKA_API_SECRET environment variables to a valid Kafka cluster API key and secret. You can supply these values with the technique of your choice, for example, a secrets management tool like Vault or a file encryption tool like sops.

    Important

    You can use string literals for secrets instead of environment variables, but for security reasons, never use the command with literals in any deployment processes.

    confluent pipeline create \
      --ksql-cluster ${KSQLDB_ID} \
      --name pipeline-with-secret \
      --description "pipeline with secret" \
      --secret PAGEVIEWS_SOURCE_KAFKA_API_KEY=${KAFKA_API_KEY} \
      --secret PAGEVIEWS_SOURCE_KAFKA_API_SECRET=${KAFKA_API_SECRET} \
      --sql-file pipeline-with-secret.sql
    

    Your output should resemble:

    +----------------------+------------------------------------+
    | ID                   | pipe-6f7g8h                        |
    | Name                 | pipeline-with-secret               |
    | Description          | pipeline with secret               |
    | KSQL Cluster         | lksqlc-3c2b1a                      |
    | Secret Names         | [PAGEVIEWS_SOURCE_KAFKA_API_KEY    |
    |                      | PAGEVIEWS_SOURCE_KAFKA_API_SECRET] |
    | Activation Privilege | false                              |
    | State                | draft                              |
    | Created At           | 2023-01-18 23:04:12.202783937      |
    |                      | +0000 UTC                          |
    | Updated At           | 2023-01-18 23:04:12.36618846       |
    |                      | +0000 UTC                          |
    +----------------------+------------------------------------+
    

    Note the pipeline ID, which you need for other CLI commands. For convenience, assign the ID value to an environment variable. In this example, the pipeline ID is pipe-6f7g8h.

    PIPE_ID=<pipeline-id>
    
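Each ${NAME} placeholder in the SQL file needs a matching --secret option on the command line. As a quick check before you run the create command, the following sketch lists the placeholders that a file uses. The sql_placeholders function is a hypothetical helper, and it relies on grep -o, which GNU and BSD grep support.

```shell
# Hypothetical helper: list the distinct ${NAME} placeholders in a SQL file,
# one per line, so you can compare them with your --secret options.
sql_placeholders() {
  grep -o '\${[A-Za-z_][A-Za-z0-9_]*}' "$1" | tr -d '${}' | sort -u
}

# Usage:
#   sql_placeholders pipeline-with-secret.sql
```

For the pipeline-with-secret.sql file above, the output should list PAGEVIEWS_SOURCE_KAFKA_API_KEY and PAGEVIEWS_SOURCE_KAFKA_API_SECRET.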

Update a pipeline from a SQL file

Use the update command to update a pipeline with a specified source code file.

  1. Edit the pipeline-with-secret.sql file and append the following SQL. The code adds a Datagen source connector for users data, a Kafka topic named users, and a table named AllUsers.

    The variables for the connector’s Kafka API key and secret, named USERS_SOURCE_KAFKA_API_KEY and USERS_SOURCE_KAFKA_API_SECRET, are assigned later by the confluent pipeline update command.

    CREATE SOURCE CONNECTOR "users-source" WITH (
      "connector.class"='DatagenSource',
      "kafka.api.key"='${USERS_SOURCE_KAFKA_API_KEY}',
      "kafka.api.secret"='${USERS_SOURCE_KAFKA_API_SECRET}',
      "kafka.auth.mode"='KAFKA_API_KEY',
      "kafka.topic"='users',
      "output.data.format"='JSON_SR',
      "quickstart"='USERS',
      "tasks.max"='1'
    );
    
    CREATE OR REPLACE TABLE "AllUsers" (GENDER STRING, ID STRING PRIMARY KEY, REGIONID STRING, REGISTERTIME BIGINT, USERID STRING)
      WITH (kafka_topic='users', partitions=1, key_format='KAFKA', value_format='JSON_SR');
    
  2. Save the file and run the following command to update the pipeline from the modified pipeline-with-secret.sql source file. You can use the same Kafka cluster API key and secret you used for the pageviews-source connector.

    Note

    The --secret parameter is required only when you add or change a secret.

    confluent pipeline update ${PIPE_ID} \
      --description "updated pipeline with secrets" \
      --secret USERS_SOURCE_KAFKA_API_KEY=${KAFKA_API_KEY} \
      --secret USERS_SOURCE_KAFKA_API_SECRET=${KAFKA_API_SECRET} \
      --sql-file pipeline-with-secret.sql
    

    Your output should resemble:

    +----------------------+--------------------------------+
    | ID                   | pipe-6f7g8h                    |
    | Name                 | pipeline-with-secret           |
    | Description          | updated pipeline with secrets  |
    | KSQL Cluster         | lksqlc-3c2b1a                  |
    | Secret Names         | [USERS_SOURCE_KAFKA_API_KEY    |
    |                      | USERS_SOURCE_KAFKA_API_SECRET] |
    | Activation Privilege | false                          |
    | State                | draft                          |
    | Created At           | 2023-01-18 23:04:12.202783937  |
    |                      | +0000 UTC                      |
    | Updated At           | 2023-01-18 23:04:13.3617648    |
    |                      | +0000 UTC                      |
    +----------------------+--------------------------------+
    
  3. Run the confluent pipeline describe command to confirm the pipeline is updated with the new secrets.

    confluent pipeline describe ${PIPE_ID}
    

    Your output should resemble:

    +----------------------+-----------------------------------+
    | ID                   | pipe-6f7g8h                       |
    | Name                 | pipeline-with-secret              |
    | Description          | updated pipeline with secrets     |
    | KSQL Cluster         | lksqlc-3c2b1a                     |
    | Secret Names         | [PAGEVIEWS_SOURCE_KAFKA_API_KEY   |
    |                      | PAGEVIEWS_SOURCE_KAFKA_API_SECRET |
    |                      | USERS_SOURCE_KAFKA_API_KEY        |
    |                      | USERS_SOURCE_KAFKA_API_SECRET]    |
    | Activation Privilege | false                             |
    | State                | draft                             |
    | Created At           | 2023-01-18 23:04:12.202783937     |
    |                      | +0000 UTC                         |
    | Updated At           | 2023-01-18 23:36:14.11857322      |
    |                      | +0000 UTC                         |
    +----------------------+-----------------------------------+
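
To confirm the secrets in a script, you can read the Secret Names field, which wraps across several rows in the table. This is a minimal sketch, assuming the describe table layout shown above. The secret_names function is a hypothetical helper.

```shell
# Hypothetical helper: print the secret names from "confluent pipeline describe"
# table output on stdin, one per line, following the list across wrapped rows.
secret_names() {
  awk -F'|' '
    {
      k = $2; v = $3
      gsub(/^ +| +$/, "", k)
      gsub(/^ +| +$/, "", v)
      if (k == "Secret Names") in_list = 1   # first row of the list
      else if (k != "") in_list = 0          # the next labeled row ends it
      if (in_list) {
        gsub(/\[/, "", v); gsub(/\]/, "", v) # drop the surrounding brackets
        n = split(v, names, " ")             # handle several names on one row
        for (i = 1; i <= n; i++) print names[i]
      }
    }'
}

# Usage:
#   confluent pipeline describe "${PIPE_ID}" | secret_names
```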