Use AsyncAPI to Describe Topics and Schemas on Confluent Cloud Clusters¶
This page describes AsyncAPI, what it is and how it works, and provides walkthroughs both to generate (export) an AsyncAPI specification from an existing cluster and to import an AsyncAPI specification into an environment, configuring your Confluent Cloud resources on the fly.
What is AsyncAPI?¶
AsyncAPI is an open source, machine-readable specification language for message-oriented APIs and architectures. It is not an API itself, nor does it act as a code abstraction layer for making calls to messaging systems. AsyncAPI is a metadata description language, used to generate YAML descriptions of resources such as topics and queues, events, and producers and consumers.
AsyncAPI is heavily influenced by OpenAPI (formerly Swagger), an HTTP API specification language that shares many of the same objectives. OpenAPI describes and documents how a REST API operates in a standard way so that it can be shared widely. AsyncAPI is “OpenAPI for the asynchronous world”.
However, AsyncAPI goes beyond the OpenAPI specification by allowing users to describe applications and their send and receive operations against streams, broadening its usage to that of a streaming architecture specification. In addition, a large and growing ecosystem of AsyncAPI tools exists, including code generators, validators, and parsers, which can be used to speed up these tasks.
AsyncAPI with Confluent¶
Confluent uses Schema Registry to define schemas for messages sent over Apache Kafka®. A customer has a Kafka cluster, which can contain multiple topics (channels). Users can produce (publish) or consume (subscribe) messages to and from these topics. Each topic can also have an associated schema, managed by Schema Registry, which defines the structure of that topic's messages. Applications talk to two servers when sending a message: the Kafka cluster and the Schema Registry cluster. Confluent Kafka topics have metadata such as the cleanup policy and delete retention time, as well as tags at the topic level and schema level. An AsyncAPI specification captures this entire event-streaming architecture in a single document and gives users a concise snapshot of their Confluent Kafka clusters, as the short sketch below illustrates.
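The following is a trimmed-down sketch of how those pieces map onto an AsyncAPI document, assuming a hypothetical topic named orders and placeholder endpoints; the full output format is described in the sections below.
asyncapi: 2.4.0
info:
  title: API Document for Confluent Cluster
  version: 1.0.0
servers:
  cluster:                 # the Kafka cluster
    url: https://pkc-xxxxx.<location>.<cloud-type>.confluent.cloud:443
    protocol: kafka
  schema-registry:         # the Schema Registry cluster
    url: https://psrc-xxxxx.<location>.<cloud-type>.confluent.cloud
    protocol: kafka
channels:
  orders:                  # one channel per topic
    subscribe:
      message:
        $ref: '#/components/messages/OrdersMessage'
components:
  messages:
    OrdersMessage: {}      # structure comes from the topic's schema in Schema Registry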
Using the Confluent CLI AsyncAPI tool¶
The Confluent CLI AsyncAPI tool exports and imports AsyncAPI specifications for Confluent Cloud clusters and documents the following:
- Confluent Cloud Kafka cluster
- Confluent Cloud Schema Registry cluster
- Topics
- Schemas associated with topics
- Tags attached to topics (gathered from the Stream Catalog on Confluent Cloud; see User Guide to Manage Tags and Metadata)
- Topic descriptions (gathered from the Stream Catalog)
Generating an AsyncAPI specification for a cluster (export)¶
Prerequisites¶
- The Confluent Cloud environment where the Kafka cluster is located must have a Stream Governance package enabled.
- Topics must have a schema associated with them using TopicNameStrategy (see the example after this list).
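With TopicNameStrategy, the subject name is derived from the topic name (for example, <topic>-value for the message value). A sketch of registering such a schema with the Confluent CLI, assuming a topic named orders and an Avro schema file orders.avsc:
confluent schema-registry schema create --subject orders-value --schema orders.avsc --type avro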
Limitations¶
- Only schema subjects in the default context are supported. To learn more about schema contexts, see Schema Contexts.
- Protobuf schemas are not supported as part of AsyncAPI version 2.
- The --consume-examples argument is not supported on Microsoft Windows, because it runs a consumer in confluent-kafka-go, which is not supported on Windows.
Steps to generate the specification¶
Log in to Confluent Cloud.
confluent login --url https://confluent.cloud/
List environments.
confluent environment list
Select the environment you want to use.
confluent environment use <ENVIRONMENT_ID>
List the clusters in the environment.
confluent kafka cluster list
Select the cluster you want to use.
confluent kafka cluster use <KAFKA_CLUSTER_ID>
Create and apply an API key for the resource (or use an existing key).
confluent api-key create --resource <KAFKA_CLUSTER_ID>
confluent api-key use <KEY> --resource <KAFKA_CLUSTER_ID>
Describe your Schema Registry cluster to get the Schema Registry cluster ID.
confluent schema-registry cluster describe
Create and apply an API key for the Schema Registry cluster (or use an existing key).
confluent api-key create --resource <SR_CLUSTER_ID>
Generate the AsyncAPI Specification and export it to a YAML file.
confluent asyncapi export --file <FILE_NAME.yaml>
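You can also pass the optional flags mentioned elsewhere on this page, such as --spec-version to record a version number in the info section of the output file, or --consume-examples (not supported on Windows) to run a consumer and gather example messages. A sketch, with an illustrative file name and version number:
confluent asyncapi export --file spec.yaml --spec-version 1.0.1 --consume-examples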
Understanding the output AsyncAPI specification file¶
The following subtopics describe the output of the AsyncAPI specification. To learn more about the structure of the file and the objects defined in it, see Create AsyncAPI document for applications consuming from Kafka.
Info and servers section¶
asyncapi: 2.4.0
info:
  title: API Document for Confluent Cluster
  version: 1.0.0
servers:
  cluster:
    url: https://pkc-xxxxx.<location>.<cloud-type>.confluent.cloud:443
    description: Confluent Kafka instance.
    protocol: kafka
    security:
    - confluentBroker: []
  schema-registry:
    url: https://psrc-xxxxx.<location>.<cloud-type>.confluent.cloud
    description: Confluent Kafka Schema Registry Server
    protocol: kafka
    security:
    - confluentSchemaRegistry: []
- The info section contains the AsyncAPI version, and the title and version of the output file. You can pass a version number with the --spec-version flag when generating the file if you want to keep track of the history of a Kafka cluster.
- The servers section describes the Confluent Kafka cluster and the Schema Registry cluster URLs. The protocol for both is kafka. The servers.cluster.url uses the Kafka REST endpoint URL.
Channels, operations and message section¶
channels:
  Hoboken-District:
    subscribe:
      operationId: HobokenDistrictSubscribe
      bindings:
        kafka:
          bindingVersion: 0.3.0
          groupId:
            type: string
          clientId:
            type: string
      message:
        $ref: '#/components/messages/HobokenDistrictMessage'
    bindings:
      kafka:
        x-partitions: 6
        x-replicas: 3
        x-configs:
          cleanup.policy: delete
          delete.retention.ms: 8.64e+07
          confluent.value.schema.validation: "true"
        x-messageCompatibility: BACKWARD
- The channels section contains the topics, and each topic has an associated operation (subscribe). It also contains Kafka bindings at the operation level and at the channel level. The channel-level bindings carry topic-level metadata (partitions, replicas, cleanup policy, and so on) and the subject compatibility type. The operation-level Kafka binding specifies the binding version and the groupId and clientId schemas for the consumer. These default to plain string schemas and can be updated manually after the YAML file is generated (see the sketch after this list).
- The message itself is stored in the components section, and the channel holds a reference to it.
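For example, after generation you might pin the consumer group and client ID by hand-editing the operation binding; the IDs below are placeholders, and enum is standard JSON Schema, so tools reading the document can surface the allowed values.
subscribe:
  operationId: HobokenDistrictSubscribe
  bindings:
    kafka:
      bindingVersion: 0.3.0
      groupId:
        type: string
        enum:
        - hoboken-district-consumer   # placeholder consumer group ID
      clientId:
        type: string
        enum:
        - station-status-reader       # placeholder client ID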
Components section¶
components:
  messages:
    HobokenDistrictMessage:
      schemaFormat: application/schema+json;version=draft-07
      contentType: application/json
      payload:
        $id: https://github.com/NABSA/gbfs/blob/v2.3/gbfs.md#station_statusjson
        $schema: http://json-schema.org/draft-07/schema
        description: Describes the capacity and rental availability of the station
        properties:
          last_updated:
            description: Last time the data in the feed was updated in POSIX time.
            minimum: 1.4501556e+09
            type: integer
          station:
            properties:
              is_installed:
                description: Is the station currently on the street?
                type: boolean
              is_renting:
                description: Is the station currently renting vehicles?
                type: boolean
              is_returning:
                description: Is the station accepting vehicle returns?
                type: boolean
              last_reported:
                description: The last time this station reported its status to the
                  operator's backend in POSIX time.
                minimum: 1.4501556e+09
                type: integer
              num_bikes_available:
                description: Number of vehicles of any type physically available for
                  rental at the station.
                minimum: 0
                type: integer
              num_bikes_disabled:
                description: Number of disabled vehicles of any type at the station.
                minimum: 0
                type: integer
              num_docks_available:
                description: Number of functional docks physically at the station.
                minimum: 0
                type: integer
              num_docks_disabled:
                description: Number of empty but disabled docks at the station.
                minimum: 0
                type: integer
              station_id:
                description: Identifier of a station.
                type: string
              vehicle_docks_available:
                description: Object displaying available docks by vehicle type (added in v2.1-RC).
                items:
                  properties:
                    count:
                      description: A number representing the total number of available docks for the defined vehicle type (added in v2.1-RC).
                      minimum: 0
                      type: integer
                    vehicle_type_ids:
                      description: An array of strings where each string represents
                        a vehicle_type_id that is able to use a particular type of
                        dock at the station (added in v2.1-RC).
                      items:
                        type: string
                      type: array
                  required:
                  - vehicle_type_ids
                  - count
                  type: object
                type: array
              vehicle_types_available:
                description: Array of objects displaying the total number of each
                  vehicle type at the station (added in v2.1-RC).
                items:
                  properties:
                    count:
                      description: A number representing the total amount of this vehicle type at the station (added in v2.1-RC).
                      minimum: 0
                      type: integer
                    vehicle_type_id:
                      description: The vehicle_type_id of vehicle at the station (added in v2.1-RC).
                      type: string
                  required:
                  - vehicle_type_id
                  - count
                  type: object
                minItems: 1
                type: array
            required:
            - station_id
            - num_bikes_available
            - is_installed
            - is_renting
            - is_returning
            - last_reported
            type: object
          ttl:
            description: Number of seconds before the data in the feed will be updated
              again (0 if the data should always be refreshed).
            minimum: 0
            type: integer
          version:
            const: "2.3"
            description: GBFS version number to which the feed conforms, according
              to the versioning framework (added in v1.1).
            type: string
        required:
        - last_updated
        - ttl
        - version
        - station
        type: object
      name: HobokenDistrictMessage
      bindings:
        kafka:
          bindingVersion: 0.3.0
          key:
            type: string
  securitySchemes:
    confluentBroker:
      type: userPassword
      x-configs:
        sasl.mechanisms: PLAIN
        sasl.password: '{{CLUSTER_API_SECRET}}'
        sasl.username: '{{CLUSTER_API_KEY}}'
        security.protocol: sasl_ssl
    confluentSchemaRegistry:
      type: userPassword
      x-configs:
        basic.auth.user.info: '{{SCHEMA_REGISTRY_API_KEY}}:{{SCHEMA_REGISTRY_API_SECRET}}'
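The '{{...}}' placeholders in securitySchemes stand in for the cluster and Schema Registry API keys and secrets you created earlier. As a sketch, a client configuration derived from the confluentBroker scheme might look like the following; the endpoint and credentials are placeholders, and note that clients connect to the cluster's bootstrap endpoint rather than the REST endpoint shown in servers.cluster.url.
bootstrap.servers=pkc-xxxxx.<location>.<cloud-type>.confluent.cloud:9092
security.protocol=sasl_ssl
sasl.mechanisms=PLAIN
sasl.username=<CLUSTER_API_KEY>
sasl.password=<CLUSTER_API_SECRET>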
Configuring topics and schemas on a cluster (import)¶
The previous sections showed how to use the export command to generate an AsyncAPI specification that describes your current event-streaming system, including applications and send and receive operations against streams.
This section explains how to reverse that workflow and import an AsyncAPI specification into your Confluent Cloud environment. The import feature reads the specification and makes configuration changes to the target cluster based on it, including updates to topic configurations, schemas, subject-level compatibility, descriptions, and tags.
Prerequisites¶
- The Confluent Cloud environment where the Kafka cluster is located must have a Stream Governance package enabled.
- Topics must have a schema associated with them using TopicNameStrategy.
Limitations¶
- Only schema subjects in the default context are supported. To learn more about schema contexts, see Schema Contexts.
- Protobuf schemas are not supported as part of AsyncAPI version 2.
Steps to import a specification¶
Log in to Confluent Cloud.
confluent login --url https://confluent.cloud/
List environments.
confluent environment list
Select the environment you want to use.
confluent environment use <ENVIRONMENT_ID>
List the clusters in the environment.
confluent kafka cluster list
Select the cluster you want to use.
confluent kafka cluster use <KAFKA_CLUSTER_ID>
Create and apply an API key for the resource (or use an existing key).
confluent api-key create --resource <KAFKA_CLUSTER_ID>
confluent api-key use <KEY> --resource <KAFKA_CLUSTER_ID>
Describe your Schema Registry cluster to get the Schema Registry cluster ID.
confluent schema-registry cluster describe
Create and apply an API key for the Schema Registry cluster (or use an existing key).
confluent api-key create --resource <SR_CLUSTER_ID>
Import the AsyncAPI specification from a YAML file.
confluent asyncapi import --file <Path-to-Spec>/<Spec-name>.yaml
Understanding how the import feature updates configurations¶
This command reads an AsyncAPI specification YAML file and performs the following operations for each channel on a Confluent Cloud cluster (a minimal example channel follows the list):
- Adds new topics and/or updates existing topic configurations
- Registers schemas for topics
- Changes subject level compatibility for schemas
- Adds schema level tags
- Adds topic level tags
- Adds topic descriptions
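As a sketch, a specification containing the channel below (plus the referenced message in components) would direct the import command to create or update a topic named orders with six partitions, the listed configurations, and BACKWARD subject compatibility; the topic name and values are illustrative and reuse only the fields shown in the export example above.
channels:
  orders:
    subscribe:
      message:
        $ref: '#/components/messages/OrdersMessage'
    bindings:
      kafka:
        x-partitions: 6
        x-replicas: 3
        x-configs:
          cleanup.policy: delete
        x-messageCompatibility: BACKWARD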