Manage Pipelines With the REST API for Stream Designer on Confluent Cloud¶
You can use the Confluent Cloud REST API to create and manage Stream Designer pipelines. Use the following commands to perform operations on your pipelines.
For the REST API reference, see Pipelines REST API.
Tip
You can use the Confluent CLI to manage your pipelines. For more information, see Manage Pipelines With the CLI for Stream Designer on Confluent Cloud.
- Prerequisites for using the Pipelines REST API
- Get an authentication token
- Get the environment and Kafka cluster IDs
- Create a pipeline
- Get a pipeline ID
- View pipeline details
- Save a pipeline to a description file
- Update a pipeline’s metadata
- Patch a pipeline’s metadata
- Grant activation privileges
- Revoke pipeline privileges
- Activate or deactivate a pipeline
- View pipeline source code
Prerequisites for using the Pipelines REST API¶
To prepare for managing pipelines in Confluent Cloud, follow these steps.
- Request authorization from an OrganizationAdmin to create and modify pipelines in your Confluent Cloud environment.
- To run the following examples, install the curl and jq commands.
- Run the confluent update command to get the latest version of the Confluent CLI.
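Before continuing, it can help to confirm that the prerequisite tools are actually on your PATH. The following sketch is a convenience check, not part of the official workflow.

```shell
# Sanity check: report whether each prerequisite command is installed.
for cmd in curl jq confluent; do
  if command -v "$cmd" >/dev/null 2>&1; then
    echo "$cmd: found"
  else
    echo "$cmd: MISSING"
  fi
done
```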
Get an authentication token¶
You can use a Confluent CLI authentication and authorization token with the curl command.
Log in to Confluent Cloud by using the confluent login command.
Run the following command to get the current context.
cat ~/.confluent/config.json | jq -r .current_context
Your output should resemble the following:
login-<email>@confluent.io-https://confluent.cloud/
Store the token in a local environment variable. The quotation marks are required.
TOKEN=$(cat ~/.confluent/config.json | jq -r '.context_states."<from-above>".auth_token')
The following example shows how your token assignment should appear.
TOKEN=$(cat ~/.confluent/config.json | jq -r '.context_states."login-joe.cool@example.com-https://confluent.cloud/".auth_token')
Inspect the token.
echo $TOKEN
Your output should resemble the following:
eyJhbB0BciOiJSUzID0BBsNiIsS1ackImprd…
Note
After some time, your token will expire. To continue using the REST API, log in again, and run the previous commands to store the latest token.
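The context lookup and token extraction can also be combined, so you don't have to paste the context name by hand. The following is a sketch run against a sample config file it writes itself; in practice, point CONFIG at ~/.confluent/config.json.

```shell
# Write a sample config so the sketch is self-contained; in practice use
# CONFIG=~/.confluent/config.json instead.
CONFIG=/tmp/sample_config.json
cat > "$CONFIG" <<'EOF'
{
  "current_context": "login-joe.cool@example.com-https://confluent.cloud/",
  "context_states": {
    "login-joe.cool@example.com-https://confluent.cloud/": {
      "auth_token": "eyJ-sample-token"
    }
  }
}
EOF

# Look up the current context, then pull its token with jq's --arg binding.
CTX=$(jq -r .current_context "$CONFIG")
TOKEN=$(jq -r --arg ctx "$CTX" '.context_states[$ctx].auth_token' "$CONFIG")
echo "$TOKEN"
```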
Get the environment and Kafka cluster IDs¶
Calls to the REST API require the identifiers for your environment and Kafka cluster.
Run the following command to list the Kafka clusters in your environment.
confluent kafka cluster list
Your output should resemble:
      Id     |     Name     | Type  | Cloud |  Region  | Availability | Status
-------------+--------------+-------+-------+----------+--------------+---------
  lkc-9z8y7x | test-cluster | BASIC | gcp   | us-west4 | single-zone  | UP
Run the following command to list your environments.
confluent env list
Your output should resemble:
      ID      |   Name
--------------+-----------
* env-abc123  | default
  env-xyz123  | test_env
  env-wyz345  | dev
Note the Kafka cluster ID and the environment ID. In the current example, the Kafka cluster ID is lkc-9z8y7x, and the environment ID is env-abc123.
For convenience, assign the ID values to environment variables.
CLUSTER_ID=<kafka-cluster-id> && \
ENV_ID=<env-id>
Create a pipeline¶
The following steps show how to create a simple pipeline from a pipeline description file, which is a JSON file that specifies the environment, metadata, and code for your pipeline.
To create a pipeline, you need the identifiers of your ksqlDB and Stream Governance clusters, in addition to the environment and Kafka cluster IDs.
Run the following command to get the ID of your Stream Governance cluster.
confluent schema-registry cluster describe
Your output should resemble:
+-------------------------+----------------------------------------------------+
| Name                    | Stream Governance Package                          |
| Cluster ID              | lsrc-0m2q5                                         |
| Endpoint URL            | https://psrc-a1b2c.us-central1.gcp.confluent.cloud |
| Used Schemas            | 21                                                 |
| Available Schemas       | 19979                                              |
| Free Schemas Limit      | 20000                                              |
| Global Compatibility    | <Requires API Key>                                 |
| Mode                    | <Requires API Key>                                 |
| Cloud                   | gcp                                                |
| Region                  | us-central1                                        |
| Package                 | advanced                                           |
+-------------------------+----------------------------------------------------+
Note the Stream Governance cluster ID. In the current example, the cluster ID is lsrc-0m2q5.
Run the following command to get the ID of your ksqlDB cluster.
confluent ksql cluster list
Your output should resemble:
       ID       |      Name      | Topic Prefix |   Kafka    | …
----------------+----------------+--------------+------------+---
  lksqlc-j8k9l0 | ksqldb-cluster | pksqlc-x2y3z | lkc-9z8y7x |
Note the ksqlDB cluster ID. In the current example, the cluster ID is lksqlc-j8k9l0.
Create a file named simple_pipeline.json and copy the following JSON into it. Replace <env-id>, <kafka-cluster-id>, <ksqldb-cluster-id>, and <sg-cluster-id> with your values.
{
  "spec": {
    "display_name": "test-pipeline",
    "description": "Test pipeline",
    "activation_privilege": true,
    "source_code": {
      "sql": "CREATE STREAM `upstream` (id INTEGER, name STRING) WITH (kafka_topic = 'test-topic', partitions=1, value_format='JSON');"
    },
    "environment": { "id": "<env-id>" },
    "kafka_cluster": { "id": "<kafka-cluster-id>" },
    "ksql_cluster": { "id": "<ksqldb-cluster-id>" },
    "stream_governance_cluster": { "id": "<sg-cluster-id>" }
  }
}
Save the file and run the following command to create the pipeline. Ensure that the TOKEN environment variable is assigned with your value.
curl -s -H "Authorization: Bearer ${TOKEN}" \
  -X POST -H "Content-Type: application/json" \
  -d @simple_pipeline.json \
  "https://api.confluent.cloud/sd/v1/pipelines" | jq .
Your output should resemble:
{ "id": "pipe-1a2b3c", "metadata": { "self": "https://confluent.cloud/api/sd/v1/clusters/lkc-9z8y7x/pipelines/pipe-1a2b3c", "resource_name": "crn://confluent.cloud/organization=b0b21724-4586-4a07-d0bb-9e47daacbf87/environment=env-a1b3c/cloud-cluster=lkc-9z8y7x/pipeline=pipe-1a2b3c", "created_at": "2023-01-31T22:17:14.757855523Z", "updated_at": "2023-01-31T22:17:15.353688821Z" }, "spec": { "display_name": "test-pipeline", "description": "Test pipeline", "activation_privilege": true, "environment": { "api_version": "org/v2", "kind": "Environment", "…": "…" } } }
Your pipeline is created and ready to edit in Stream Designer. Note the pipeline ID, because you need it for other requests to manage the pipeline. In the current example, the pipeline ID is pipe-1a2b3c.
For convenience, assign the ID value to an environment variable.
PIPE_ID=<pipeline-id>
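If you script pipeline creation, you can generate the description file from the environment variables you set earlier instead of hand-editing placeholders. The following is a sketch; all IDs shown are illustrative.

```shell
# Illustrative IDs; replace with the values you noted above.
ENV_ID=env-abc123
CLUSTER_ID=lkc-9z8y7x
KSQL_ID=lksqlc-j8k9l0
SG_ID=lsrc-0m2q5

# Generate the pipeline description file with a heredoc. The backticks in the
# SQL are escaped so the shell doesn't treat them as command substitution.
cat > simple_pipeline.json <<EOF
{
  "spec": {
    "display_name": "test-pipeline",
    "description": "Test pipeline",
    "activation_privilege": true,
    "source_code": {
      "sql": "CREATE STREAM \`upstream\` (id INTEGER, name STRING) WITH (kafka_topic = 'test-topic', partitions=1, value_format='JSON');"
    },
    "environment": { "id": "${ENV_ID}" },
    "kafka_cluster": { "id": "${CLUSTER_ID}" },
    "ksql_cluster": { "id": "${KSQL_ID}" },
    "stream_governance_cluster": { "id": "${SG_ID}" }
  }
}
EOF
echo "wrote simple_pipeline.json"
```

The resulting file is used with the same POST request shown above.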
Get a pipeline ID¶
Calls to the REST API require the identifier of the pipeline you want to manage.
Run the following command to list the pipelines in your organization.
confluent pipeline list
Your output should resemble:
      ID      |     Name      |   Description   | KSQL Cluster  | State
--------------+---------------+-----------------+---------------+--------
  pipe-1a2b3c | test-pipeline |                 | lksqlc-98b76a | draft
Note the pipeline ID. In the current example, the ID is pipe-1a2b3c.
For convenience, assign the ID value to an environment variable.
PIPE_ID=<pipeline-id>
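Most of the requests below share the same base URL and query string. As a convenience sketch (not part of the official workflow), you can assemble the URL once and reuse it:

```shell
# Illustrative IDs; replace with your own values.
ENV_ID=env-abc123
CLUSTER_ID=lkc-9z8y7x
PIPE_ID=pipe-1a2b3c

# Assemble the per-pipeline URL used by the GET and PATCH requests below.
BASE="https://api.confluent.cloud/sd/v1/pipelines"
URL="${BASE}/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}"
echo "$URL"
```

With this in place, the later calls reduce to `curl -s -H "Authorization: Bearer ${TOKEN}" "$URL" | jq .`.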
View pipeline details¶
Run the following command to get details about a specific pipeline. Ensure that the TOKEN, PIPE_ID, ENV_ID, and CLUSTER_ID environment variables are assigned with your values.
curl -s -H "Authorization: Bearer ${TOKEN}" \
-X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | jq .
Your output should resemble:
{
"id": "pipe-1a2b3c",
"metadata": {
"self": "https://api.confluent.cloud/sd/v1/clusters/lkc-9z8y7x/pipelines/pipe-1a2b3c",
"resource_name": "crn://confluent.cloud/organization=0b0b3242-9fb2-483a-acde-d0bb5d841f04/environment=env-a1b3c/cloud-cluster=lkc-9z8y7x/pipeline=pipe-1a2b3c",
"created_at": "2023-01-05T20:20:01.326947814Z",
"updated_at": "2023-01-13T20:08:04.024883767Z",
"activated_at": "2023-01-13T20:07:56.882861364Z"
},
"spec": {
"display_name": "test-pipeline",
"description": "Test pipeline",
"activation_privilege": true,
"…": "…"
}
}
Note
The JSON output has no credentials for connectors, and secret values are always redacted.
Save a pipeline to a description file¶
Run the following command to get details about the pipeline and save them to a local file. Ensure that the TOKEN, PIPE_ID, ENV_ID, and CLUSTER_ID environment variables are assigned with your values.
curl -s -H "Authorization: Bearer ${TOKEN}" \
  -X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | \
  jq . > ${PIPE_ID}.json
Note
The JSON output has no credentials for connectors, and secret values are always redacted.
Update a pipeline’s metadata¶
Update a pipeline’s metadata by modifying a pipeline description file and uploading it with a PATCH request.
Run the following command to get a pipeline description file to modify. In this example, the pipeline metadata is saved to a file named ${PIPE_ID}.json. Ensure that the TOKEN, PIPE_ID, ENV_ID, and CLUSTER_ID environment variables are assigned with your values.
curl -s -H "Authorization: Bearer ${TOKEN}" \
  -X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | \
  jq . > ${PIPE_ID}.json
Modify the pipeline description file. The following example shows how to change the description string.
{
"...": "...",
"spec": {
"display_name": "test-pipeline",
"description": "updated description",
"activation_privilege": true,
"...": "..."
}
}
Run the following command to update the pipeline with the edited description file.
curl -s -H "Authorization: Bearer ${TOKEN}" \
  -X PATCH -H "Content-Type: application/json" \
  -d @${PIPE_ID}.json \
  "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | jq .
Run the following command to confirm that the pipeline was updated. The jq query selects the .spec.description property from the JSON response.
curl -s -H "Authorization: Bearer ${TOKEN}" \
  -X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | \
  jq .spec.description
Run the following command to confirm the update with the Confluent CLI.
confluent pipeline list
Your output should resemble:
      ID      |     Name      |     Description     | KSQL Cluster  | State
--------------+---------------+---------------------+---------------+--------
  pipe-1a2b3c | test-pipeline | updated description | lksqlc-98b76a | draft
Patch a pipeline’s metadata¶
Update specific values in a pipeline’s metadata by using the PATCH request.
Create a file named ${PIPE_ID}-patch.json and copy the JSON for the values you want to change into the file. For example, the following JSON enables patching the pipeline description.
{
  "spec": {
    "environment": { "id": "<env-id>" },
    "kafka_cluster": { "id": "<kafka-cluster-id>" },
    "description": "patched description"
  }
}
Save the file and run the following command to patch the pipeline with the updated description. Ensure that the TOKEN, PIPE_ID, ENV_ID, and CLUSTER_ID environment variables are assigned with your values.
curl -s -H "Authorization: Bearer ${TOKEN}" \
  -X PATCH -H "Content-Type: application/json" \
  -d @${PIPE_ID}-patch.json \
  "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | jq .
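Instead of maintaining a separate patch file, you can generate the payload inline with a heredoc. The following is a sketch; the IDs are illustrative.

```shell
# Illustrative IDs; replace with your own values.
ENV_ID=env-abc123
CLUSTER_ID=lkc-9z8y7x

# Write a minimal patch payload: only the field being changed, plus the
# required environment and kafka_cluster references.
cat > patch.json <<EOF
{
  "spec": {
    "environment": { "id": "${ENV_ID}" },
    "kafka_cluster": { "id": "${CLUSTER_ID}" },
    "description": "patched description"
  }
}
EOF
cat patch.json
```

Upload it with the same PATCH request shown above, substituting -d @patch.json.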
Grant activation privileges¶
Before you can activate a pipeline, you must grant it activation privileges. Granting a pipeline privileges enables it to provision connectors, queries, topics, and other resources.
Grant privileges by setting activation_privilege to true in the pipeline description file and uploading the modified file with a PATCH request.
Run the following command to get a pipeline description file to modify. In this example, the pipeline metadata is saved to a file named ${PIPE_ID}.json. Ensure that the TOKEN, PIPE_ID, ENV_ID, and CLUSTER_ID environment variables are assigned with your values.
curl -s -H "Authorization: Bearer ${TOKEN}" \
  -X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | \
  jq . > ${PIPE_ID}.json
Modify the pipeline description file. The following example shows activation_privilege set to true.
{
  "...": "...",
  "spec": {
    "display_name": "test-pipeline",
    "description": "updated description",
    "activation_privilege": true,
    "...": "..."
  }
}
Save the file and run the following command to grant activation privileges to the pipeline.
curl -s -H "Authorization: Bearer ${TOKEN}" \
  -X PATCH -H "Content-Type: application/json" \
  -d @${PIPE_ID}.json \
  "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | jq .
Revoke pipeline privileges¶
Revoking a pipeline's privileges disables its ability to activate and provision connectors, queries, topics, and other resources.
Revoke privileges by setting activation_privilege to false in the pipeline description file and uploading the modified file with a PATCH request.
Ensure that the pipeline isn’t currently activated. You can deactivate and revoke in the same request.
Run the following command to get a pipeline description file to modify. In this example, the pipeline metadata is saved to a file named ${PIPE_ID}.json. Ensure that the TOKEN, PIPE_ID, ENV_ID, and CLUSTER_ID environment variables are assigned with your values.
curl -s -H "Authorization: Bearer ${TOKEN}" \
  -X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | \
  jq . > ${PIPE_ID}.json
Modify the pipeline description file. The following example sets activation_privilege to false, and also sets activated to false.
{
  "...": "...",
  "spec": {
    "display_name": "test-pipeline",
    "description": "updated description",
    "activation_privilege": false,
    "activated": false,
    "...": "..."
  }
}
Save the file and run the following command to revoke activation privileges for the pipeline.
curl -s -H "Authorization: Bearer ${TOKEN}" \
  -X PATCH -H "Content-Type: application/json" \
  -d @${PIPE_ID}.json \
  "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | jq .
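Because deactivation and revocation can go in the same request, a single payload can set both fields. This sketch assumes the minimal patch shape from the "Patch a pipeline's metadata" section also accepts these fields; the IDs are illustrative.

```shell
# Illustrative IDs; replace with your own values.
ENV_ID=env-abc123
CLUSTER_ID=lkc-9z8y7x

# One payload that both deactivates the pipeline and revokes its privileges.
cat > revoke.json <<EOF
{
  "spec": {
    "environment": { "id": "${ENV_ID}" },
    "kafka_cluster": { "id": "${CLUSTER_ID}" },
    "activation_privilege": false,
    "activated": false
  }
}
EOF
cat revoke.json
```

Upload it with the same PATCH request shown above, substituting -d @revoke.json.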
Activate or deactivate a pipeline¶
- Activate a pipeline by setting activated to true in the pipeline description file and uploading the modified file with a PATCH request.
- Deactivate a pipeline by setting activated to false.
Note
The activation_privilege setting must be set to true for a pipeline to activate.
Run the following command to get a pipeline description file to modify. In this example, the pipeline metadata is saved to a file named ${PIPE_ID}.json. Ensure that the TOKEN, PIPE_ID, ENV_ID, and CLUSTER_ID environment variables are assigned with your values.
curl -s -H "Authorization: Bearer ${TOKEN}" \
  -X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | \
  jq . > ${PIPE_ID}.json
Modify the pipeline description file. The following example shows how to change the activated property.
{
  "...": "...",
  "spec": {
    "display_name": "test-pipeline",
    "description": "updated description",
    "activated": true,
    "...": "..."
  }
}
Save the file and run the following command to activate or deactivate the pipeline.
curl -s -H "Authorization: Bearer ${TOKEN}" \
  -X PATCH -H "Content-Type: application/json" \
  -d @${PIPE_ID}.json \
  "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | jq .
View pipeline source code¶
The SQL source code for a pipeline is embedded within the pipeline's description file. Use the jq .spec.source_code query to select the source_code property.
Run the following command to view a pipeline's source code. Ensure that the TOKEN, PIPE_ID, ENV_ID, and CLUSTER_ID environment variables are assigned with your values.
curl -s -H "Authorization: Bearer ${TOKEN}" \
-X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | \
jq .spec.source_code
Your output should resemble:
{
"sql": "CREATE SOURCE CONNECTOR \"DatagenSourceConnector_2\" WITH (\n \"connector.class\"='DatagenSource',\n \"kafka.api.key\"='${DATAGENSOURCECONNECTOR_2_KAFKA_API_KEY}',\n \"kafka.api.secret\"='${DATAGENSOURCECONNECTOR_2_KAFKA_API_SECRET}',\n \"kafka.auth.mode\"='KAFKA_API_KEY',\n \"kafka.topic\"='jim-test-cli-pageviews-02',\n \"max.interval\"='1000',\n \"output.data.format\"='JSON_SR',\n \"quickstart\"='PAGEVIEWS',\n \"tasks.max\"='1'\n);\n\nCREATE OR REPLACE STREAM \"pageviews_stream\" \n WITH (kafka_topic='pageviews-02', partitions=1, value_format='JSON_SR');\n\nCREATE OR REPLACE STREAM \"filtered_pageviews\"\n WITH (kafka_topic='filtered_pageviews_topic', partitions=1)\n AS SELECT * FROM \"pageviews_stream\" WHERE userid = 'User_9';\n"
}
Note
The JSON output has no credentials for connectors, and secret values are always redacted.
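To turn the escaped SQL string into a runnable .sql file, add jq's -r flag so the \n escapes become real newlines. The following sketch runs against a sample response it writes itself; in practice, pipe in the curl output from above.

```shell
# Sample response standing in for the real GET output.
cat > /tmp/pipeline_response.json <<'EOF'
{ "spec": { "source_code": { "sql": "CREATE STREAM \"s1\" (id INT);\nCREATE STREAM \"s2\" (id INT);\n" } } }
EOF

# -r emits the raw string, turning \n escapes into real newlines.
jq -r .spec.source_code.sql /tmp/pipeline_response.json > /tmp/pipeline.sql
cat /tmp/pipeline.sql
```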