Manage Flink Catalogs and Databases for Confluent Manager for Apache Flink
Flink SQL uses the concept of catalogs and databases to connect to external storage systems.
Important
Flink SQL support is available as an open preview. A Preview feature is a feature that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may change or discontinue providing releases of Preview features at any time at Confluent’s sole discretion. Comments, questions, and suggestions related to preview features are encouraged and can be submitted to your account representative.
Breaking Change: SQL support is in preview, and the Catalog API has changed between CMF version 2.0 and 2.1. In version 2.1, catalogs and databases are separate resources. A catalog is a top-level resource and references a Schema Registry instance. A database is a sub-resource of a catalog, references a Kafka cluster and exposes all topics of its Kafka cluster as queryable tables. A catalog can contain multiple databases.
If you’re migrating from CMF 2.0, see the Confluent Platform 8.0 documentation. Note that when you upgrade to CMF 2.1, CMF will automatically migrate your existing catalog objects to the new catalog and database format.
A core concept of SQL is the table. Tables store data, represented as rows. Users can query and modify the rows of a table by running SQL queries and Data Definition Language (DDL) statements. Most database systems store, manage, and process table data internally. In contrast, Flink SQL is solely a processing engine and not a data store. Flink accesses external data storage systems to read and write data.
Catalogs and databases bridge the gap between the SQL engine and external data storage systems, enabling users to access and manipulate data stored in various formats and locations.
Confluent Manager for Apache Flink® features built-in Kafka catalogs and databases to connect to Kafka and Schema Registry. A Kafka database exposes Kafka topics as tables and derives their schema from Schema Registry.
Catalogs and Databases: A catalog contains one or more databases. A catalog references a Schema Registry instance, which is used to derive table schemas from topic schemas. Each catalog can have multiple databases. A database references a Kafka cluster and contains tables that correspond to the topics in that cluster. Each topic of a Kafka cluster is represented as a TABLE in the database.
Hierarchy:
CATALOG → references a Schema Registry instance
DATABASE → references a Kafka cluster (contained within a catalog)
TABLE → corresponds to a Kafka topic (contained within a database)
A catalog is a CMF global resource and potentially accessible from all CMF environments. The access of an environment to a catalog or database can be restricted with Secrets and Secret Mappings (see section below).
Create a catalog
A Kafka catalog references a Schema Registry instance, which is used to look up topic schema information for the topics of the Kafka clusters referenced by the catalog’s databases. The catalog is configured with connection properties for a Schema Registry client. These properties are used to fetch the metadata that is needed during query translation and execution.
A Kafka Catalog is configured with the following resource definition:
{
"apiVersion": "cmf.confluent.io/v1",
"kind": "KafkaCatalog",
"metadata": {
"name": "kafka-cat"
},
"spec": {
"srInstance": {
"connectionConfig": {
"schema.registry.url": "http://schemaregistry:8081"
},
"connectionSecretId": "sr-conn-secret-id"
}
}
}
The
connectionConfigproperty is a map and can contain further configuration options as key/value pairs.The
connectionSecretIdproperty is optional. It can be configured to provide a placeholder against which environments can link secrets with connection credentials. For instruction on how to create a secret and link it to a catalog, see Secrets and secret mappings.
Given a catalog resource definition that is stored in a file catalog.json, the following REST call creates the catalog:
curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/catalogs/kafka \
-d@/<path-to>/catalog.json
Create a database
A database is created within a catalog and references a Kafka cluster. The database exposes the topics of its Kafka cluster as tables. The schema of a table is derived from the schema information of its topic that is fetched from the Schema Registry instance referenced by the catalog that the database is located in.
A database is configured with connection properties for Kafka clients. These properties are used to access metadata from Kafka and to read data from and write data to topics during query execution.
A database is configured with the following resource definition:
{
"apiVersion": "cmf.confluent.io/v1",
"kind": "KafkaDatabase",
"metadata": {
"name": "kafka-db",
},
"spec": {
"kafkaCluster": {
"connectionConfig": {
"bootstrap.servers": "kafka-1:9092"
},
"connectionSecretId": "kafka-1-secret-id"
},
"alterEnvironments": [
"admin-env"
]
}
}
The
connectionConfigproperty is a map containing the bootstrap servers for the Kafka cluster. It is possible to add additional Kafka connection options as key/value pairs.The optional
connectionSecretIdproperty can be used to map environment-specific secrets containing sensitive authentication credentials. See the following section on Secrets and Secret Mappings for instructions.The
alterEnvironmentsproperty contains a list of names of environments that have permission to runALTER TABLEstatements against tables in this database.ALTER TABLEstatements that are submitted from an environment that is not listed in thealterEnvironmentsproperty are rejected with a corresponding error message.
Given a database resource definition that is stored in a file database.json, the following REST call creates the database in the catalog kcat:
curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/catalogs/kafka/kcat/databases \
-d@/<path-to>/database.json
Secrets and secret mappings
Catalogs and databases are configured with a connectionConfig to access a Schema Registry instance or a Kafka cluster, respectively. Production setups of these systems require user authentication. Schema Registry and Kafka access credentials are sensitive information that must be securely stored and not be shared.
CMF features the concepts of secrets to securely store access credentials and secret mappings to link secrets per environment to catalogs and databases. This mechanism allows CMF administrators to configure for each environment different access credentials (with different permissions) for catalogs and databases.
A secret securely stores a map of connection properties in a Kubernetes secret.
Catalogs and databases can define a
connectionSecretIdin theirspec. TheconnectionSecretIdis a placeholder against which an environment can map a secret.A secret mapping is created in the context of an environment and links a secret to a
connectionSecretId.
The Schema Registry connection properties of a catalog for an environment are derived by combining the properties of the public connectionConfig of the catalog resource with the properties of the secret that is linked for the environment to the catalog’s connectionSecretId. If a catalog has no connectionSecretId defined, every environment only uses the properties of the public connectionConfig. This implies, that all environments use the same connection properties for a catalog without connectionSecretId. If a catalog has a connectionSecretId defined, but an environment does not link a secret to this connectionSecretId, then the catalog and all of its databases are not accessible from the environment. The same mechanism is used to derive the Kafka connection properties of a database for an environment.
Important
Within an environment, CMF uses the same Schema Registry and Kafka connection properties to translate and execute all statements, regardless of the user who submits the statement. Hence, data access of a user is controlled by their access to CMF environments and the connection configuration of these environments.
Secrets and secret mappings enable very flexible configuration of environment connection credentials. An environment can map a secret to multiple different connectionSecretIds, reusing the secret for multiple catalogs or databases. Multiple catalogs or databases can share the same connectionSecretId. A secret that is mapped to a shared connectionSecretId will be used for all catalogs or databases with that connectionSecretId.
The following examples show how to configure secrets for a catalog. The configuration for databases is analogous.
Create a secret resource with connection options for Schema Registry authentication:
{
"apiVersion": "cmf.confluent.io/v1",
"kind": "Secret",
"metadata": {
"name": "sr-secret"
},
"spec": {
"data": {
"basic.auth.user.info": "admin:admin",
"basic.auth.credentials.source": "USER_INFO"
}
}
}
The
dataproperty is a map that can hold arbitrary key/value pairs. A secret for Kafka connection credentials is created by putting the respective configuration options into thedatamap.
The secret is created via the REST API:
curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/secrets -d@/<path-to>/sr-secret.json
The following resource definition maps the secret "sr-secret" to the connectionSecretId "sr-conn-secret-id":
{
"apiVersion": "cmf.confluent.io/v1",
"kind": "EnvironmentSecretMapping",
"metadata": {
"name": "sr-conn-secret-id"
},
"spec": {
"secretName": "sr-secret"
}
}
The mapping is created for an environment env-1 with the following REST request:
curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/environments/env-1/secret-mappings \
-d@/<path-to>/sr-secret-mapping.json
With this mapping, statements created in environment env-1 will use the following properties to configure the Schema Registry client of catalog kafka-cat:
// from the plain "connectionConfig" of catalog `kafka-cat`
"schema.registry.url": "http://schemaregistry:8081",
// from the secret "sr-secret" that is mapped to `kafka-cat`'s connectionSecretId 'sr-conn-secret-id'
"basic.auth.user.info": "admin:admin",
"basic.auth.credentials.source": "USER_INFO"
Follow the same steps to create a secret for Kafka credentials and map them to the connectionSecretId of a database.
Update a catalog
You can update a catalog’s configuration by submitting a PUT request with the updated specification. For example, you can update the catalog’s name or Schema Registry connection configuration.
To update the kafka-cat catalog with a new resource file updated-catalog.json, run the following REST request:
curl -v -H "Content-Type: application/json" \
-X PUT "http://cmf:8080/cmf/api/v1/catalogs/kafka/kafka-cat" \
-d@/<path-to>/updated-catalog.json
Update a database
You can update a database’s configuration by submitting a PUT request with the updated specification. For example, you can update the name of the database, the Kafka cluster configuration or the list of environments with ALTER TABLE permissions.
To update the kafka-db database in the kafka-cat catalog with a new resource file updated-database.json, run the following REST request:
curl -v -H "Content-Type: application/json" \
-X PUT "http://cmf:8080/cmf/api/v1/catalogs/kafka/kafka-cat/databases/kafka-db"
-d@/<path-to>/updated-database.json
Customize tables with ALTER TABLE
You can use ALTER TABLE statements to customize how a Kafka topic is exposed as a table in CMF. This allows you to add computed columns, add metadata columns and modify watermark strategies.
Important
An ALTER TABLE statement is only accepted if the statement’s environment has permission to modify tables in the database that contains the table to alter. An environment has this permission if its name is listed in the database’s alterEnvironments property. If an ALTER TABLE statement is created in an environment that is not listed, the statement will be rejected with a corresponding error message. See the section on creating a database for details about the alterEnvironments property.
In addition, an ALTER TABLE statement does not affect running statements. This means a running statement is not affected by modifications of a table that it reads from or writes to.
Add a computed column
Add a computed column to a table that derives a value from existing columns:
> ALTER TABLE orders ADD total_value AS quantity * price;
Modify the watermark strategy
Modify the watermark for time-based processing:
> ALTER TABLE clicks MODIFY WATERMARK FOR timestamp_column AS timestamp_column - INTERVAL '5' SECOND;
Examples with the Confluent CLI
Create an ALTER TABLE statement using the CLI:
confluent --environment env-1 --compute-pool pool flink statement create alter-orders \
--catalog kafka-cat --database kafka-db \
--sql "ALTER TABLE orders ADD computed_total AS quantity * price;"
Delete a catalog
A catalog can be deleted via the Confluent CLI or the REST API.
Important
Only catalogs without databases can be deleted. Before deleting a catalog, you must delete all databases associated with it.
Delete a catalog kafka-cat with the Confluent CLI:
confluent flink catalog delete kafka-cat
Delete a catalog kafka-cat with the REST API:
curl -v -H "Content-Type: application/json" \
-X DELETE http://cmf:8080/cmf/api/v1/catalogs/kafka/kafka-cat
Delete a database
Important
Deleting a database will delete all table metadata associated with it. The actual topics in the Kafka cluster are not deleted.
Delete a database kafka-db in a catalog kafka-cat with the REST API:
curl -v -H "Content-Type: application/json" \
-X DELETE http://cmf:8080/cmf/api/v1/catalogs/kafka/kafka-cat/databases/kafka-db
Limitations
CMF 2.0 does not support any catalog other than the built-in KafkaCatalog. An exception is the example catalog enabled with the cmf.sql.examples-catalog.enabled configuration flag.
The following limitations apply for the KafkaCatalog in CMF 2.0:
Tables are exposed using the default inference mechanism. You can use
ALTER TABLEstatements to customize how a topic is exposed as a table (for example, adding computed columns or modifying watermarks), butCREATE TABLEandDROP TABLEstatements are not supported.The catalog uses a default mechanism to translate topic and schema metadata into Flink table and connector metadata. This is the same mechanism that Confluent Cloud Flink SQL uses for inferred tables.
The catalog uses the
TopicNameStrategyto retrieve the key and value schemas of a topic. For a topic calledorders, the catalog looks for two subjects calledorders-keyandorders-value. If these subjects are not present, the key or value schemas are read as raw bytes and exposed as single columns of typeBINARY.Compacted Kafka topics are not exposed as tables.