Stream Catalog REST API Usage on Confluent Cloud

The Stream Catalog API allows you to create, retrieve, update, and delete catalog entities through a REST API, and add business metadata to entities. To learn more, see the Confluent Cloud CATALOG API (V1).

Catalog API usage examples

This document provides examples of calls to the Confluent Cloud CATALOG API (V1) using curl commands.

Prerequisites and API examples are provided in these sections:

Catalog API usage limitations and best practices

Rate Limits on searches of the Stream Governance cluster are capped at 25 Write requests per second and 75 Read requests, in addition to other internal limits. The catalog search API to find by entity type is not designed for continuous requests at rate limits. Therefore, it is not advisable to continuously make requests to the catalog search endpoint to get topics and schemas, as there are recommended alternative methods using different APIs.

Recommended methods are:

To learn about optimal methods for fetching schemas to support your workflows, contact Confluent Global Technical Support (GTS). The Support portal is available at https://support.confluent.io.

Setup and suggestions

You’ll need to know the following to make API calls to the CATALOG API per the usage examples provided here:

  • API endpoint URL for the Confluent Cloud Schema Registry cluster in the environment you want to use. In the examples below, this is typically referred to as the <SCHEMA-REGISTRY-URL> or $SCHEMA_REGISTRY_URL. To find this on the Cloud Console navigate to an environment and locate Stream Governance API > Endpoint on the right panel. Alternatively, log on to Confluent Cloud with the Confluent CLI, make sure you have the appropriate environment selected (confluent env list, confluent env use <environment-ID>), and enter confluent schema-registry cluster describe. (A handy list of CLI commands is here.)
  • If you are using API keys, you need an API key and secret for Schema Registry in the environment you want to use. (If needed, see Create an API Key for Confluent Cloud Schema Registry.) Schema subjects live at the level of an environment, in per-environment Schema Registry clusters, so just choose the environment that contains the Kafka clusters you want to use.
  • If you are using OAuth, you must provide your access token for authorization in the proper format, as described in Access token format. An OAuth example is provided below.
  • To get properly formatted output for search calls that include question mark (?) or ampersand (&), you must enclose the URL and search parameters in either single or double quotes so that the shell does not interpret them This syntax is also shown in the examples.
  • If an entity name has a space in it, use %20 to indicate the space in API calls. See Search for connectors by name for an example.
  • (Optional) You may want to store your API key and secret in local shell environment variables to make testing easier. For example: export API_KEY=xyz. You can check the contents of a variable with echo $<VAR> (for example, echo $APIKEY), then use it as such in subsequent commands: curl --silent -u $APIKEY:$APISECRET --request GET --url $SCHEMA_REGISTRY_URL/catalog/v1/types/tagdefs. You can also store the Schema Registry URL in a shell variable like $SCHEMA_REGISTRY_URL, but this won’t work for calls where the value for --url is within single or double quotes. For examples that don’t require quotes, the shell variable is shown. If you get an error using a shell variable, a first troubleshooting step is to use the actual value in the command.
  • (Optional) For your API testing, you may want to use the --silent flag with the curl commands and pipe the entire command through jq . to get nicely formatted output. Most of the examples below show the calls with these options included, and the expected output. If you don’t use --silent and jq, the content of the output will be the same, it just won’t be formatted as human-friendly readable.
  • (Optional) A Confluent Cloud login is handy for comparing API results with the Confluent Cloud Console (https://confluent.cloud/).

OAuth for Confluent Cloud Stream Catalog REST API

Stream Catalog REST API now supports OAuth, an open-standard authorization protocol for secure access. To learn more about OAuth, see Use OAuth-OIDC on Confluent Cloud and Configure Schema Registry clients for OAuth.

Here is an example of using OAuth to call to get a list of Tags from Stream Catalog.

curl --request GET \
--url $SCHEMA_REGISTRY_URL/catalog/v1/types/tagdefs \
--header 'Confluent-Identity-Pool-Id: <POOL-ID>' \
--header 'target-sr-cluster: <CLUSTER-ID' \
--header 'Authorization: Bearer $TOKEN'

The rest of the examples on this page use an API key and secret. If you want to use OAuth instead, remove the -u $APIKEY:$APISECRET and follow the format shown above (specify identity pool ID, Schema Registry cluster, and OAuth bearer token).

Entity types

Tags and Business Metadata can be applied to the same set of entities. This set of entities currently includes a global type cf_entity that can be applied to any entity, and customized definitions of schema registry elements:

  • cn_connector
  • cf_entity
  • cf_environment
  • flink_compute_pool
  • kafka_cluster_link
  • kafka_logical_cluster
  • kafka_topic
  • pl_pipeline
  • sr_array
  • sr_combined
  • sr_entity
  • sr_enum
  • sr_field
  • sr_fixed
  • sr_map
  • sr_primitive
  • sr_record
  • sr_schema
  • sr_subject_version
  • sr_type

All the types prefixed with sr_ are defined under the super class of sr_entity, which can be used to indicate “any type of SR entity”. Other types are defined under the super class of cf_entity.

For lists of entities in the API references, see Confluent Stream Catalog GraphQL API Reference and the CATALOG API (V1).

How entities are identified

Entities are identified by their qualified name. The format of the qualified name differs somewhat per entity. For example:

Qualified name definitions

Qualified names are used to identify entities. Different entities have different qualified names.

A context is a Schema Registry concept. In these examples, context is always . (a dot). (To learn more, see What are schema contexts and when should you use them? in the FAQ) and Schema contexts in the Schema Linking documentation.

When creating or referring to an entity, you can omit the tenant and context because these are automatically prepended.

If part of a qualified name has a colon or a single quote, then the single quotes should be doubled, and the resulting text surrounded by single quotes.

A tag definition is not an entity but a definition of a specific type of entity. Tag definitions are auto-prefixed with the tenant name and a double underscore, so when you create or refer to a tag, you can just use the tag name; such as, Public or Sensitive.

Examples of qualified names

Here are examples of fully qualified names:

  • For a subject version: <tenant>:<context>:<subject>:<version>
  • For a schema: <tenant>:<context>:<id>
  • For a record: <tenant>:<context>:<id>:<fully-qualified-record-name>
  • For a field: <tenant>:<context>:<id>:<fully-qualified-record-name>:<field-name>
Entity type typeName Qualified name minus tenant & context Example
schema sr_schema {id} 000001
subject version sr_subject_version {subject}:{version} my_subject:v1
record sr_record {fully-qualified-record-name} my_record
field sr_field {fully-qualified-record-name}:{field-name} my_record:field_1
customized tag {tag-name}    
topic kafka_topic {lkc-id}:{topic-name} lkc-123:topic-0
Kafka cluster kafka_logical_cluster {logical-kafka-cluster-id} lkc-123
pipeline pl_pipeline {pipeline-id} pipeline-123
connector cn_connector {logical-connector-cluster-id} lcc-123
environment cf_environment {environment-id} env-123
cluster link kafka_cluster_link {lkc-id}:{cluster-link-id} lkc-123:link-123
Flink compute pool flink_compute_pool {flink-compute-pool-id} lfcp-123

Entity APIs

Here is a generic example of how an API call on an entity. The basic call to GET an entity is as follows:

GET /catalog/v1/entity/type/{typeName}/name/{qualifiedName}

An example response is shown below.

{
  "referredEntities": {},
  "entity": {
    "typeName": "sr_subject_version",
    "attributes": {
      "owner": null,
      "replicatedTo": null,
      "userDescription": null,
      "replicatedFrom": null,
      "createTime": 1600207301780,
      "qualifiedName": "tenant3:test-value2:1",
      "displayName": null,
      "name": "test-value2",
      "description": null,
      "id": 100001,
      "version": 1,
      "tenant": "tenant3"
    },
    "guid": "cd27f1d9-6483-4ff4-94e3-6f8c4492176b",
    "isIncomplete": false,
    "status": "ACTIVE",
    "createdBy": "UNKNOWN",
    "updatedBy": "UNKNOWN",
    "createTime": 1600207302005,
    "updateTime": 1600207302005,
    "version": 0,
    "relationshipAttributes": {
      "schema": {
       "guid": "1897eba4-6647-443a-9e87-1d0ed6975cb6",
        "typeName": "sr_schema",
        "entityStatus": "ACTIVE",
        "displayText": "t1record",
        "relationshipType": "sr_subject_version_schema",
        "relationshipGuid": "a8aa24b4-9d35-46ef-9b07-95f3f2ee0c72",
        "relationshipStatus": "ACTIVE",
        "relationshipAttributes": {
          "typeName": "sr_subject_version_schema"
        }
      },
      "inputToProcesses": [],
      "schemas": [],
      "meanings": [],
      "outputFromProcesses": []
    },
    "labels": []
  }
}

In the following sections, fully fleshed-out examples are provided for different entity types.

Searching

Following are a few shortcuts into the examples that show how to search, list, and find things quickly, since these are tasks you’ll likely perform frequently to get input values for other API calls.

Tags API examples

Stream Catalog features enables entity tagging. Tags are searchable like any other entity. Tags concepts are fully covered in the Confluent Cloud Console user guide under Tag entities, data, and schemas, so if you have not read that overview you might want to have a look at it first.

This section provides usage examples for working with tags through the Confluent Cloud CATALOG API (V1).

If you walked through the stocks examples for the Stream lineage tutorial, you will have most of the example entities shown below in your environment already. You can further enhance discoverability of those resources by tagging schemas, records, and fields, and then feed those tags into some of the searches shown below.

Tip

Be sure to review the Setup and suggestions notes for these API usage examples before you begin.

Create a tag

Tags can be created to apply to only specific entities or to apply to any entity. Tags created from the Confluent Cloud Console are always specified as cf_entity, and therefore can be applied to all types of entities.

Create a generic tag applicable to a topic or any entity

From the API, to create a tag that applies to any entity (topics, schemas, fields, records, connectors, and so forth), specify cf_entity as the entity type in the data block:

curl --silent -u $APIKEY:$APISECRET --header 'Content-Type: application/json' \
--data '[ { "entityTypes" : [ "cf_entity" ],"name" :  "<TAG_NAME>", "description" : "<tag description>"} ]' \
--url $SCHEMA_REGISTRY_URL/catalog/v1/types/tagdefs | jq .

Tip

Always make sure that the data block is on one line.

For example, this call creates a tag called MyFaves:

curl --silent -u $APIKEY:$APISECRET --header 'Content-Type: application/json' \
--data '[ { "entityTypes" : [ "cf_entity" ],"name" :  "MyFaves", "description" : "Test adding global applicable tag"} ]' \
--url $SCHEMA_REGISTRY_URL/catalog/v1/types/tagdefs | jq .

Your output will resemble:

"category": "CLASSIFICATION",
"createdBy": "root",
"updatedBy": "root",
"createTime": 1677798368458,
"updateTime": 1677798368458,
"version": 1,
"name": "MyFaves",
"description": "Test adding global applicable tag",
"typeVersion": "1.0",
"attributeDefs": [],
"superTypes": [],
"entityTypes": [
  "cf_entity"

The tag used in the previous example, myglobaltag, was created in this same way:

curl --silent -u $APIKEY:$APISECRET --header 'Content-Type: application/json' \
--data '[ { "entityTypes" : [ "cf_entity" ],"name" :  "myglobaltag", "description" : "Test adding global applicable tag"} ]' \
--url $SCHEMA_REGISTRY_URL/catalog/v1/types/tagdefs | jq .
Create a tag applicable to only specific entities

If you want to create a tag that’s specific for only certain entity types, you must specify these in the data block.

For example, to create one of the commonly-used, pre-fab tags (PII, Sensitive, Private, Public) that can be applied only to schemas, records, and fields, provide that tag name in the API call and specify the entities in the data block. The following command creates a PII tag:

curl --silent -u $APIKEY:$APISECRET \
--header 'Content-Type: application/json' \
--data '[ { "entityTypes" : [ "sr_schema", "sr_record", "sr_field"],"name" : "PII","description" : "Personally Identifiable Information"} ]' \
--url $SCHEMA_REGISTRY_URL/catalog/v1/types/tagdefs

Tip

If you were to include cf_entity to the data block in the above example, this tag could also be applied to topics. As it is, this tag can only be applied to schemas, records, and fields; if a user attempts to apply it to a topic, an error is generated.

To create a custom tag called PCI with which to tag stock symbols on your buy list, using curl --silent and piping it through jq:

curl --silent -u $APIKEY:$APISECRET \
--header 'Content-Type: application/json' \
--data '[ { "entityTypes" : [ "sr_schema", "sr_record", "sr_field", "sr_schema" ],"name" : "PCI","description" : "Payment Card Industry Data Security Standard"} ]' \
--url $SCHEMA_REGISTRY_URL/catalog/v1/types/tagdefs | jq .

Your output should resemble:

"category": "CLASSIFICATION",
  "createdBy": "root",
  "updatedBy": "root",
  "createTime": 1631914851910,
  "updateTime": 1631914851910,
  "version": 1,
  "name": "PCI",
  "description": "Payment Card Industry Data Security Standard",
  "typeVersion": "1.0",
  "attributeDefs": [],
  "superTypes": [],
  "entityTypes": [
    "sr_field",
    "sr_schema",
    "sr_record"
    "sr_schema"
  ]

To create a custom tag called stocks_buy with which to tag stock symbols on your buy list, using curl --silent and piping it through jq:

curl --silent -u $APIKEY:$APISECRET \
--header 'Content-Type: application/json' \
--data '[ { "entityTypes" : [ "sr_schema", "sr_record", "sr_field", "sr_schema" ],"name" : "stocks_buy","description" : "stock symbols on buy list"} ]' \
--url $SCHEMA_REGISTRY_URL/catalog/v1/types/tagdefs | jq .

Your output should resemble:

"category": "CLASSIFICATION",
  "createdBy": "root",
  "updatedBy": "root",
  "createTime": 1631914483922,
  "updateTime": 1631914483922,
  "version": 1,
  "name": "stocks_buy",
  "description": "stock symbols on buy list",
  "typeVersion": "1.0",
  "attributeDefs": [],
  "superTypes": [],
  "entityTypes": [
    "sr_field",
    "sr_schema",
    "sr_record"
    "sr_schema"
  ]

Create another custom tag called my_stocks:

curl --silent -u $APIKEY:$APISECRET \
--header 'Content-Type: application/json' \
--data '[ { "entityTypes" : [ "sr_schema", "sr_record", "sr_field", "sr_schema" ],"name" : "my_stocks","description" : "stocks I am tracking"} ]' \
--url $SCHEMA_REGISTRY_URL/catalog/v1/types/tagdefs | jq .

Log on to https://confluent.cloud and check the Cloud Console to verify that the new tags show up there. Navigate to the environment in which you are working, and click Tags on the right side panel.

All tags are shown on this view.

../_images/dg-catalog-api-view-created-tags-detail.png

Update a tag by adding attributes (key, value pairs)

The following call updates the PCI tag with several new attributes.

curl --silent -u $APIKEY:$APISECRET --request PUT \
  --url $SCHEMA_REGISTRY_URL/catalog/v1/types/tagdefs \
  --header 'Content-Type: application/json' \
  --data '[{ "entityTypes" : [ "sr_record", "sr_field" ],
           "name" : "PCI", "description" : "Payment Card Industry Data Security Standard" ,
           "attributeDefs" : [ { "name" : "key", "cardinality" : "SINGLE", "typeName" : "string" },
           { "name" : "region", "isOptional" : "true", "cardinality" : "SINGLE", "typeName" : "string" },
           { "name" : "keytype", "isOptional" : "true", "cardinality" : "SINGLE", "typeName" : "string" } ] }]' | jq .

