Manage Pipelines With the REST API for Stream Designer on Confluent Cloud

You can use the Confluent Cloud REST API to create and manage Stream Designer pipelines. Use the following commands to perform operations on your pipelines.

For the REST API reference, see Pipelines REST API.

Tip

You can use the Confluent CLI to manage your pipelines. For more information, see Manage Pipelines With the CLI for Stream Designer on Confluent Cloud.

Prequisites for using the Pipelines REST API

To prepare for managing pipelines in Confluent Cloud, follow these steps.

  1. Request authorization from an OrganizationAdmin to create and modify pipelines in your Confluent Cloud environment.
  2. To run the following examples, install the curl and jq commands.
  3. Run the confluent update command to get the latest version of the Confluent CLI.

Get an authentication token

You can use a Confluent CLI authentication and authorization token with the curl command. First, log into Confluent Cloud as above, then get the current context:

  1. Log in to Confluent Cloud by using the confluent login command.

  2. Run the following command to get the current context.

    cat ~/.confluent/config.json | jq -r .current_context
    

    Your output should resemble the following:

    login-<email>@confluent.io-https://confluent.cloud/
    
  3. Store the token in a local environment variable. The quotation marks are required.

    TOKEN=$(cat ~/.confluent/config.json | jq -r '.context_states."<from-above>".auth_token')
    

    The following example shows how your token assignment should appear.

    TOKEN=$(cat ~/.confluent/config.json | jq -r '.context_states."login-joe.cool@example.com-https://confluent.cloud/".auth_token')
    
  4. Inspect the token.

    echo $TOKEN
    

    Your output should resemble the following:

    eyJhbB0BciOiJSUzID0BBsNiIsS1ackImprd…
    

    Note

    After some time, your token will expire. To continue using the REST API, log in again, and run the previous commands to store the latest token.

Get the environment and Kafka cluster IDs

Calls to the REST API require the identifiers for your environment and Kafka cluster.

  1. Run the following command to list the Kafka clusters in your environment.

    confluent kafka cluster list
    

    Your output should resemble:

           Id      |     Name     | Type  | Provider |  Region   | Availability | Status
    ---------------+--------------+-------+----------+-----------+--------------+---------
        lkc-9z8y7x | test-cluster | BASIC | gcp      | us-west4  | single-zone  | UP
    
  2. Run the following command to list your environments.

    confluent env list
    

    Your output should resemble:

           ID      |      Name
    ---------------+------------------
      * env-abc123 | default
        env-xyz123 | test_env
        env-wyz345 | dev
    
  3. Note the Kafka cluster ID and the environment ID. In the current example, the Kafka cluster ID is lkc-9z8y7x, and the environment ID is env-abc123.

  4. For convenience, assign the ID values to environment variables.

    CLUSTER_ID=<kafka-cluster-id> && \
    ENV_ID=<env-id>
    

Create a pipeline

The following steps show how to create a simple pipeline from a pipeline description file, which is a JSON file that specifies the environment, metadata, and code for your pipeline.

