Use AsyncAPI to Describe Topics and Schemas on Confluent Cloud Clusters

This page describes AsyncAPI, what it is and how it works, and provides walkthroughs for both generating (exporting) an AsyncAPI specification from an existing cluster and importing an AsyncAPI specification into an environment to configure your Confluent Cloud resources on the fly.

What is AsyncAPI

AsyncAPI is an open source, machine-readable specification language for message-oriented APIs and architectures. It is not an API itself, nor does it act as a code abstraction layer for making calls to messaging systems. AsyncAPI is a metadata description language: it is used to describe resources such as topics and queues, events, and producers and consumers, in YAML.

AsyncAPI is heavily influenced by OpenAPI (formerly Swagger), an HTTP API specification language that shares many of the same objectives. OpenAPI describes and documents how a REST API operates in a standard way so that it can be shared widely. AsyncAPI is “OpenAPI for the asynchronous world”.

However, AsyncAPI goes beyond the OpenAPI specification by allowing users to describe applications and their send and receive operations against streams, broadening its use to that of a streaming architecture specification. In addition, a large and growing ecosystem of AsyncAPI tools exists, including code generators, validators, and parsers, which can speed up those tasks.

AsyncAPI with Confluent

Confluent uses Schema Registry to define schemas for messages sent over Apache Kafka®. A customer has a Kafka cluster, which can contain multiple topics (channels). Users can produce (publish) messages to and consume (subscribe to) messages from these topics. Each topic can also have an associated schema, managed by Schema Registry, which defines the structure of the messages on that topic. Applications talk to two brokers (servers) when sending a message: one is the Kafka cluster and the other is the Schema Registry cluster. Confluent Kafka topics have metadata such as cleanup policy and delete retention time, as well as tags at the topic level and schema level. An AsyncAPI specification captures the entire event-streaming architecture in a single document and gives users a concise snapshot of their Confluent Kafka clusters.
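
The mapping from Kafka terms to AsyncAPI terms described above can be pictured as a small data structure. The following is only an illustrative sketch, not the CLI's implementation: the function name `skeleton_spec`, its parameters, and the example URLs are hypothetical, and the fixed field values are copied from the sample output shown later on this page.

```python
import json


def skeleton_spec(kafka_url, schema_registry_url, topics):
    """Build a bare-bones AsyncAPI 2.x document: two servers, one channel per topic."""
    return {
        "asyncapi": "2.4.0",
        "info": {"title": "API Document for Confluent Cluster", "version": "1.0.0"},
        # The Kafka cluster and the Schema Registry cluster become two "servers".
        "servers": {
            "cluster": {"url": kafka_url, "protocol": "kafka"},
            "schema-registry": {"url": schema_registry_url, "protocol": "kafka"},
        },
        # Each Kafka topic becomes a "channel" with a subscribe operation.
        "channels": {topic: {"subscribe": {}} for topic in topics},
    }


if __name__ == "__main__":
    # Hypothetical placeholder URLs, for illustration only.
    spec = skeleton_spec(
        "https://kafka.example.invalid",
        "https://sr.example.invalid",
        ["Hoboken-District"],
    )
    print(json.dumps(spec, indent=2))
```

A real exported file also carries bindings, message definitions, and security schemes, which the sections below walk through.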

Using the Confluent CLI AsyncAPI tool

The Confluent CLI AsyncAPI tool exports and imports the AsyncAPI specification for Confluent Cloud clusters and documents:

Generating an AsyncAPI specification for a cluster (export)

Prerequisites

  • The Confluent Cloud environment where the Kafka cluster is located must have a Stream Governance package enabled.
  • Topics must have a schema associated using TopicNameStrategy.
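
Under TopicNameStrategy, the Schema Registry subject for a topic is the topic name followed by `-key` or `-value`. As a quick illustration of what "a schema associated using TopicNameStrategy" means, here is a minimal sketch; the helper name `subject_for` is hypothetical.

```python
def subject_for(topic, is_key=False):
    """Return the subject name that TopicNameStrategy derives for a topic."""
    return f"{topic}-{'key' if is_key else 'value'}"


print(subject_for("Hoboken-District"))        # Hoboken-District-value
print(subject_for("Hoboken-District", True))  # Hoboken-District-key
```

A topic whose schema is registered under a differently named subject (for example, one based on the record name) does not meet this prerequisite.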

Limitations

  • Only supports schema subjects with default context. To learn more about schema contexts, see Schema Contexts.
  • Protobuf schemas are not supported as part of AsyncAPI version 2.
  • The --consume-examples argument is not supported on Microsoft Windows. This is because it runs a consumer in confluent-kafka-go, which is not supported on Windows.

Steps to generate the specification

  1. Log in to Confluent Cloud.

    confluent login --url https://confluent.cloud/
    
  2. List environments.

    confluent environment list
    
  3. Select the environment you want to use.

    confluent environment use <ENVIRONMENT_ID>
    
  4. List the clusters in the environment.

    confluent kafka cluster list
    
  5. Select the cluster you want to use.

    confluent kafka cluster use <KAFKA_CLUSTER_ID>
    
  6. Create and apply an API key for the resource (or use an existing key).

    confluent api-key create --resource <KAFKA_CLUSTER_ID>
    
    confluent api-key use <KEY> --resource <KAFKA_CLUSTER_ID>
    
  7. Describe your Schema Registry cluster to get the Schema Registry cluster ID.

    confluent schema-registry cluster describe
    
  8. Create and apply an API key for the Schema Registry cluster (or use an existing key).

    confluent api-key create --resource <SR_CLUSTER_ID>
    
  9. Generate the AsyncAPI Specification and export it to a YAML file.

    confluent asyncapi export --file <FILE_NAME.yaml>
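
If you want to drive the final export step from a script, one option is to wrap the CLI call. The sketch below assumes that steps 1 through 8 have already been completed in the same CLI session. The function names are hypothetical, and the only flags used are `--file` and `--spec-version`, both of which are mentioned on this page.

```python
import shutil
import subprocess


def export_command(file_name, spec_version=None):
    """Build the argument list for `confluent asyncapi export`."""
    argv = ["confluent", "asyncapi", "export", "--file", file_name]
    if spec_version is not None:
        # Optional version number recorded in the generated file.
        argv += ["--spec-version", spec_version]
    return argv


def run_export(file_name, spec_version=None):
    """Run the export; raises if the CLI is missing or exits with an error."""
    if shutil.which("confluent") is None:
        raise RuntimeError("confluent CLI not found on PATH")
    subprocess.run(export_command(file_name, spec_version), check=True)


if __name__ == "__main__":
    # Print the command instead of running it, so the sketch is safe to try.
    print(" ".join(export_command("cluster-spec.yaml", "1.0.1")))
```

Building the argument list separately from running it makes the command easy to log or review before it touches a cluster.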
    

Understanding the output AsyncAPI specification file

Info and servers section

asyncapi: 2.4.0
info:
  title: API Document for Confluent Cluster
  version: 1.0.0
servers:
  cluster:
    url: https://pkac-xxxxx.<location>.<cloud-type>.confluent.cloud
    description: Confluent Kafka instance.
    protocol: kafka
    security:
    - confluentBroker: []
  schema-registry:
    url: https://psrc-xxxxx.<location>.<cloud-type>.confluent.cloud
    description: Confluent Kafka Schema Registry Server
    protocol: kafka
    security:
    - confluentSchemaRegistry: []
  • The top of the file gives the AsyncAPI version, and the info section gives the title and the version of the output file. To keep track of the history of a Kafka cluster, pass a version number with the --spec-version flag when generating the file.
  • The servers section contains the Confluent Kafka cluster URL and the Schema Registry cluster URL, and states that the protocol in use is Kafka.

Channels, operations and message section

channels:
  Hoboken-District:
    subscribe:
      operationId: HobokenDistrictSubscribe
      bindings:
        kafka:
          bindingVersion: 0.3.0
          groupId:
            type: string
          clientId:
            type: string
      message:
        $ref: '#/components/messages/HobokenDistrictMessage'
    bindings:
      kafka:
        x-partitions: 6
        x-replicas: 3
        x-configs:
          cleanup.policy: delete
          delete.retention.ms: 8.64e+07
          confluent.value.schema.validation: "true"
    x-messageCompatibility: BACKWARD
  • The channels section lists the topics, each with an associated operation (subscribe). It also contains Kafka bindings at the operation level and at the channel level. The channel-level bindings carry topic-level metadata (partitions, replicas, cleanup policy, and so on), and the channel also records the subject compatibility type. The operation-level Kafka binding specifies the binding version and the groupId and clientId schemas for the consumer. These default to type string and can be updated manually after the YAML file is generated.
  • The message itself is stored in the components section; the channel holds only a reference to it.
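
To see how a channel's message reference points into the components section, the following sketch resolves a local `$ref` by walking the document. It assumes the YAML has already been parsed into nested dictionaries (shown here as a trimmed literal based on the sample above); the helper name `resolve_ref` is hypothetical, and only references within the same document are handled.

```python
def resolve_ref(document, ref):
    """Resolve a local reference such as '#/components/messages/Name'."""
    if not ref.startswith("#/"):
        raise ValueError(f"only local references are handled: {ref!r}")
    node = document
    for part in ref[2:].split("/"):
        # JSON Pointer escapes: '~1' means '/', '~0' means '~' (decode '~1' first).
        part = part.replace("~1", "/").replace("~0", "~")
        node = node[part]
    return node


# Trimmed version of the sample specification from this page.
spec = {
    "channels": {
        "Hoboken-District": {
            "subscribe": {
                "operationId": "HobokenDistrictSubscribe",
                "message": {"$ref": "#/components/messages/HobokenDistrictMessage"},
            }
        }
    },
    "components": {
        "messages": {
            "HobokenDistrictMessage": {
                "name": "HobokenDistrictMessage",
                "contentType": "application/json",
            }
        }
    },
}

ref = spec["channels"]["Hoboken-District"]["subscribe"]["message"]["$ref"]
message = resolve_ref(spec, ref)
print(message["contentType"])  # application/json
```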

Components section

components:
  messages:
    HobokenDistrictMessage:
      schemaFormat: application/schema+json;version=draft-07
      contentType: application/json
      payload:
        $id: https://github.com/NABSA/gbfs/blob/v2.3/gbfs.md#station_statusjson
        $schema: http://json-schema.org/draft-07/schema
        description: Describes the capacity and rental availability of the station
        properties:
          last_updated:
            description: Last time the data in the feed was updated in POSIX time.
            minimum: 1.4501556e+09
            type: integer
          station:
            properties:
              is_installed:
                description: Is the station currently on the street?
                type: boolean
              is_renting:
                description: Is the station currently renting vehicles?
                type: boolean
              is_returning:
                description: Is the station accepting vehicle returns?
                type: boolean
              last_reported:
                description: The last time this station reported its status to the
                  operator's backend in POSIX time.
                minimum: 1.4501556e+09
                type: integer
              num_bikes_available:
                description: Number of vehicles of any type physically available for
                  rental at the station.
                minimum: 0
                type: integer
              num_bikes_disabled:
                description: Number of disabled vehicles of any type at the station.
                minimum: 0
                type: integer
              num_docks_available:
                description: Number of functional docks physically at the station.
                minimum: 0
                type: integer
              num_docks_disabled:
                description: Number of empty but disabled docks at the station.
                minimum: 0
                type: integer
              station_id:
                description: Identifier of a station.
                type: string
              vehicle_docks_available:
                description: Object displaying available docks by vehicle type (added in v2.1-RC).
                items:
                  properties:
                    count:
                      description: A number representing the total number of available docks for the defined vehicle type (added in v2.1-RC).
                      minimum: 0
                      type: integer
                    vehicle_type_ids:
                      description: An array of strings where each string represents
                        a vehicle_type_id that is able to use a particular type of
                        dock at the station (added in v2.1-RC).
                      items:
                        type: string
                      type: array
                  required:
                  - vehicle_type_ids
                  - count
                  type: object
                type: array
              vehicle_types_available:
                description: Array of objects displaying the total number of each
                  vehicle type at the station (added in v2.1-RC).
                items:
                  properties:
                    count:
                      description: A number representing the total amount of this vehicle type at the station (added in v2.1-RC).
                      minimum: 0
                      type: integer
                    vehicle_type_id:
                      description: The vehicle_type_id of vehicle at the station (added in v2.1-RC).
                      type: string
                  required:
                  - vehicle_type_id
                  - count
                  type: object
                minItems: 1
                type: array
            required:
            - station_id
            - num_bikes_available
            - is_installed
            - is_renting
            - is_returning
            - last_reported
            type: object
          ttl:
            description: Number of seconds before the data in the feed will be updated
              again (0 if the data should always be refreshed).
            minimum: 0
            type: integer
          version:
            const: "2.3"
            description: GBFS version number to which the feed conforms, according
              to the versioning framework (added in v1.1).
            type: string
        required:
        - last_updated
        - ttl
        - version
        - station
        type: object
      name: HobokenDistrictMessage
      bindings:
        kafka:
          bindingVersion: 0.3.0
          key:
            type: string
  securitySchemes:
    confluentBroker:
      type: userPassword
      x-configs:
        sasl.mechanisms: PLAIN
        sasl.password: '{{CLUSTER_API_SECRET}}'
        sasl.username: '{{CLUSTER_API_KEY}}'
        security.protocol: sasl_ssl
    confluentSchemaRegistry:
      type: userPassword
      x-configs:
        basic.auth.user.info: '{{SCHEMA_REGISTRY_API_KEY}}:{{SCHEMA_REGISTRY_API_SECRET}}'
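
The securitySchemes entries contain `{{...}}` placeholders rather than real credentials. A client that wants to reuse the x-configs values has to substitute its own keys first. The sketch below shows one way to do that, assuming the x-configs mappings have already been parsed into dictionaries; the helper name `fill_placeholders` and the example credential values are hypothetical.

```python
import re

# Matches placeholders of the form {{NAME}}.
PLACEHOLDER = re.compile(r"\{\{(\w+)\}\}")


def fill_placeholders(configs, values):
    """Replace {{NAME}} placeholders in config values; fail on unknown names."""

    def substitute(match):
        name = match.group(1)
        if name not in values:
            raise KeyError(f"no value supplied for placeholder {name}")
        return values[name]

    return {key: PLACEHOLDER.sub(substitute, str(val)) for key, val in configs.items()}


broker_configs = {
    "sasl.mechanisms": "PLAIN",
    "sasl.password": "{{CLUSTER_API_SECRET}}",
    "sasl.username": "{{CLUSTER_API_KEY}}",
    "security.protocol": "sasl_ssl",
}

filled = fill_placeholders(
    broker_configs,
    {"CLUSTER_API_KEY": "my-key", "CLUSTER_API_SECRET": "my-secret"},
)
print(filled["sasl.username"])  # my-key
```

In practice the values would more likely come from environment variables or a secrets manager than from literals in the script.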

Configuring topics and schemas on a cluster (import)

The previous sections showed how to use the export command to generate an AsyncAPI specification that describes your current event-streaming system, including applications and send and receive operations against streams.

This section explains how to do the reverse: import an AsyncAPI specification into your Confluent Cloud environment. The import feature reads the specification and makes configuration changes to the target cluster based on it, including updates to topic configurations, schemas, subject-level compatibility, descriptions, and tags.

Prerequisites

  • The Confluent Cloud environment where the Kafka cluster is located must have a Stream Governance package enabled.
  • Topics must have a schema associated using TopicNameStrategy.

Limitations

  • Only supports schema subjects with default context. To learn more about schema contexts, see Schema Contexts.
  • Protobuf schemas are not supported as part of AsyncAPI version 2.

Steps to import a specification

  1. Log in to Confluent Cloud.

    confluent login --url https://confluent.cloud/
    
  2. List environments.

    confluent environment list
    
  3. Select the environment you want to use.

    confluent environment use <ENVIRONMENT_ID>
    
  4. List the clusters in the environment.

    confluent kafka cluster list
    
  5. Select the cluster you want to use.

    confluent kafka cluster use <KAFKA_CLUSTER_ID>
    
  6. Create and apply an API key for the resource (or use an existing key).

    confluent api-key create --resource <KAFKA_CLUSTER_ID>
    
    confluent api-key use <KEY> --resource <KAFKA_CLUSTER_ID>
    
  7. Describe your Schema Registry cluster to get the Schema Registry cluster ID.

    confluent schema-registry cluster describe
    
  8. Create and apply an API key for the Schema Registry cluster (or use an existing key).

    confluent api-key create --resource <SR_CLUSTER_ID>
    
  9. Import the AsyncAPI specification from a YAML file.

    confluent asyncapi import --file <Path-to-Spec>/<Spec-name>.yaml
    

Understanding how the import feature updates configurations

This command reads from an AsyncAPI specification YAML file and performs the following operations for each channel on a Confluent Cloud cluster:

  • Adds new topics and/or updates existing topic configurations
  • Registers schemas for topics
  • Changes subject level compatibility for schemas
  • Adds schema level tags
  • Adds topic level tags
  • Adds topic descriptions
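
Before running an import, it can help to review what a specification says about each channel. The sketch below summarizes the topic-level settings found in a parsed specification. It is not the CLI's logic: the function names are hypothetical, the field names follow the sample output on this page, and the `-value` subject name is an assumption based on the TopicNameStrategy prerequisite. Tags and descriptions are left out because their location in the file is not shown here.

```python
def normalize(value):
    """Render a parsed YAML scalar as the string form a topic config expects."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, float) and value.is_integer():
        # Values such as 8.64e+07 parse as floats; write them as plain integers.
        return str(int(value))
    return str(value)


def plan_import(spec):
    """List, per channel, the topic-level settings found in the specification."""
    plan = []
    for topic, channel in spec.get("channels", {}).items():
        kafka = channel.get("bindings", {}).get("kafka", {})
        plan.append({
            "topic": topic,
            "subject": f"{topic}-value",  # assumption: TopicNameStrategy value subject
            "partitions": kafka.get("x-partitions"),
            "configs": {k: normalize(v) for k, v in kafka.get("x-configs", {}).items()},
            "compatibility": channel.get("x-messageCompatibility"),
        })
    return plan


# Channel-level fields copied from the sample output on this page.
sample = {
    "channels": {
        "Hoboken-District": {
            "bindings": {
                "kafka": {
                    "x-partitions": 6,
                    "x-replicas": 3,
                    "x-configs": {
                        "cleanup.policy": "delete",
                        "delete.retention.ms": 8.64e+07,
                        "confluent.value.schema.validation": "true",
                    },
                }
            },
            "x-messageCompatibility": "BACKWARD",
        }
    }
}

for entry in plan_import(sample):
    print(entry["topic"], entry["compatibility"], entry["configs"])
```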