Manage Flink Catalogs and Databases for Confluent Manager for Apache Flink

Apache Flink® SQL uses the concept of catalogs and databases to connect to external storage systems. This topic describes how to create and manage catalogs and databases using Confluent Manager for Apache Flink (CMF) for Flink SQL.

A core concept of SQL is the table. Tables store data, represented as rows. Users can query and modify the rows of a table by running SQL queries and Data Definition Language (DDL) statements. Most database systems store, manage, and process table data internally. In contrast, Flink SQL is solely a processing engine and not a data store. Flink accesses external data storage systems to read and write data.

Catalogs and databases bridge the gap between the SQL engine and external data storage systems, enabling users to access and manipulate data stored in various formats and locations.

Confluent Manager for Apache Flink® features built-in Kafka catalogs and databases to connect to Kafka and Schema Registry. A Kafka database exposes Kafka topics as tables and derives their schema from Schema Registry.

Catalogs and Databases: A catalog references a Schema Registry instance, which is used to derive table schemas from topic schemas, and contains one or more databases. A database references a Kafka cluster and contains tables that correspond to the topics in that cluster. Each topic of the Kafka cluster is represented as a TABLE in the database.

Tables: Tables are managed using SQL DDL statements such as CREATE TABLE, ALTER TABLE, and DROP TABLE. These statements allow you to create new tables, customize how Kafka topics are exposed as tables, and drop tables. For details, see Table Operations in Confluent Manager for Apache Flink.

Hierarchy:

  • CATALOG → references a Schema Registry instance

  • DATABASE → references a Kafka cluster (contained within a catalog)

  • TABLE → corresponds to a Kafka topic (contained within a database)

A catalog is a CMF global resource and is potentially accessible from all CMF environments. An environment's access to a catalog or database can be restricted with secrets and secret mappings (see Secrets and secret mappings below).

Create a catalog

A Kafka catalog references a Schema Registry instance, which is used to look up schema information for the topics of the Kafka clusters referenced by the catalog’s databases. The catalog is configured with connection properties for a Schema Registry client. These properties are used to fetch the metadata that is needed during query translation and execution.

A Kafka catalog is configured with the following resource definition:

{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "KafkaCatalog",
  "metadata": {
    "name": "kafka-cat"
  },
  "spec": {
    "srInstance": {
      "connectionConfig": {
        "schema.registry.url": "http://schemaregistry:8081"
      },
      "connectionSecretId": "sr-conn-secret-id"
    }
  }
}
  • The connectionConfig property is a map and can contain further configuration options as key/value pairs.

  • The connectionSecretId property is optional. It provides a placeholder against which environments can link secrets that contain connection credentials. For instructions on how to create a secret and link it to a catalog, see Secrets and secret mappings.

Given a catalog resource definition that is stored in a file catalog.json, the following REST call creates the catalog:

curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/catalogs/kafka \
-d@/<path-to>/catalog.json
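
To verify that the catalog was created, you can read it back with a GET request. This assumes the API supports GET on the same resource path used above; the exact response format may vary by CMF version:

# Retrieve the kafka-cat catalog by name
curl -v -X GET http://cmf:8080/cmf/api/v1/catalogs/kafka/kafka-cat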

Create a database

A database is created within a catalog and references a Kafka cluster. The database exposes the topics of its Kafka cluster as tables. The schema of a table is derived from its topic’s schema information, which is fetched from the Schema Registry instance referenced by the catalog that contains the database.

A database is configured with connection properties for Kafka clients. These properties are used to access metadata from Kafka and to read data from and write data to topics during query execution.

A database is configured with the following resource definition:

{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "KafkaDatabase",
  "metadata": {
    "name": "kafka-db",
  },
  "spec": {
    "kafkaCluster": {
      "connectionConfig": {
        "bootstrap.servers": "kafka-1:9092"
      },
      "connectionSecretId": "kafka-1-secret-id"
    },
    "ddlEnvironments": [
      "admin-env"
    ]
  }
}
  • The connectionConfig property is a map containing the bootstrap servers for the Kafka cluster. You can add further Kafka connection options as key/value pairs.

  • The optional connectionSecretId property can be used to map environment-specific secrets containing sensitive authentication credentials. See the following section on Secrets and Secret Mappings for instructions.

  • The ddlEnvironments property lists the environments that can run table operation statements (CREATE TABLE, ALTER TABLE, DROP TABLE) against tables in this database. You cannot submit table operation statements from environments not listed in this property. If you try, the statement fails with an error. For details, see Table Operations in Confluent Manager for Apache Flink.

Important

The Kafka and Schema Registry credentials of an environment that has access to a database must meet the following minimum requirements:

  • Catalog Schema Registry credentials must allow you to read subjects and schemas.

  • Database Kafka credentials must allow you to list topics and read from the metadata topic (_confluent_sr_catalog). In addition, the credentials must allow you to read from and write to all topics that statements in the environment need to access.

For environments listed in the ddlEnvironments property, the credentials must additionally include:

  • Catalog Schema Registry credentials must allow you to create and delete subjects and schemas.

  • Database Kafka credentials must allow you to create and delete topics, and write to the metadata topic (_confluent_sr_catalog).

Given a database resource definition that is stored in a file database.json, the following REST call creates the database in the catalog kafka-cat:

curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/catalogs/kafka/kafka-cat/databases \
-d@/<path-to>/database.json
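
To confirm that the database was created, you can list the databases of the catalog. This assumes the API supports GET on the same collection path used for creation:

# List all databases of the kafka-cat catalog
curl -v -X GET http://cmf:8080/cmf/api/v1/catalogs/kafka/kafka-cat/databases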

Secrets and secret mappings

Catalogs and databases are configured with a connectionConfig to access a Schema Registry instance or a Kafka cluster, respectively. Production setups of these systems require authentication. Schema Registry and Kafka access credentials are sensitive information that must be stored securely and must not be shared.

CMF features two concepts: secrets, which store access credentials securely, and secret mappings, which link secrets to catalogs and databases on a per-environment basis. This mechanism allows CMF administrators to configure different access credentials (with different permissions) per environment for each catalog and database.

  • A secret securely stores a map of connection properties in a Kubernetes secret.

  • Catalogs and databases can define a connectionSecretId in their spec. The connectionSecretId is a placeholder against which an environment can map a secret.

  • A secret mapping is created in the context of an environment and links a secret to a connectionSecretId.

The Schema Registry connection properties of a catalog for an environment are derived by combining the properties of the public connectionConfig of the catalog resource with the properties of the secret that the environment links to the catalog’s connectionSecretId. If a catalog has no connectionSecretId defined, every environment uses only the properties of the public connectionConfig. This implies that all environments use the same connection properties for a catalog without a connectionSecretId. If a catalog has a connectionSecretId defined, but an environment does not link a secret to this connectionSecretId, the catalog and all of its databases are not accessible from that environment. The same mechanism is used to derive the Kafka connection properties of a database for an environment.

Important

Within an environment, CMF uses the same Schema Registry and Kafka connection properties to translate and execute all statements, regardless of the user who submits the statement. Hence, data access of a user is controlled by their access to CMF environments and the connection configuration of these environments.

Secrets and secret mappings enable flexible configuration of connection credentials per environment. An environment can map a secret to multiple connectionSecretIds, reusing the secret for multiple catalogs or databases. Multiple catalogs or databases can also share the same connectionSecretId; a secret that is mapped to a shared connectionSecretId is used for all catalogs and databases with that connectionSecretId.

The following examples show how to configure secrets for a catalog. The configuration for databases is analogous.

Create a secret resource with connection options for Schema Registry authentication:

{
    "apiVersion": "cmf.confluent.io/v1",
    "kind": "Secret",
    "metadata": {
      "name": "sr-secret"
    },
    "spec": {
      "data": {
          "basic.auth.user.info": "admin:admin",
          "basic.auth.credentials.source": "USER_INFO"
      }
    }
}
  • The data property is a map that can hold arbitrary key/value pairs. A secret for Kafka connection credentials is created by putting the respective configuration options into the data map.

The secret is created via the REST API:

curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/secrets -d@/<path-to>/sr-secret.json

The following resource definition maps the secret "sr-secret" to the connectionSecretId "sr-conn-secret-id":

{
    "apiVersion": "cmf.confluent.io/v1",
    "kind": "EnvironmentSecretMapping",
    "metadata": {
      "name": "sr-conn-secret-id"
    },
    "spec": {
      "secretName": "sr-secret"
    }
}

The mapping is created for an environment env-1 with the following REST request:

curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/environments/env-1/secret-mappings \
-d@/<path-to>/sr-secret-mapping.json

With this mapping, statements created in environment env-1 will use the following properties to configure the Schema Registry client of catalog kafka-cat:

// from the plain "connectionConfig" of catalog `kafka-cat`
"schema.registry.url": "http://schemaregistry:8081",
// from the secret "sr-secret" that is mapped to `kafka-cat`'s connectionSecretId 'sr-conn-secret-id'
"basic.auth.user.info": "admin:admin",
"basic.auth.credentials.source": "USER_INFO"

Follow the same steps to create a secret for Kafka credentials and map them to the connectionSecretId of a database.
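
For illustration, such a Kafka credentials secret might look like the following sketch. The secret name kafka-secret, the SASL settings, and the credential values are placeholders; the options you need depend on how your Kafka cluster authenticates clients:

{
    "apiVersion": "cmf.confluent.io/v1",
    "kind": "Secret",
    "metadata": {
      "name": "kafka-secret"
    },
    "spec": {
      "data": {
          "security.protocol": "SASL_SSL",
          "sasl.mechanism": "PLAIN",
          "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"client\" password=\"client-secret\";"
      }
    }
}

To link this secret to the database defined earlier, create an EnvironmentSecretMapping with metadata.name set to kafka-1-secret-id (the database’s connectionSecretId) and spec.secretName set to kafka-secret, using the same secret-mappings endpoint shown above.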

Update a catalog

You can update a catalog’s configuration by submitting a PUT request with the updated specification. For example, you can update the catalog’s name or Schema Registry connection configuration.

To update the kafka-cat catalog with a new resource file updated-catalog.json, run the following REST request:

curl -v -H "Content-Type: application/json" \
-X PUT "http://cmf:8080/cmf/api/v1/catalogs/kafka/kafka-cat" \
-d@/<path-to>/updated-catalog.json
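
For illustration, assuming the PUT request takes a complete resource definition like the create request, updated-catalog.json could look like the following. The modified Schema Registry host name is a placeholder:

{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "KafkaCatalog",
  "metadata": {
    "name": "kafka-cat"
  },
  "spec": {
    "srInstance": {
      "connectionConfig": {
        "schema.registry.url": "http://schemaregistry-new:8081"
      },
      "connectionSecretId": "sr-conn-secret-id"
    }
  }
}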

Update a database

You can update a database’s configuration by submitting a PUT request with the updated specification. For example, you can update the database name, the Kafka cluster configuration, or the list of environments with table operation permissions.

To update the kafka-db database in the kafka-cat catalog with a new resource file updated-database.json, run the following REST request:

curl -v -H "Content-Type: application/json" \
-X PUT "http://cmf:8080/cmf/api/v1/catalogs/kafka/kafka-cat/databases/kafka-db"
-d@/<path-to>/updated-database.json
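
For illustration, assuming the PUT request takes a complete resource definition like the create request, updated-database.json could add a second environment to the ddlEnvironments list. The environment name dev-env is a placeholder:

{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "KafkaDatabase",
  "metadata": {
    "name": "kafka-db"
  },
  "spec": {
    "kafkaCluster": {
      "connectionConfig": {
        "bootstrap.servers": "kafka-1:9092"
      },
      "connectionSecretId": "kafka-1-secret-id"
    },
    "ddlEnvironments": [
      "admin-env",
      "dev-env"
    ]
  }
}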

Delete a catalog

A catalog can be deleted via the Confluent CLI or the REST API.

Important

Only catalogs without databases can be deleted. Before deleting a catalog, you must delete all databases associated with it.

Delete a catalog kafka-cat with the Confluent CLI:

confluent flink catalog delete kafka-cat

Delete a catalog kafka-cat with the REST API:

curl -v -H "Content-Type: application/json" \
-X DELETE http://cmf:8080/cmf/api/v1/catalogs/kafka/kafka-cat

Delete a database

Important

Deleting a database will delete all table metadata associated with it. The actual topics in the Kafka cluster are not deleted.

Delete a database kafka-db in a catalog kafka-cat with the REST API:

curl -v -H "Content-Type: application/json" \
-X DELETE http://cmf:8080/cmf/api/v1/catalogs/kafka/kafka-cat/databases/kafka-db

Limitations

CMF 2.0 does not support any catalog other than the built-in KafkaCatalog, with one exception: the example catalog, which is enabled with the cmf.sql.examples-catalog.enabled configuration flag.