Your output should resemble:

"category": "CLASSIFICATION",
 "name": "PCI",
 "description": "Payment Card Industry Data Security Standard",
 "attributeDefs": [
   {
     "name": "key",
     "typeName": "string",
     "isOptional": false,
     "cardinality": "SINGLE",
     "valuesMinCount": -1,
     "valuesMaxCount": -1,
     "isUnique": false,
     "isIndexable": false,
     "includeInNotification": false,
     "searchWeight": -1
   },
   {
     "name": "region",
     "typeName": "string",
     "isOptional": true,
     "cardinality": "SINGLE",
     "valuesMinCount": -1,
     "valuesMaxCount": -1,
     "isUnique": false,
     "isIndexable": false,
     "includeInNotification": false,
     "searchWeight": -1
   },
   {
     "name": "keytype",
     "typeName": "string",
     "isOptional": true,
     "cardinality": "SINGLE",
     "valuesMinCount": -1,
     "valuesMaxCount": -1,
     "isUnique": false,
     "isIndexable": false,
     "includeInNotification": false,
     "searchWeight": -1
   }
 ],
 "superTypes": [],
 "entityTypes": [
   "sr_field",
   "sr_record"
 ]

List all tags (with definitions)

curl --silent -u $APIKEY:$APISECRET \
  --request GET \
  --url $SCHEMA_REGISTRY_URL/catalog/v1/types/tagdefs | jq .

Your output will resemble:

"category": "CLASSIFICATION",
"createdBy": "root",
"updatedBy": "root",
"createTime": 1677721660028,
"updateTime": 1677721660028,
"version": 1,
"name": "myglobaltag",
"description": "Test adding global applicable tag",
"typeVersion": "1.0",
"attributeDefs": [],
"superTypes": [],
"entityTypes": [
  "cf_entity"
  ]
},

{
  "category": "CLASSIFICATION",
  "createdBy": "root",
  "updatedBy": "root",
  "createTime": 1677798368458,
  "updateTime": 1677798368458,
  "version": 1,
  "name": "MyFaves",
  "description": "Test adding global applicable tag",
  "typeVersion": "1.0",
  "attributeDefs": [],
  "superTypes": [],
  "entityTypes": [
    "cf_entity"
  ]

  },

{
 "category": "CLASSIFICATION",
  "guid": "f94803c3-6aba-4b41-a003-2f8fc1957c29",
  "createdBy": "root",
  "updatedBy": "root",
  "createTime": 1631593629096,
  "updateTime": 1631593629096,
  "version": 1,
  "name": "PII",
  "description": "Personally identifiable information",
  "typeVersion": "1.0",
  "attributeDefs": [],
  "superTypes": [],
  "entityTypes": [
    "cf_entity"
      ]
   },

{
  "category": "CLASSIFICATION",
  "createdBy": "root",
  "updatedBy": "root",
  "createTime": 1631914851910,
  "updateTime": 1631914851910,
  "version": 1,
  "name": "PCI",
  "description": "Payment Card Industry Data Security Standard",
  "typeVersion": "1.0",
  "attributeDefs": [],
  "superTypes": [],
  "entityTypes": [
    "sr_field",
    "sr_schema",
    "sr_record",
    "sr_schema"
  ]
   },