To create a pipeline, you need the identifiers of your ksqlDB and Stream Governance clusters, in addition to the environment and Kafka cluster IDs.

  1. Run the following command to get the ID of your Stream Governance cluster.

    confluent schema-registry cluster describe
    

    Your output should resemble:

     +-------------------------+----------------------------------------------------+
     | Name                    | Stream Governance Package                          |
     | Cluster ID              | lsrc-0m2q5                                         |
     | Endpoint URL            | https://psrc-a1b2c.us-central1.gcp.confluent.cloud |
     | Used Schemas            |                                                 21 |
     | Available Schemas       |                                              19979 |
     | Free Schemas Limit      |                                              20000 |
     | Global Compatibility    | <Requires API Key>                                 |
     | Mode                    | <Requires API Key>                                 |
     | Service Provider        | gcp                                                |
     | Service Provider Region | us-central1                                        |
     | Package                 | advanced                                           |
     +-------------------------+----------------------------------------------------+
    
  2. Note the Stream Governance cluster ID. In the current example, the cluster ID is lsrc-0m2q5.

  3. Run the following command to get the ID of your ksqlDB cluster.

    confluent ksql cluster list
    

    Your output should resemble:

           ID       |        Name        | Topic Prefix |   Kafka    | …
    ----------------+--------------------+--------------+------------+---
      lksqlc-j8k9l0 | ksqldb-cluster     | pksqlc-x2y3z | lkc-9z8y7x |
    

    Note the ksqlDB cluster ID. In the current example, the cluster ID is lksqlc-j8k9l0.

  4. Create a file named simple_pipeline.json and copy the following JSON into it. Copy your values for env-id, kafka-cluster-id, ksqldb-cluster-id, and sg-cluster-id.

    {
        "spec": {
            "display_name":"test-pipeline",
            "description":"Test pipeline",
            "activation_privilege":true,
            "source_code": {
                "sql": "CREATE STREAM `upstream` (id INTEGER, name STRING) WITH (kafka_topic = 'test-topic', partitions=1, value_format='JSON');"
            },
            "environment": {
                "id": "<env-id>"
            },
            "kafka_cluster": {
                "id":"<kafka-cluster-id>"
            },
            "ksql_cluster":
            {
                "id": "<ksqldb-cluster-id>"
            },
            "stream_governance_cluster":{
                "id": "<sg-cluster-id>"
            }
        }
    }
    
  5. Save the file and run the following command to create the pipeline. Ensure that the TOKEN environment variable is assigned with your value.

    curl -s -H "Authorization: Bearer ${TOKEN}" \
      -X POST -H "Content-Type: application/json" \
      -d @simple_pipeline.json \
      "https://api.confluent.cloud/sd/v1/pipelines" | jq .
    

    Your output should resemble:

     {
         "id": "pipe-1a2b3c",
         "metadata": {
             "self": "https://confluent.cloud/api/sd/v1/clusters/lkc-9z8y7x/pipelines/pipe-1a2b3c",
             "resource_name": "crn://confluent.cloud/organization=b0b21724-4586-4a07-d0bb-9e47daacbf87/environment=env-a1b3c/cloud-cluster=lkc-9z8y7x/pipeline=pipe-1a2b3c",
             "created_at": "2023-01-31T22:17:14.757855523Z",
             "updated_at": "2023-01-31T22:17:15.353688821Z"
         },
         "spec": {
             "display_name": "test-pipeline",
             "description": "Test pipeline",
             "activation_privilege": true,
             "environment": {
                 "api_version": "org/v2",
                 "kind": "Environment",
                 "…": "…"
             }
         }
     }
    
  6. Your pipeline is created and ready to edit in Stream Designer. Note the pipeline ID, because you need it for other requests to manage the pipeline. In the current example, the pipeline ID is pipe-1a2b3c.

  7. For convenience, assign the ID value to an environment variable.

    PIPE_ID=<pipeline-id>
    

Get a pipeline ID

Calls to the REST API require the identifier of the pipeline you want to manage.

  1. Run the following command to list the pipelines in your organization.

    confluent pipeline list
    

    Your output should resemble:

          ID      |      Name     |   Description   | KSQL Cluster  |   State
    --------------+---------------+-----------------+---------------+-------------
      pipe-1a2b3c | test-pipeline |                 | lksqlc-98b76a | draft
    
  2. Note the pipeline ID. In the current example, the ID is pipe-1a2b3c.

  3. For convenience, assign the ID value to an environment variable.

    PIPE_ID=<pipeline-id>
    

View pipeline details

Run the following command to get details about a specific pipeline. Ensure that the TOKEN, PIPE_ID, ENV_ID, and CLUSTER_ID environment variables are assigned with your values.

curl -s -H "Authorization: Bearer ${TOKEN}" \
  -X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | jq .

Your output should resemble:

{
    "id": "pipe-1a2b3c",
    "metadata": {
        "self": "https://api.confluent.cloud/sd/v1/clusters/lkc-9z8y7x/pipelines/pipe-1a2b3c",
        "resource_name": "crn://confluent.cloud/organization=0b0b3242-9fb2-483a-acde-d0bb5d841f04/environment=env-a1b3c/cloud-cluster=lkc-9z8y7x/pipeline=pipe-1a2b3c",
        "created_at": "2023-01-05T20:20:01.326947814Z",
        "updated_at": "2023-01-13T20:08:04.024883767Z",
        "activated_at": "2023-01-13T20:07:56.882861364Z"
    },
    "spec": {
        "display_name": "test-pipeline",
        "description": "Test pipeline",
        "activation_privilege": true,
        "…": "…"
    }
}

Note

The json output has no credentials for connectors, and secret values are always redacted.

Save a pipeline to a description file

  1. Run the following command to get details about the pipeline and save them to a local file. Ensure that the TOKEN, PIPE_ID, ENV_ID, and CLUSTER_ID environment variables are assigned with your values.

    curl -s -H "Authorization: Bearer ${TOKEN}" \
      -X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | \
      jq . > ${PIPE_ID}.json
    

Note

The json output has no credentials for connectors, and secret values are always redacted.

Update a pipeline’s metadata

Update a pipeline’s metadata by modifying a pipeline description file and uploading it with a PATCH request.

  1. Run the following command to get a pipeline description file to modify. In this example, the pipeline metadata is saved to a file named ${PIPE_ID}.json. Ensure that the TOKEN, PIPE_ID, ENV_ID, and CLUSTER_ID environment variables are assigned with your values.

    curl -s -H "Authorization: Bearer ${TOKEN}" \
      -X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | \
      jq . > ${PIPE_ID}.json
    
  2. Modify the pipeline description file. The following example shows how to change the description string.

 {
     "...": "...",
     "spec": {
         "display_name": "test-pipeline",
         "description": "updated description",
         "activation_privilege": true,
         "...": "..."
     }
 }
  1. Run the following command to update the pipeline with the edited definition file.

    curl -s -H "Authorization: Bearer ${TOKEN}" \
      -X PATCH -H "Content-Type: application/json" \
      -d @${PIPE_ID}.json \
      "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | jq .
    
  2. Run following command to confirm that the pipeline was updated. The jq query selects the .spec.description property from the json response.

    curl -s -H "Authorization: Bearer ${TOKEN}" \
      -X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | \
      jq .spec.description
    
  3. Run following command to confirm that the pipeline was updated.

    confluent pipeline list
    

    Your output should resemble:

          ID      |      Name     |     Description     | KSQL Cluster  |   State
    --------------+---------------+---------------------+---------------+-------------
      pipe-1a2b3c | test-pipeline | updated description | lksqlc-98b76a | draft
    

Patch a pipeline’s metadata