{
  "category": "CLASSIFICATION",
  "createdBy": "root",
  "updatedBy": "root",
  "createTime": 1631914483920,
  "updateTime": 1631914483920,
  "version": 1,
  "name": "stocks_buy",
  "description": "stock symbols on buy list",
  "typeVersion": "1.0",
  "attributeDefs": [],
  "superTypes": [],
  "entityTypes": [
    "sr_field",
    "sr_schema",
    "sr_record",
    "sr_schema"
  ]
},

  ...

Get tag definition

Get the definition for the PII (personally identifiable information) tag.

curl --silent -u $APIKEY:$APISECRET \
--request GET \
--url $SCHEMA_REGISTRY_URL/catalog/v1/types/tagdefs/PII | jq .

Your output should resemble:

 "category": "CLASSIFICATION",
"createdBy": "root",
"updatedBy": "root",
"createTime": 1631593629105,
"updateTime": 1631593629105,
"version": 1,
"name": "PII",
"description": "Personally identifiable information",
"typeVersion": "1.0",
"attributeDefs": [],
"superTypes": [],
"entityTypes": [
  "cf_entity"
]

Search fields by name

Search for fields named credit_card:

curl --silent -u $APIKEY:$APISECRET \
--request GET \
--url '<SCHEMA-REGISTRY-URL>/catalog/v1/search/basic?type=sr_field&query=credit_card' | jq .

Your output should resemble:

"searchParameters": {
  "includeDeleted": false,
  "limit": 0,
  "offset": 0
},
"types": [
  "sr_field"
],
"entities": [
  {
    "typeName": "sr_field",
    "attributes": {
      "createTime": 1632516635687,
      "qualifiedName": "lsrc-g2p81:.:100007:com.mycorp.mynamespace.sampleRecord.credit_card",
      "name": "credit_card",
      "context": ".",
      "id": 100007,
      "nameLower": "credit_card",
      "tenant": "lsrc-g2p81"
    },
    "guid": "541fae87-7fe5-4cab-b509-0f76cbf47515",
    "status": "ACTIVE",
    "displayText": "credit_card",
    "classificationNames": [
      "credit_card",
      "PII"
    ],
    "meaningNames": [],
    "meanings": [],
    "isIncomplete": false,
    "labels": [

Search fields by tag

Search for fields tagged as PII.

curl --silent -u $APIKEY:$APISECRET \
--request GET \
--url '<SCHEMA-REGISTRY-URL>/catalog/v1/search/basic?type=sr_field&tag=PII' | jq .

Your output should resemble:

"searchParameters": {
   "includeDeleted": false,
   "limit": 0,
   "offset": 0
 },
 "types": [
   "sr_field"
 ],
 "entities": [
   {
     "typeName": "sr_field",
     "attributes": {
       "createTime": 1614710010055,
       "qualifiedName": "lsrc-g2p81:.:100001:ksql.StockTrade.userid",
       "name": "userid",
       "context": ".",
       "id": 100001,
       "nameLower": "userid",
       "tenant": "lsrc-g2p81"
     },
     "guid": "99a87d47-f55c-4cff-ad36-cf870ebab382",
     "status": "ACTIVE",
     "displayText": "userid",
     "classificationNames": [
       "PII"
     ],
     "meaningNames": [],
     "meanings": [],
     "isIncomplete": false,
     "labels": []
   },
   {
     "typeName": "sr_field",
     "attributes": {
       "createTime": 1614710010055,
       "qualifiedName": "lsrc-g2p81:.:100001:ksql.StockTrade.account",
       "name": "account",
       "context": ".",
       "id": 100001,
       "nameLower": "account",
       "tenant": "lsrc-g2p81"
     },
     "guid": "e8b96055-30ef-4203-adbc-30e28fa419c9",
     "status": "ACTIVE",
     "displayText": "account",
     "classificationNames": [
       "PII"
     ],
     "meaningNames": [],
     "meanings": [],
     "isIncomplete": false,
     "labels": []
   },

   ...

Search schema record by name

Search for a schema record in namespace ksql with record name StockTrade.

curl --silent -u $APIKEY:$APISECRET \
--request GET \
--url '<SCHEMA-REGISTRY-URL>/catalog/v1/search/basic?type=sr_record&query=ksql.StockTrade' | jq .

Your output should resemble:

 "searchParameters": {
  "includeDeleted": false,
  "limit": 0,
  "offset": 0
},
"types": [
  "sr_record"
],
"entities": [
  {
    "typeName": "sr_record",
    "attributes": {
      "createTime": 1614710010055,
      "qualifiedName": "lsrc-g2p81:.:100001:ksql.StockTrade",
      "name": "StockTrade",
      "context": ".",
      "id": 100001,
      "nameLower": "stocktrade",
      "tenant": "lsrc-g2p81"
    },
    "guid": "d757148e-28f6-465b-b7de-088732848ab5",
    "status": "ACTIVE",
    "displayText": "StockTrade",
    "classificationNames": [
      "my_stocks",
      "Sensitive"
    ],
    "meaningNames": [],
    "meanings": [],
    "isIncomplete": false,
    "labels": []

Search schema by tag

Search for schema records tagged with my_stocks.

curl --silent -u $APIKEY:$APISECRET \
--request GET \
--url '<SCHEMA-REGISTRY-URL>/catalog/v1/search/basic?type=sr_record&tag=my_stocks' | jq .

Your output should resemble:

"searchParameters": {
  "includeDeleted": false,
  "limit": 0,
  "offset": 0
},
"types": [
  "sr_record"
],
"entities": [
  {
    "typeName": "sr_record",
    "attributes": {
      "createTime": 1614902469162,
      "qualifiedName": "lsrc-g2p81:.:100003:io.confluent.ksql.avro_schemas.KsqlDataSourceSchema",
      "name": "KsqlDataSourceSchema",
      "context": ".",
      "id": 100003,
      "nameLower": "ksqldatasourceschema",
      "tenant": "lsrc-g2p81"
    },
    "guid": "dee2acf2-08e5-43af-a2de-927a06f17c33",
    "status": "ACTIVE",
    "displayText": "KsqlDataSourceSchema",
    "classificationNames": [
      "my_stocks"
    ],
    "meaningNames": [],
    "meanings": [],
    "isIncomplete": false,
    "labels": []
  },
  {
    "typeName": "sr_record",
    "attributes": {
      "createTime": 1614710010055,
      "qualifiedName": "lsrc-g2p81:.:100001:ksql.StockTrade",
      "name": "StockTrade",
      "context": ".",
      "id": 100001,
      "nameLower": "stocktrade",
      "tenant": "lsrc-g2p81"
    },
    "guid": "d757148e-28f6-465b-b7de-088732848ab5",
    "status": "ACTIVE",
    "displayText": "StockTrade",
    "classificationNames": [
      "my_stocks",
      "Sensitive"
    ],
    "meaningNames": [],
    "meanings": [],
    "isIncomplete": false,
    "labels": []

Tag a field in Avro

Suppose you want to update the stocks_buy-value schema by tagging a field in one of its records. Specifically, the stocks_buy-value schema contains the record KsqlDataSourceSchema, and you want to tag the SYMBOL field with the stocks_buy tag.

First, you must get the fully qualified name of the field you want to tag because you will need this to specify the "entityName". You cannot get this from the Cloud Console. To get the fully qualified name of a field in a schema, send an API call that returns the details of that field.

For example, search for fields named SYMBOL:

curl --silent -u $APIKEY:$APISECRET \
--request GET \
--url '<SCHEMA-REGISTRY-URL>/catalog/v1/search/basic?type=sr_field&query=SYMBOL' | jq .

Your output will resemble the following. The fully qualified name for the SYMBOL field in the stocks_buy-value schema KsqlDataSourceSchema record is highlighted:

"searchParameters": {
    "includeDeleted": false,
    "limit": 0,
    "offset": 0
  },
  "types": [
    "sr_field"
  ],
  "entities": [
    {
      "typeName": "sr_field",
      "attributes": {
        "createTime": 1614710010055,
        "qualifiedName": "lsrc-g2p81:.:100001:ksql.StockTrade.symbol",
        "name": "symbol",
        "context": ".",
        "id": 100001,
        "nameLower": "symbol",
        "tenant": "lsrc-g2p81"
      },
      "guid": "446a0d3f-b77e-4ce5-a0d2-628076cd4a13",
      "status": "ACTIVE",
      "displayText": "symbol",
      "classificationNames": [],
      "meaningNames": [],
      "meanings": [],
      "isIncomplete": false,
      "labels": []
    },
    {
      "typeName": "sr_field",
      "attributes": {
        "createTime": 1614902466440,
        "qualifiedName": "lsrc-g2p81:.:100002:io.confluent.ksql.avro_schemas.KsqlDataSourceSchema.SYMBOL",
        "name": "SYMBOL",
        "context": ".",
        "id": 100002,
        "nameLower": "symbol",
        "tenant": "lsrc-g2p81"
      },
      "guid": "8cfacbc2-8ae8-44c0-b860-e3161bdb012a",
      "status": "ACTIVE",
      "displayText": "SYMBOL",
      "classificationNames": [],
      "meaningNames": [],
      "meanings": [],
      "isIncomplete": false,
      "labels": []
    },
    {
      "typeName": "sr_field",
      "attributes": {
        "createTime": 1614902469162,
        "qualifiedName": "lsrc-g2p81:.:100003:io.confluent.ksql.avro_schemas.KsqlDataSourceSchema.SYMBOL",
        "name": "SYMBOL",
        "context": ".",
        "id": 100003,
        "nameLower": "symbol",
        "tenant": "lsrc-g2p81"
      },
      "guid": "16854b0c-580b-408a-9cb6-49e5505900cc",
      "status": "ACTIVE",
      "displayText": "SYMBOL",
      "classificationNames": [
        "stocks_buy"
      ],
      "meaningNames": [],
      "meanings": [],
      "isIncomplete": false,
      "labels": []

Now tag the SYMBOL field with the stocks_buy tag.

curl --silent -u $APIKEY:$APISECRET \
--request POST \
--url $SCHEMA_REGISTRY_URL/catalog/v1/entity/tags \
--header 'Content-Type: application/json' \
--data '[ {  "entityType" : "sr_field",
  "entityName" : "lsrc-g2p81:.:100003:io.confluent.ksql.avro_schemas.KsqlDataSourceSchema.SYMBOL",
  "typeName" : "stocks_buy"} ]' | jq .

Your output should resemble:

"typeName": "stocks_buy",
"entityStatus": "ACTIVE",
"entityType": "sr_field",
"entityName": "lsrc-g2p81:.:100003:io.confluent.ksql.avro_schemas.KsqlDataSourceSchema.SYMBOL"

To verify on the Cloud Console, log on to https://confluent.cloud, navigate to the schema and field you just tagged, and you should see the stocks_buy tag associated with the SYMBOL field.

../_images/dg-catalog-api-add-tag-in-avro-ui.png

Try adding the same tag to the same schema on the PRICE field, substituting in PRICE instead of SYMBOL:

curl --silent -u $APIKEY:$APISECRET \
--request POST \
--url $SCHEMA_REGISTRY_URL/catalog/v1/entity/tags \
--header 'Content-Type: application/json' \
--data '[ {  "entityType" : "sr_field",
  "entityName" : "lsrc-g2p81:.:100003:io.confluent.ksql.avro_schemas.KsqlDataSourceSchema.PRICE",
  "typeName" : "stocks_buy"} ]' | jq .

Add another tag, my_stocks, to one of these same fields (SYMBOL or PRICE) either through the Cloud Console or through the API. If you want to use the API call, submit the curl command the same as above and just substitute my_stocks for stocks_buy.

curl --silent -u $APIKEY:$APISECRET \
--request POST \
--url $SCHEMA_REGISTRY_URL/catalog/v1/entity/tags \
--header 'Content-Type: application/json' \
--data '[ {  "entityType" : "sr_field",
  "entityName" : "lsrc-g2p81:.:100003:io.confluent.ksql.avro_schemas.KsqlDataSourceSchema.PRICE",
  "typeName" : "my_stocks"} ]' | jq .

Note that you could also tag a schema field with one of your tags set up to apply to cf_entity, such as myglobaltag (created as shown in Create a tag):

curl --silent -u $APIKEY:$APISECRET \
--request POST \
--url $SCHEMA_REGISTRY_URL/catalog/v1/entity/tags \
--header 'Content-Type: application/json' \
--data '[ {  "entityType" : "sr_field", "entityName" : "lsrc-g2p81:.:100003:io.confluent.ksql.avro_schemas.KsqlDataSourceSchema.PRICE",  "typeName" : "MyFaves"} ]' | jq .

The output should reflect that the MyFaves tag was added to the PRICE field, and as with all of these examples, you should also the tag on the schema field in the Confluent Cloud Console:

"typeName": "MyFaves",
"entityStatus": "ACTIVE",
"entityType": "sr_field",
"entityName": "lsrc-g2p81:.:100003:io.confluent.ksql.avro_schemas.KsqlDataSourceSchema.PRICE"
../_images/catalog-api-tag-examples.png

Get the tag attributes from a field

Get the tag attributes for the same field (SYMBOL in the stocks_buy-value schema in the record KsqlDataSourceSchema).

curl --silent -u $APIKEY:$APISECRET \
--request GET \
--url '<SCHEMA-REGISTRY-URL>/catalog/v1/entity/type/sr_field/name/100003:io.confluent.ksql.avro_schemas.KsqlDataSourceSchema.SYMBOL/tags' | jq .

Your output should resemble:

  "typeName": "my_stocks",
  "entityGuid": "cf62db9b-caec-40dd-928d-6dd98f803b4e",
  "entityStatus": "ACTIVE",
  "propagate": true,
  "removePropagationsOnEntityDelete": false,
  "entityType": "sr_field",
  "entityName": "lsrc-g2p81:.:100003:io.confluent.ksql.avro_schemas.KsqlDataSourceSchema.SYMBOL"
},
{
  "typeName": "stocks_buy",
  "entityGuid": "cf62db9b-caec-40dd-928d-6dd98f803b4e",
  "entityStatus": "ACTIVE",
  "propagate": true,
  "removePropagationsOnEntityDelete": false,
  "entityType": "sr_field",
  "entityName": "lsrc-g2p81:.:100003:io.confluent.ksql.avro_schemas.KsqlDataSourceSchema.SYMBOL"

Tag a schema version

First, create a new custom tag called favorites:

curl --silent -u $APIKEY:$APISECRET \
 --header 'Content-Type: application/json' \
 --data '[ { "entityTypes" : [ "sr_schema", "sr_record", "sr_field", "sr_schema" ],"name" : "favorites","description" : "my favorite stocks"} ]' \
 --url '<SCHEMA-REGISTRY-URL>/catalog/v1/types/tagdefs' | jq .

Your output should resemble:

"category": "CLASSIFICATION",
"createdBy": "root",
"updatedBy": "root",
"createTime": 1632974329626,
"updateTime": 1632974329626,
"version": 1,
"name": "favorites",
"description": "my favorite stocks",
"typeVersion": "1.0",
"attributeDefs": [],
"superTypes": [],
"entityTypes": [
  "sr_field",
  "sr_schema",
  "sr_record",
  "sr_schema"

You must now get the fully-qualified name of the version of the stocks_buy-value schema you want to tag. You can get this name either by listing all schemas, or searching specifically for the schema by name.

To list all schemas:

curl --silent -u $APIKEY:$APISECRET --request GET --url $SCHEMA_REGISTRY_URL/catalog/v1/search/basic?types=sr_schema | jq .

Schemas with only one version will be listed only once. If a schema has multiple versions, a listing for each version is shown; the first part of the fully qualified name is the same for all versions with the second part of the name indicating the version. In this description of the stocks_buy-value schema: "lsrc-g2p81:.:100003", lsrc-g2p81 refers to all versions of the schema, and 100003 refers to a specific version.

Tag the current version of the stocks_buy-value schema you have been working on as favorites.

curl --silent -u $APIKEY:$APISECRET \
--request POST \
--url <SCHEMA-REGISTRY-URL>/catalog/v1/entity/tags \
--header 'Content-Type: application/json' \
--data '[ {  "entityType" : "sr_schema",  "entityName" : "lsrc-g2p81:.:100003",  "typeName" : "favorites"  } ]' | jq .

Your output will resemble:

"typeName": "favorites",
"entityStatus": "ACTIVE",
"entityType": "sr_schema",
"entityName": "lsrc-g2p81:.:100003"

Navigate to the stocks_buy-value schema on the Cloud Console to verify that favorites is associated with the version you tagged.

../_images/dg-catalog-api-view-tagged-schema-version.png

Get schemas with a given a subject name prefix

List all versions of all schemas with a subject name prefix of stocks.

curl --silent -u $APIKEY:$APISECRET \
--request GET \
--url '<SCHEMA-REGISTRY-URL>/catalog/v1/search/attribute?type=sr_schema&attrName=name&attrValuePrefix=stocks' | jq .

Your output should resemble:

  "searchParameters": {
  "includeDeleted": false,
  "limit": 0,
  "offset": 0
},
"types": [
  "sr_schema"
],
"entities": [
  {
    "typeName": "sr_schema",
    "attributes": {
      "createTime": 1614710010055,
      "qualifiedName": "lsrc-g2p81:.:stocks-value:1",
      "name": "stocks-value",
      "context": ".",
      "id": 100001,
      "nameLower": "stocks-value",
      "tenant": "lsrc-g2p81"
    },
    "guid": "ba600e7a-0093-41b3-8d68-38ba730323bc",
    "status": "ACTIVE",
    "displayText": "stocks-value",
    "classificationNames": [],
    "meaningNames": [],
    "meanings": [],
    "isIncomplete": false,
    "labels": []
  },
  {
    "typeName": "sr_schema",
    "attributes": {
      "createTime": 1614902357913,
      "qualifiedName": "lsrc-g2p81:.:stocks_under_100-value:1",
      "name": "stocks_under_100-value",
      "context": ".",
      "id": 100002,
      "nameLower": "stocks_under_100-value",
      "tenant": "lsrc-g2p81"
    },
    "guid": "7e2e652b-3b53-4461-b634-c8a58186b28d",
    "status": "ACTIVE",
    "displayText": "stocks_under_100-value",
    "classificationNames": [],
    "meaningNames": [],
    "meanings": [],
    "isIncomplete": false,
    "labels": []
  },
  {
    "typeName": "sr_schema",
    "attributes": {
      "createTime": 1614902369755,
      "qualifiedName": "lsrc-g2p81:.:stocks_under_100-value:2",
      "name": "stocks_under_100-value",
      "context": ".",
      "id": 100003,
      "nameLower": "stocks_under_100-value",
      "tenant": "lsrc-g2p81"
    },
    "guid": "799fde4c-a43b-4820-896c-e1f5fa2ba399",
    "status": "ACTIVE",
    "displayText": "stocks_under_100-value",
    "classificationNames": [],
    "meaningNames": [],
    "meanings": [],
    "isIncomplete": false,
    "labels": []
  },
  {
    "typeName": "sr_schema",
    "attributes": {
      "createTime": 1614902422050,
      "qualifiedName": "lsrc-g2p81:.:stocks_buy-value:2",
      "name": "stocks_buy-value",
      "context": ".",
      "id": 100003,
      "nameLower": "stocks_buy-value",
      "tenant": "lsrc-g2p81"
    },
    "guid": "59f3ced1-a303-4712-ae7a-f720b9261084",
    "status": "ACTIVE",
    "displayText": "stocks_buy-value",
    "classificationNames": [],
    "meaningNames": [],
    "meanings": [],
    "isIncomplete": false,
    "labels": []
  },

  ...

Delete a tag

curl --silent -u $APIKEY:$APISECRET -X DELETE /catalog/v1/types/tagdefs/<TAG_NAME> | jq .

No output will show on the command line, but if you check View & manage under Tags in the Confluent Cloud Console, you will see that the tag is no longer listed. Similarly, if you run the API call to list all tags, the tag you deleted will not be included in the output:

curl --silent -u $APIKEY:$APISECRET --request GET --url $SCHEMA_REGISTRY_URL/catalog/v1/types/tagdefs | jq .

Topics

Stream Catalog enables working with topics. Topics are searchable and tag-able like any other entity. In addition to the following API calls to list, search, and tag topics, see also Create a business metadata definition for a topic and Add business metadata to a topic.

Tip

Be sure to review the Setup and suggestions notes for these API usage examples before you begin.

List all topics
curl --silent -u $APIKEY:$APISECRET --request GET --url $SCHEMA_REGISTRY_URL/catalog/v1/search/basic?types=kafka_topic | jq .

Here is some example output, showing a partial list of topics; employees, stocks, and flights. Notice that this type of search (and several of the others that follow) is a way of getting the qualified name for an entity; in this case, the qualified name for a topic which shows as the value for qualifiedName.

"searchParameters": {
   "includeDeleted": false,
   "limit": 0,
   "offset": 0
 },
 "types": [
   "kafka_topic"
 ],
 "entities": [
   {
     "typeName": "kafka_topic",
     "attributes": {
       "createTime": 0,
       "qualifiedName": "lsrc-g2p81:lkc-d33jz:employees",
       "name": "employees",
       "nameLower": "employees",
       "tenant": "lsrc-g2p81"
     },
     "guid": "1e90c371-eae3-4bc2-be00-7e2de2d59ab5",
     "status": "ACTIVE",
     "displayText": "employees",
     "classificationNames": [],
     "meaningNames": [],
     "meanings": [],
     "isIncomplete": false,
     "labels": []
   },

   {
     "typeName": "kafka_topic",
     "attributes": {
       "createTime": 0,
       "qualifiedName": "lsrc-g2p81:lkc-d33jz:stocks",
       "name": "stocks",
       "nameLower": "stocks",
       "tenant": "lsrc-g2p81"
     },
     "guid": "d2477133-2d57-47a0-b663-4d683e8c7c33",
     "status": "ACTIVE",
     "displayText": "stocks",
     "classificationNames": [],
     "meaningNames": [],
     "meanings": [],
     "isIncomplete": false,
     "labels": []

     {
        "typeName": "kafka_topic",
        "attributes": {
          "createTime": 1676419125000,
          "qualifiedName": "lsrc-g2p81:lkc-xq8k7g:flights",
          "name": "flights",
          "nameLower": "flights",
          "tenant": "lsrc-g2p81"
        },
        "guid": "84db47d2-b526-44b1-ba34-8b67cd72082f",
        "status": "ACTIVE",
        "displayText": "flights",
        "classificationNames": [],
        "meaningNames": [],
        "meanings": [],
        "isIncomplete": false,
        "labels": []

        ...
Search for topics by name
curl --silent -u $APIKEY:$APISECRET --request GET --url '<SCHEMA-REGISTRY-URL>/catalog/v1/search/basic?types=kafka_topic&query=<TOPIC-NAME>' | jq .

The following shows output for a search on a topic called my-flights:

"searchParameters": {
  "includeDeleted": false,
  "limit": 0,
  "offset": 0
},
"types": [
  "kafka_topic"
],
"entities": [
  {
    "typeName": "kafka_topic",
    "attributes": {
      "createTime": 1677786067000,
      "qualifiedName": "lsrc-g2p81:lkc-xq8k7g:my-flights",
      "name": "my-flights",
      "nameLower": "my-flights",
      "tenant": "lsrc-g2p81"
    },
    "guid": "6bff8605-e6ec-46b2-8626-b32bef8e2185",
    "status": "ACTIVE",
    "displayText": "my-flights",
    "classificationNames": [
      "myglobaltag",
      "myfavestuff"
    ],
    "meaningNames": [],
    "meanings": [],
    "isIncomplete": false,
    "labels": []
  }

Note that the qualifiedName is highlighted in the above example. You can use the value of this in other calls which require the fully qualified name for a topic.

Search for topics by tag
curl --silent -u $APIKEY:$APISECRET --request GET --url '<SCHEMA-REGISTRY-URL>/catalog/v1/search/basic?type=kafka_topic&tag=<TAG_NAME>'| jq .

For example, to search for topics tagged with MyFaves:

curl --silent -u $APIKEY:$APISECRET --request GET --url '<SCHEMA-REGISTRY-URL>/catalog/v1/search/basic?type=kafka_topic&tag=MyFaves'| jq .
Tag a topic

To tag a topic or any other entity type, you must first know:

  • Name of the tag you want to use. (The tag must be generic; that is, applicable to cf_entity. All tags created from the Confluent Cloud Console are generic tags. To learn more see Create a tag.)
  • Fully qualified name of the topic you want to apply the tag to.
  • Entity type to use. In the case of topic, the entity type is kafka_topic. (Other tags covered in subsequent examples can be created to apply only to schema related entities such as sr_schema, sr_field, sr_record, and so on.)

The steps are:

  1. Find a tag either by looking on the Confluent Cloud Console as described in View available tags or by running the API call to list all tags. (To be applicable to a topic, the tag must be created to apply to cf_entity.)
  2. Get the fully qualified name of the topic to which you want to apply the tag, using the call for either List all topics or Search for topics by name.
  3. Run the API call to tag the topic, as shown below.
curl --silent -u $APIKEY:$APISECRET --request POST --url $SCHEMA_REGISTRY_URL/catalog/v1/entity/tags \
--header 'Content-Type: application/json' \
--data '[ {  "entityType" : "kafka_topic",  "entityName" : "<QUALIFIED_NAME_FOR_ENTITY>", "typeName" : "<TAG_NAME>"} ]' | jq .

Here is an example of adding a tag called myglobaltag to the my-flights topic.

curl --silent -u $APIKEY:$APISECRET --request POST --url $SCHEMA_REGISTRY_URL/catalog/v1/entity/tags \
--header 'Content-Type: application/json' \
--data '[ {  "entityType" : "kafka_topic",  "entityName" : "lsrc-g2p81:lkc-xq8k7g:my-flights", "typeName" : "myglobaltag"} ]' | jq .
Add a topic owner and email

To add an owner and owner email for a topic, you can use the /catalog/v1/entity endpoint. Note that this differs from the business metadata definitions for topics which use /catalog/v1/entity/businessmetadata to specify custom-created business metadata such as team names, Slack channels, and emails.

To add an owner and owner email for a topic:

  1. Create a data definition file that identifies the Kafka topic you want to update, and provides values for the owner and owner email attributes. You can get the qualified name for the topic by listing all topics.

    For example, the following file called topic-attributes.txt will update the topic stocks with the specified owner and owner email:

    {
      "entity": {
        "typeName": "kafka_topic",
        "attributes": {
          "owner": "Sam Stevens",
          "ownerEmail": "sam@magic.org",
          "qualifiedName": "lkc-xq8k7g:stocks"
        }
      }
    }
    
  2. Use the following API call to apply the attributes. Note that you will use PUT (rather than POST) because you are updating an existing resource. Even if they do not contain values, the owner and ownerEmail attributes exist by default.

    curl --silent -u  $APIKEY:$APISECRET  -X PUT -H "Content-Type: application/json" --data @<path-to-data-file> $SCHEMA_REGISTRY_URL/catalog/v1/entity | jq .
    

    Here is an example of applying the attributes in the topic-attributes file and the data path filled in:

    curl --silent -u  $APIKEY:$APISECRET  -X PUT -H "Content-Type: application/json" --data @/Users/vicky/topic-attributes.txt $SCHEMA_REGISTRY_URL/catalog/v1/entity | jq .
    

    The output includes the updated attribute values:

    "mutatedEntities": {
      "UPDATE": [
        {
          "typeName": "kafka_topic",
          "attributes": {
            "replicationFactor": 3,
            "maxCompactionLagMs": 9223372036854776000,
            "compressionType": "PRODUCER",
            "messageDownconversionEnable": true,
            "minCleanableDirtyRatio": 0.5,
            "uncleanLeaderElectionEnable": false,
            "minInsyncReplicas": 2,
            "retentionBytes": -1,
            "followerReplicationThrottledReplicas": null,
            "id": "ZlrtQVsEQ-6VZmedr-AQXQ",
            "tenant": "lsrc-g2p81",
            "externalSourceTopicName": null,
            "__ss_orgEmail": null,
            "fileDeleteDelayMs": 60000,
            "qualifiedName": "lsrc-g2p81:lkc-xq8k7g:stocks",
            "leaderReplicationThrottledReplicas": null,
            "messageTimestampType": "CreateTime",
            "preallocate": false,
            "isInternal": false,
            "segmentBytes": 104857600,
            "segmentIndexBytes": 10485760,
            "flushMessages": 9223372036854776000,
            "name": "stocks",
            "doc": null,
            "messageFormatVersion": "3.0-IV1",
            "indexIntervalBytes": 4096,
            "minCompactionLagMs": 0,
            "segmentMs": 604800000,
            "mirrorTopicState": null,
            "segmentJitterMs": 0,
            "replicatedFrom": null,
            "maxMessageBytes": 2097164,
            "displayName": null,
            "description": "Stocks base topic",
            "nameLower": "stocks",
            "valueSchemaValidation": false,
            "deprecatedTime": null,
            "externalSourceTopicId": null,
            "owner": "Sam Stevens",
            "retentionMs": 604800000,
            "replicatedTo": null,
            "userDescription": null,
            "deleteRetentionMs": 86400000,
            "flushMs": 9223372036854776000,
            "updateTime": 1695180837000,
            "__ss_topicDisplayName": null,
            "mirrorTopicUpdateTime": null,
            "__ss_orgDescription": null,
            "ownerEmail": "sam@magic.org",
            "partitionsCount": 6,
            "__ss_orgLogo": null,
            "messageTimestampDifferenceMaxMs": 9223372036854776000,
            "createTime": 0,
            "keySchemaValidation": false,
            "cleanupPolicy": "DELETE"
          },
          "guid": "ecbc2033-afa0-4413-becc-da8eb7700ca8",
          "status": "ACTIVE",
          "classificationNames": [
            "lsrc_g2p81__myfavestuff",
            "lsrc_g2p81__my_stocks",
            "lsrc_g2p81__Sensitive",
            "lsrc_g2p81__ygu_test"
          ],
          "classifications": [
            {
              "typeName": "lsrc_g2p81__myfavestuff",
              "entityGuid": "ecbc2033-afa0-4413-becc-da8eb7700ca8",
              "entityStatus": "ACTIVE",
              "propagate": false,
              "removePropagationsOnEntityDelete": false
            },
            {
              "typeName": "lsrc_g2p81__my_stocks",
              "entityGuid": "ecbc2033-afa0-4413-becc-da8eb7700ca8",
              "entityStatus": "ACTIVE",
              "propagate": false,
              "removePropagationsOnEntityDelete": false
            },
            {
              "typeName": "lsrc_g2p81__Sensitive",
              "entityGuid": "ecbc2033-afa0-4413-becc-da8eb7700ca8",
              "entityStatus": "ACTIVE",
              "propagate": false,
              "removePropagationsOnEntityDelete": false
            },
            {
              "typeName": "lsrc_g2p81__ygu_test",
              "entityGuid": "ecbc2033-afa0-4413-becc-da8eb7700ca8",
              "entityStatus": "ACTIVE",
              "propagate": false,
              "removePropagationsOnEntityDelete": false
            }
          ],
          "isIncomplete": false
    
  3. Log on to https://confluent.cloud and check the Cloud Console to verify that the topic reflects the updates. Navigate to the environment and cluster in which you are working, click Topics on the left panel, then click the topic (in this example, stocks).

    ../_images/dg-topic-email-and-owner.png
Create a tag for a topic

Tags that can be applied to a topic must be created to specify cf_entity in the data block. To learn how to create such a tag, see Create a generic tag applicable to a topic or any entity in the section on Tags API examples.

Connectors

Similar to working with topics and schemas, you can list, search, tag, and apply business metadata to connectors by using the connector entity name (cn_connector), and/or specifying other categories on the connector. Following are a couple of examples.

List all connectors
curl --silent -u $APIKEY:$APISECRET --request GET --url $SCHEMA_REGISTRY_URL/catalog/v1/search/basic?types=cn_connector | jq .

Here is some example output, showing the list of connectors in this environment; DatagenSourceConnector_0 and StockSource Connector. Notice that this type of search is a way of getting the qualified name for an entity; in this case, the qualified name for each connector.

 "searchParameters": {
   "includeDeleted": false,
   "limit": 0,
   "offset": 0
 },
 "types": [
   "cn_connector"
 ],
 "entities": [
   {
     "typeName": "cn_connector",
     "attributes": {
       "createTime": 1618274263000,
       "qualifiedName": "lsrc-g2p81:lcc-qnndm",
       "name": "DatagenSourceConnector_0",
       "nameLower": "datagensourceconnector_0",
       "tenant": "lsrc-g2p81"
     },
     "guid": "c754c38c-94a0-4107-a3ec-4a001e6d3e50",
     "status": "ACTIVE",
     "displayText": "DatagenSourceConnector_0",
     "classificationNames": [],
     "meaningNames": [],
     "meanings": [],
     "isIncomplete": false,
     "labels": []
   },
   {
     "typeName": "cn_connector",
     "attributes": {
       "createTime": 1663712312000,
       "qualifiedName": "lsrc-g2p81:lcc-w7857j",
       "name": "StockSource Connector",
       "nameLower": "stocksource connector",
       "tenant": "lsrc-g2p81"
     },
     "guid": "d14e6855-9186-43a8-8902-736f0de23c00",
     "status": "ACTIVE",
     "displayText": "StockSource Connector",
     "classificationNames": [],
     "meaningNames": [],
     "meanings": [],
     "isIncomplete": false,
     "labels": []
Search for connectors by name
curl --silent -u $APIKEY:$APISECRET --request GET --url '<SCHEMA-REGISTRY-URL>/catalog/v1/search/basic?types=cn_connector&query=<CONNECTOR-NAME>' | jq .

The following shows output for a search on the connector DatagenSourceConnector_0:

"searchParameters": {
  "includeDeleted": false,
  "limit": 0,
  "offset": 0
},
"types": [
  "cn_connector"
],
"entities": [
  {
    "typeName": "cn_connector",
    "attributes": {
      "createTime": 1618274263000,
      "qualifiedName": "lsrc-g2p81:lcc-qnndm",
      "name": "DatagenSourceConnector_0",
      "nameLower": "datagensourceconnector_0",
      "tenant": "lsrc-g2p81"
    },
    "guid": "84d54699-c7f2-4541-a9e9-6adb86a8e6fe",
    "status": "ACTIVE",
    "displayText": "DatagenSourceConnector_0",
    "classificationNames": [],
    "meaningNames": [],
    "meanings": [],
    "isIncomplete": false,
    "labels": []

If the entity name has a space in it, use %20 to indicate the space. For example, to search for the StockSource Connector:

curl --silent -u $APIKEY:$APISECRET --request GET --url '<SCHEMA-REGISTRY-URL>/catalog/v1/search/basic?types=cn_connector&query=StockSource%20Connector' | jq .

"searchParameters": {
  "includeDeleted": false,
  "limit": 0,
  "offset": 0
},
"types": [
  "cn_connector"
],
"entities": [
  {
    "typeName": "cn_connector",
    "attributes": {
      "createTime": 1663712312000,
      "qualifiedName": "lsrc-g2p81:lcc-w7857j",
      "name": "StockSource Connector",
      "nameLower": "stocksource connector",
      "tenant": "lsrc-g2p81"
    },
    "guid": "2328962b-9f09-4261-b2b6-72684baaaf32",
    "status": "ACTIVE",
    "displayText": "StockSource Connector",
    "classificationNames": [
      "my_cool_tag"
    ],
    "meaningNames": [],
    "meanings": [],
    "isIncomplete": false,
    "labels": []
Tag a connector

To tag a connector, you must first know:

  • Name of the tag you want to use. (The tag must be generic; that is, applicable to cf_entity. All tags created from the Confluent Cloud Console are generic tags. To learn more see Create a tag.)
  • Fully qualified name of the topic you want to apply the tag to.
  • Entity type to use. In the case of topic, the entity type is connector_cn.

For more detail, see the introduction to Tag a topic, which follows the same pattern.

Here is an example of adding a tag called my_cool_tag to StockSource Connector, which has a fully qualified name of lsrc-g2p81:lcc-w7857j . (The fully qualified name for this connector was found by listing all connectors.)

curl --silent -u $APIKEY:$APISECRET --request POST --url $SCHEMA_REGISTRY_URL/catalog/v1/entity/tags --header 'Content-Type: application/json' \
--data '[ {  "entityType" :  "cn_connector" , "entityName" :  "lsrc-g2p81:lcc-w7857j", "typeName" : "MyFaves"} ]' | jq .

"typeName": "my_cool_tag",
"entityStatus": "ACTIVE",
"entityType": "cn_connector",
"entityName": "lsrc-g2p81:lcc-w7857j"

Business metadata API examples

Stream Catalog allows you to define a number of tag definitions and apply the tags to entities. Support for business metadata enriches the management and search of entities. Business metadata gives you a way to attach key-value pairs to specific entities. Start by creating a business metadata definition, which is a “group” of attributes. Business metadata definitions can be queried, updated, or deleted. Then you can apply the attributes (or a subset of attributes, if some are marked optional) within the business metadata definition to entities. You can further query all the key-value pairs attached to specific entities, update them, or delete them.

Business metadata concepts are fully covered in the Confluent Cloud Console user guide under Business metadata.

Following are usage examples for working with business metadata through the Confluent Cloud CATALOG API (V1).

The examples include creating a hamburgers schema and a business metadata definition called team with several attributes, attaching specific team metadata to hamburgers, and then updating the values for the attributes. Try tracking the changes you make on the Confluent Cloud Console to validate the API output, as suggested in the examples. Some of the examples also include working with topic metadata.

Keep in mind, you can apply and work with business metadata on any of the supported entity types listed at the top of this page. These examples showing use cases for some of the entity types are meant to help get you started.

Tip

  • Be sure to review the Setup and suggestions notes for these API usage examples before you begin.
  • Refer, as needed, to the list of Entity types shown at the beginning of this document.

Create a schema

To start with, create a new schema. In later steps, you will create business metadata and apply it to this new schema.

Here is an example of a schema file called hamburgers.json, properly formatted for the API call to the catalog.

{
      "schema":
         "{
                \"type\": \"record\",
                \"name\": \"Hamburger\",
                \"fields\":
                  [
                    {
                      \"type\": \"string\",
                      \"name\": \"Sandwich\"
                    },
                    {
                      \"type\": \"string\",
                      \"name\": \"Size\"
                    }
                  ]
              }"
    }

The API call to create the schema is as follows:

curl --silent -u $APIKEY:$APISECRET -X POST -H "Content-Type: application/json" \
--data @/path/to/your/schema <SCHEMA-REGISTRY-URL>/subjects/<subject-name>/versions

For the example, the API call to create the “hamburgers” schema would look similar to this:

curl --silent -u $APIKEY:$APISECRET -X POST -H "Content-Type: application/json" \
--data @/Users/vicky/hamburgers.json /subjects/Hamburgers/versions | jq .

After you have created the schema, verify that it is available on the Confluent Cloud Console (https://confluent.cloud/). Navigate to your environment, and click Schemas on the right side panel to see the hamburgers schema in the list.

Create your first business metadata definition

Here is an example business metadata definition called team, with two fields, teamName and slackChannel. You can provide this definition in a file called team.txt, and pass the file in the API call.

[ {
     "name" : "team",
     "description" : "Team information as business metadata",
     "attributeDefs" : [
     { "name" : "teamName", "isOptional" : "true", "cardinality" : "SINGLE", "typeName" : "string", "options": {"maxStrLength": 25, "applicableEntityTypes": "[\"sr_schema\"]"} },
     { "name" : "slackChannel", "isOptional" : "true", "cardinality" : "SINGLE", "typeName" : "string", "options": {"maxStrLength": 30, "applicableEntityTypes": "[\"sr_schema\"]"} }
     ]
   } ]

Here is an example of a POST request to add the above definition to the catalog for a Schema Registry. Sending this POST request is analogous to adding a new definition through the Confluent Cloud Console.

curl --silent -u $APIKEY:$APISECRET  -X POST -H "Content-Type: application/json" \
--data @/Users/vicky/team.txt $SCHEMA_REGISTRY_URL/catalog/v1/types/businessmetadatadefs | jq .
  • The --data flag provides the path to the file. In this example the data path is Users/vicky/team.txt. You must include the @ sign before the path with no spaces in between.
  • The only required field (from the request perspective) is name. If description not specified, it defaults to the same as name.
  • The attributeDefs field is optional. It can be updated later, but you can then only add optional attribute definitions (isOptional field set to true).
  • For each attribute definition in attributeDefs, the required fields (from the request perspective) include name and typeName.
    • name is the unique identifier of an attribute within one business metadata definition (each attribute is accessed in the form of <business metadata def>.<attribute name>).
    • typeName is the type of the attribute, including builtin types such as long, float, boolean, string, and more.
  • Business metadata requires that each attribute have applicableEntityTypes as a key in the options field. applicableEntityTypes is a stringified JSON that is a list of all applicable entities of this attribute.
  • You can specify cf_entity as the applicable entity type, to specify that the business metadata definition could apply to any type of entities you define.
  • If typeName of an attribute is string, options must have maxStrLength specified. For other builtin types, there is no such constraint.

If you used the curl --silent flag and piped the API call through jq (like the example), you will get human-readable output as shown below.

{
    "category": "BUSINESS_METADATA",
    "createdBy": "root",
    "updatedBy": "root",
    "createTime": 1656010917403,
    "updateTime": 1656010917403,
    "version": 1,
    "name": "team",
    "description": "Team information as business metadata",
    "typeVersion": "1.0",
    "attributeDefs": [
      {
        "name": "teamName",
        "typeName": "string",
        "isOptional": true,
        "cardinality": "SINGLE",
        "valuesMinCount": 0,
        "valuesMaxCount": 1,
        "isUnique": false,
        "isIndexable": false,
        "includeInNotification": false,
        "searchWeight": -1,
        "options": {
          "applicableEntityTypes": "[\"sr_schema\"]",
          "maxStrLength": "25"
        }
      },
      {
        "name": "slackChannel",
        "typeName": "string",
        "isOptional": true,
        "cardinality": "SINGLE",
        "valuesMinCount": 0,
        "valuesMaxCount": 1,
        "isUnique": false,
        "isIndexable": false,
        "includeInNotification": false,
        "searchWeight": -1,
        "options": {
          "applicableEntityTypes": "[\"sr_schema\"]",
          "maxStrLength": "30"
        }

After creating the new business metadata definition, verify that it is available on the Confluent Cloud Console. Navigate to View environments <your-environment> > Schema Registry tab > View & manage business metadata > team to see the new team definition.

../_images/dg-biz-metadata-api-verify-ui-bm-created.png

Create a business metadata definition for a topic

To create business metadata that is applicable to a topic, specify the kafka_topic for applicableEntityTypes. Here is an example of a business metadata definition called MyNewTeam, with three fields, teamName, slackChannel, and email.

[ {
     "name" : "MyNewTeam",
     "description" : "Team information as business metadata",
     "attributeDefs" : [
     { "name" : "teamName", "isOptional" : "true", "cardinality" : "SINGLE", "typeName" : "string", "options": {"maxStrLength": 25, "applicableEntityTypes": "[\"cf_entity\"]"} },
     { "name" : "slackChannel", "isOptional" : "true", "cardinality" : "SINGLE", "typeName" : "string", "options": {"maxStrLength": 30, "applicableEntityTypes": "[\"cf_entity\"]"} },
     { "name" : "email", "isOptional" : "true", "cardinality" : "SINGLE", "typeName" : "string", "options": {"maxStrLength": 30, "applicableEntityTypes": "[\"cf_entity\"]"} }
     ]
   } ]

Here is an example of a POST request to add the above definition to the catalog, using a data file called newteam.txt.

curl --silent -u $APIKEY:$APISECRET  -X POST -H "Content-Type: application/json" \
--data @/Users/vicky/newteam.txt $SCHEMA_REGISTRY_URL/catalog/v1/types/businessmetadatadefs | jq .

If you used the curl --silent flag and piped the API call through jq (like the example), you will get human-readable output as shown below.

{
  "category": "BUSINESS_METADATA",
  "createdBy": "root",
  "updatedBy": "root",
  "createTime": 1677873857563,
  "updateTime": 1677873857563,
  "version": 1,
  "name": "MyNewTeam",
  "description": "Team information as business metadata",
  "typeVersion": "1.0",
  "attributeDefs": [
    {
      "name": "teamName",
      "typeName": "string",
      "isOptional": true,
      "cardinality": "SINGLE",
      "valuesMinCount": 0,
      "valuesMaxCount": 1,
      "isUnique": false,
      "isIndexable": false,
      "includeInNotification": false,
      "searchWeight": -1,
      "options": {
        "applicableEntityTypes": "[\"kafka_topic\"]",
        "maxStrLength": "25"
      }
    },
    {
      "name": "slackChannel",
      "typeName": "string",
      "isOptional": true,
      "cardinality": "SINGLE",
      "valuesMinCount": 0,
      "valuesMaxCount": 1,
      "isUnique": false,
      "isIndexable": false,
      "includeInNotification": false,
      "searchWeight": -1,
      "options": {
        "applicableEntityTypes": "[\"kafka_topic\"]",
        "maxStrLength": "30"
      }
    },
    {
      "name": "email",
      "typeName": "string",
      "isOptional": true,
      "cardinality": "SINGLE",
      "valuesMinCount": 0,
      "valuesMaxCount": 1,
      "isUnique": false,
      "isIndexable": false,
      "includeInNotification": false,
      "searchWeight": -1,
      "options": {
        "applicableEntityTypes": "[\"kafka_topic\"]",
        "maxStrLength": "30"
      }

To learn how to apply this definition to a topic, see Add business metadata to a topic.

Get all the business metadata definitions created so far

The following call retrieves all business metadata definitions on the target registry catalog. Adding the curl --silent flag and piping through jq provides human-readable output.

curl --silent -u $APIKEY:$APISECRET -X GET \
$SCHEMA_REGISTRY_URL/catalog/v1/types/businessmetadatadefs | jq .

Get a specific business metadata definition by its name

This call gets the specified definition by name; in this case, team.

curl --silent -u $APIKEY:$APISECRET -X GET \
$SCHEMA_REGISTRY_URL/catalog/v1/types/businessmetadatadefs/team | jq .

Update business metadata definitions

To update a definition, use the PUT, and pass in the updated file.

curl --silent -u $APIKEY:$APISECRET -X PUT -H "Content-Type: application/json" \
--data --data @/Users/vicky/team.txt $SCHEMA_REGISTRY_URL/catalog/v1/types/businessmetadatadefs
  • The contents of team.txt follows the same format as that used during creation. You pass a list of business metadata definitions to PUT.
  • As mentioned above, you can only add optional attribute definitions with PUT.
  • Attributes cannot be deleted once they are added to business metadata definitions.
  • Some fields for attributes (such as description) can be updated, while others (such as typeName) cannot be updated.

Here is an example of the team.txt file, updated to add an email field.

[ {
     "name" : "team",
     "description" : "Team information as business metadata",
     "attributeDefs" : [
     { "name" : "teamName", "isOptional" : "true", "cardinality" : "SINGLE", "typeName" : "string", "options": {"maxStrLength": 25, "applicableEntityTypes": "[\"sr_schema\"]"} },
     { "name" : "slackChannel", "isOptional" : "true", "cardinality" : "SINGLE", "typeName" : "string", "options": {"maxStrLength": 30, "applicableEntityTypes": "[\"sr_schema\"]"} },
     { "name" : "email", "isOptional" : "true", "cardinality" : "SINGLE", "typeName" : "string", "options": {"maxStrLength": 30, "applicableEntityTypes": "[\"sr_schema\"]"} }
     ]
   } ]

Add business metadata to a topic

You can use the /catalog/v1/entity/businessmetadata endpoint to add values for your custom-created attributes on a topic. Note that this differs from using the /catalog/v1/entity endpoint to add values for the ready-made topic attributes topic owner and ownerEmail, as described in Add a topic owner and email.

To add business metadata to a topic:

Here is an example of a data file called newteam-data.txt which specifies data to add to the my-flights topic:

[ {
     "entityType": "kafka_topic",
     "entityName": "lsrc-g2p81:lkc-xq8k7g:my-flights",
     "typeName": "MyNewTeam",
     "attributes": {"teamName": "MyTeam", "slackChannel": "MySlack", "email": "myteam@coolideas.org"}
   } ]

Use the following API call to apply the metadata to the topic. Use POST when you are populating these values for the first time. (You would use PUT to update existing values.)

curl --silent -u $APIKEY:$APISECRET  -X POST -H "Content-Type: application/json" \
--data @/Users/vicky/newteam-data.txt $SCHEMA_REGISTRY_URL/catalog/v1/entity/businessmetadata | jq .

Your output should resemble:

{
   "typeName": "MyNewTeam",
   "attributes": {
     "teamName": "MyNewTeam",
     "slackChannel": "MySlack",
     "email": "mynewteam@coolideas.org"
   },
   "entityType": "kafka_topic",
   "entityName": "lsrc-g2p81:lkc-xq8k7g:my-flights"
 }

Add business metadata to a connector

To add business metadata to a connector:

Here is an example of a data file called newteam-data.txt which specifies data to add to the StockSource Connector (referred to by the qualified name):

[ {
     "entityType": "connector_cn",
     "entityName": "lsrc-g2p81:lcc-w7857",
     "typeName": "MyNewTeam",
     "attributes": {"teamName": "MyTeam", "slackChannel": "MySlack", "email": "myteam@coolideas.org"}
   } ]

Use the following API call to apply the metadata to the connector.

curl --silent -u $APIKEY:$APISECRET  -X POST -H "Content-Type: application/json" \
--data @/Users/vicky/newteam-data.txt $SCHEMA_REGISTRY_URL/catalog/v1/entity/businessmetadata | jq .

Your output should resemble:

{
   "typeName": "MyNewTeam",
   "attributes": {
     "teamName": "MyNewTeam",
     "slackChannel": "MySlack",
     "email": "mynewteam@coolideas.org"
   },
   "entityType": "connector_cn",
   "entityName": "lsrc-g2p81:lcc-w7857"
 }

Get business metadata associated with an instance of an entity

To retrieve the business metadata associated with a specific instance of an entity:

curl --silent -u $APIKEY:$APISECRET -X GET \
$SCHEMA_REGISTRY_URL/catalog/v1/entity/type/<entityType>/<value-identifying-the-instance-of-the-entity>/businessmetadata | jq .

For example, the following API call which specifies the schema entity type and schema ID for the hamburgers schema will return the same attribute details as the previous call, where you applied that metadata.

curl --silent -u $APIKEY:$APISECRET -X GET $SCHEMA_REGISTRY_URL/catalog/v1/entity/type/sr_schema/name/100012/businessmetadata | jq .
"typeName": "team",
  "attributes": {
    "teamName": "Design",
    "slackChannel": "Design-all",
    "email": "design@coolideas.org"
  },
  "entityType": "sr_schema",
  "entityName": "lsrc-g2p81:.:100012"

Update business metadata associated with entity

Use a PUT request with new data to update business metadata associated with an entity.

For example, use this modified version of the team-data.txt to update values for teamName and email:

[ {
    "entityType": "sr_schema",
    "entityName": "100012",
    "typeName": "team",
    "attributes": {"teamName": "DesignSuperStars", "slackChannel": "Design-all", "email": "design@coolestideas.org"}
  } ]

Then pass the data file as a PUT request.

curl -- silent -u $APIKEY:$APISECRET -X PUT -H "Content-Type: application/json" \
--data @/Users/vicky/team-data.txt $SCHEMA_REGISTRY_URL/catalog/v1/entity/businessmetadata | jq .

Your output should resemble:

"typeName": "team",
"attributes": {
  "teamName": "DesignSuperStars",
  "slackChannel": "Design-all",
  "email": "design@coolestideas.org"
},
"entityType": "sr_schema",
"entityName": "lsrc-g2p81:.:100012",
"error": {
  "error_code": 4000151,
  "message": "Business-metadata attribute already exists in entity: lsrc-g2p81:.:100012"

Updating only a subset of fields will not delete other fields.

Note that this is analogous to editing these values on the Confluent Cloud Console.

The path on the UI would be: View environments > <your-environment> > Schemas > hamburgers, then click the context-sensitive edit button for the team metadata on the right panel.

../_images/dg-biz-metadata-api-verify-ui-bm-start-edit.png

The dialog to edit those same values from the UI looks like this.

../_images/dg-biz-metadata-api-verify-ui-bm-edit.png

Search for business metadata associated with entity

To search for business metadata on a particular entity, use the following API call to specify an entity type and value in a field.

curl --silent -u $APIKEY:$APISECRET -X GET \
"<SCHEMA-REGISTRY-URL>/catalog/v1/search/attribute?type=<entityType>&attrName=<business-metadata-definition>.<field-name>&attrValuePrefix=<value-for-a-field>" | jq .
  • Currently, only prefix search on a string attributes is supported. (This may be extended to multiple data types with more search semantics in the future).
  • Attributes in a business metadata definition are specified as <business-metadata-definition>.<attribute-name>; the attribute name is the field name.

For example, you might want to search for the team name, Slack channel, or email address associated with a team. Here are examples of all of those searches on the updated team metadata associated with the hamburgers schema:

curl --silent -u $APIKEY:$APISECRET -X GET \
"<SCHEMA-REGISTRY-URL>/catalog/v1/search/attribute?type=sr_schema&attrName=team.teamName&attrValuePrefix=DesignSuperStars" | jq .
curl --silent -u $APIKEY:$APISECRET -X GET \
"<SCHEMA-REGISTRY-URL>/catalog/v1/search/attribute?type=sr_schema&attrName=team.slackChannel&attrValuePrefix=Design-all" | jq .
curl --silent -u $APIKEY:$APISECRET -X GET \
"<SCHEMA-REGISTRY-URL>/catalog/v1/search/attribute?type=sr_schema&attrName=team.email&attrValuePrefix=design@coolestideas.org" | jq .

Here is an example of the output for the last search, on the team.email associated with hamburgers:

"searchParameters": {
    "includeDeleted": false,
    "limit": 0,
    "offset": 0
  },
  "types": [
    "sr_schema"
  ],
  "entities": [
    {
      "typeName": "sr_schema",
      "attributes": {
        "createTime": 1655158742047,
        "qualifiedName": "lsrc-g2p81:.:100012",
        "name": "Hamburger",
        "context": ".",
        "nameLower": "hamburger",
        "id": 100012,
        "tenant": "lsrc-g2p81",
        "lsrc_g2p81__team.email": "design@coolestideas.org"
      },
      "guid": "a36c35c7-86b1-4f7d-a6f7-e3b86efe7928",
      "status": "ACTIVE",
      "displayText": "Hamburger",
      "classificationNames": [],
      "meaningNames": [],
      "meanings": [],
      "isIncomplete": false,
      "labels": []
    }

Remove business metadata associated with an entity

For now, you can only delete all attributes at once from the business metadata definition associated with an entity.

For example, the following API call removes the team metadata definition and associated attributes from the hamburgers schema.

curl --silent -u $APIKEY:$APISECRET -X DELETE $SCHEMA_REGISTRY_URL/catalog/v1/entity/type/sr_schema/name/100012/businessmetadata/team | jq .

Note that this does not delete the business metadata definition from the catalog; it only removes the particular definition from the entity (schema, in this case).

Delete business metadata definitions

To entirely delete a business metadata definition (for example, team) from the Schema Registry catalog:

curl --silent -u $APIKEY:$APISECRET -X DELETE $SCHEMA_REGISTRY_URL/catalog/v1/types/businessmetadatadefs/team | jq .