Update specific values in a pipeline’s metadata by using the PATCH request.

  1. Create a file named ${PIPE_ID}-patch.json and copy the json for the values you want to change into the file. For example, the following json enables patching the pipieline description.

    {
      "spec": {
      "environment": {
        "id": "<env-id>"
      },
      "kafka_cluster": {
         "id": "<kafka-cluster-id>"
      },
      "description": "patched description"
    }
    
  2. Save the file and run the following command to patch the pipeline with the updated description. Ensure that the TOKEN, PIPE_ID, ENV_ID, and CLUSTER_ID environment variables are assigned with your values.

    curl -s -H "Authorization: Bearer ${TOKEN}" \
      -X PATCH -H "Content-Type: application/json" \
      -d @${PIPE_ID}-patch.json \
      "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | jq .
    

Grant activation privileges

Before you can activate a pipeline, you must grant it activation privileges. Granting a pipeline privileges enables it to provision connectors, queries, topics, and other reqources.

Grant privileges by setting activate_privilege to true in the pipeline description file and uploading the modified file with a PATCH request.

  1. Run the following command to get a pipeline description file to modify. In this example, the pipeline metadata is saved to a file named ${PIPE_ID}.json. Ensure that the TOKEN, PIPE_ID, ENV_ID, and CLUSTER_ID environment variables are assigned with your values.

    curl -s -H "Authorization: Bearer ${TOKEN}" \
      -X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | \
      jq . > ${PIPE_ID}.json
    
  2. Modify the pipeline description file. The following example shows how to change activate_privilege to true.

     {
         "...": "...",
         "spec": {
             "display_name": "test-pipeline",
             "description": "updated description",
             "activation_privilege": true,
             "...": "..."
         }
     }
    
  3. Save the file and run the following command to grant activation privileges to the pipeline.

    curl -s -H "Authorization: Bearer ${TOKEN}" \
      -X PATCH -H "Content-Type: application/json" \
      -d @${PIPE_ID}.json \
      "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | jq .
    

Revoke pipeline privileges

Revoking a pipeline privileges disables its ability to activate and provision connectors, queries, topics, and other reqources.

Revoke privileges by setting activate_privilege to false in the pipeline description file and uploading the modified file with a PATCH request.

Ensure that the pipeline isn’t currently activated. You can deactivate and revoke in the same request.

  1. Run the following command to get a pipeline description file to modify. In this example, the pipeline metadata is saved to a file named ${PIPE_ID}.json. Ensure that the TOKEN, PIPE_ID, ENV_ID, and CLUSTER_ID environment variables are assigned with your values.

    curl -s -H "Authorization: Bearer ${TOKEN}" \
      -X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | \
      jq . > ${PIPE_ID}.json
    
  2. Modify the pipeline description file. The following example shows how to change activate_privilege to false. Also, it sets activated to false.

     {
         "...": "...",
         "spec": {
             "display_name": "test-pipeline",
             "description": "updated description",
             "activation_privilege": false,
             "activated": false,
             "...": "..."
         }
     }
    
  3. Save the file and run the following command to revoke activation privileges for the pipeline.

    curl -s -H "Authorization: Bearer ${TOKEN}" \
      -X PATCH -H "Content-Type: application/json" \
      -d @${PIPE_ID}.json \
      "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | jq .
    

Activate or deactivate a pipeline

  • Activate a pipeline by setting activated to true in the pipeline description file and uploading the modified file with a PATCH request.
  • Deactivate a pipeline by setting activated to false.

Note

The activate_privilege setting must be set to true for a pipeline to activate.

  1. Run the following command to get a pipeline description file to modify. In this example, the pipeline metadata is saved to a file named ${PIPE_ID}.json. Ensure that the TOKEN, PIPE_ID, ENV_ID, and CLUSTER_ID environment variables are assigned with your values.

    curl -s -H "Authorization: Bearer ${TOKEN}" \
      -X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}>" | \
      jq . > ${PIPE_ID}.json
    
  2. Modify the pipeline description file. The following example shows how to change the activated property.

     {
         "...": "...",
         "spec": {
             "display_name": "test-pipeline",
             "description": "updated description",
             "activated": true,
             "...": "..."
         }
     }
    
  3. Save the file and run the following command to activate or deactivate the pipeline.

    curl -s -H "Authorization: Bearer ${TOKEN}" \
      -X PATCH -H "Content-Type: application/json" \
      -d @${PIPE_ID}.json \
      "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | jq .
    

View pipeline source code

The SQL source code for a pipeline is embedded within the pipeline’s description file. Use the jq .spec.source_code query to select the source_code property.

Run the following command to view a pipeline’s source code. Ensure that the TOKEN, PIPE_ID, ENV_ID, and CLUSTER_ID environment variables are assigned with your values.

curl -s -H "Authorization: Bearer ${TOKEN}" \
  -X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | \
  jq .spec.source_code

Your output should resemble:

{
  "sql": "CREATE SOURCE CONNECTOR \"DatagenSourceConnector_2\" WITH (\n  \"connector.class\"='DatagenSource',\n  \"kafka.api.key\"='${DATAGENSOURCECONNECTOR_2_KAFKA_API_KEY}',\n  \"kafka.api.secret\"='${DATAGENSOURCECONNECTOR_2_KAFKA_API_SECRET}',\n  \"kafka.auth.mode\"='KAFKA_API_KEY',\n  \"kafka.topic\"='jim-test-cli-pageviews-02',\n  \"max.interval\"='1000',\n  \"output.data.format\"='JSON_SR',\n  \"quickstart\"='PAGEVIEWS',\n  \"tasks.max\"='1'\n);\n\nCREATE OR REPLACE STREAM \"pageviews_stream\" \n  WITH (kafka_topic='pageviews-02', partitions=1, value_format='JSON_SR');\n\nCREATE OR REPLACE STREAM \"filtered_pageviews\"\n  WITH (kafka_topic='filtered_pageviews_topic', partitions=1)\n  AS SELECT * FROM \"pageviews_stream\" WHERE userid = 'User_9';\n"
}

Note

The json output has no credentials for connectors, and secret values are always redacted.