Manage Flink Catalogs and Databases for Confluent Manager for Apache Flink
Flink SQL uses the concept of catalogs and databases to connect to external storage systems.
Important
Flink SQL support is available as an open preview. A Preview feature is a feature that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing releases of Preview features at any time at Confluent’s sole discretion. Comments, questions, and suggestions related to preview features are encouraged and can be submitted to your account representative.
Important
Breaking Change: The Catalogs API has changed from CMF 2.0 to 2.1, since the SQL support is still in Open Preview. In the new version, catalogs and databases are separate resources. A catalog references a Schema Registry instance, and databases (which reference Kafka clusters) are created as separate resources within a catalog.
If you’re migrating from CMF 2.0, see the previous documentation for reference. Note that when you upgrade to CMF 2.1, CMF will automatically migrate your existing catalog objects to the new catalogs and databases format.
A core concept of SQL is the table. Tables store data, represented as rows. Users can query and modify the rows of a table by running SQL queries and Data Definition Language (DDL) statements. Most database systems store, manage, and process table data internally. In contrast, Flink SQL is solely a processing engine and not a data store. Flink accesses external data storage systems to read and write data.
Catalogs and databases bridge the gap between the SQL engine and external data storage systems, enabling users to access and manipulate data stored in various formats and locations.
Confluent Manager for Apache Flink® features built-in Kafka catalogs to connect to Kafka and Schema Registry. A Kafka Database exposes Kafka topics as tables and derives their schema from Schema Registry.
Catalogs and Databases: A catalog references a Schema Registry instance, which is used to derive table schemas from topic schemas, and contains one or more databases. A database references a Kafka cluster and contains tables that correspond to the topics in that cluster; each topic of the Kafka cluster is represented as a TABLE in the database.
Hierarchy:
- CATALOG → references a Schema Registry instance
- DATABASE → references a Kafka cluster (contained within a catalog)
- TABLE → corresponds to a Kafka topic (contained within a database)
Catalogs are accessible from all CMF environments, but there are ways to restrict access to specific catalogs or databases.
Create a catalog
A Kafka catalog references a Schema Registry instance, which is used to derive table schemas from topic schemas. The catalog is configured with connection properties for the Schema Registry client. These properties are used to fetch the metadata that is needed during query translation.
A Kafka Catalog is configured with the following resource definition:
{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "KafkaCatalog",
  "metadata": {
    "name": "kafka-cat"
  },
  "spec": {
    "srInstance": {
      "connectionConfig": {
        "schema.registry.url": "http://schemaregistry:8081"
      },
      "connectionSecretId": "sr-conn-secret-id"
    }
  }
}
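To register the catalog, submit the definition to the REST API. The following is a sketch: the POST endpoint is an assumption derived from the update and delete paths shown later on this page, and kafka-catalog.json is a hypothetical file containing the definition above.
# Sketch: create the KafkaCatalog defined above.
# The POST path is assumed to mirror the PUT/DELETE catalog paths used later on this page.
curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/catalogs/kafka \
-d@/<path-to>/kafka-catalog.json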
If you need to provide credentials for Schema Registry access, create a Secret and map it to the catalog’s connectionSecretId. Catalog credentials are configured in the same way as database credentials, as shown in the Secrets and Secret Mappings section below.
Secrets and Secret Mappings
All client properties specified in the connectionConfig field are used by all environments to translate and execute statements and are not handled as sensitive data. Sensitive connection properties, such as access credentials or properties that should only be used for statements in certain environments, must be stored in Secrets. A Secret is a set of properties (key-value pairs) that is concatenated with the public connectionConfig.
With Secrets and SecretMappings, you can configure different connection properties (including credentials) for catalogs and databases per environment. Within an environment, CMF uses the same properties to translate and execute all statements, regardless of the user who submits the statement.
If an environment does not have a mapping for a connectionSecretId, the corresponding catalog or database is not accessible from that environment. This typically indicates an incomplete configuration that would result in connection failures of the Schema Registry or Kafka clients. The same mechanism can also be used deliberately to restrict the access of environments to specific catalogs or databases.
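For example, the following sketch maps the same connectionSecretId (sr-conn-secret-id) to different Secrets in two environments, so that statements in each environment connect with different credentials. The environment env-2 and the Secrets sr-secret-dev and sr-secret-prod are hypothetical names used only for illustration.
# Sketch: the same mapping name ("sr-conn-secret-id") resolves to a different Secret per environment.
# "env-2", "sr-secret-dev", and "sr-secret-prod" are hypothetical names.
curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/environments/env-1/secret-mappings \
-d '{"apiVersion": "cmf.confluent.io/v1", "kind": "EnvironmentSecretMapping", "metadata": {"name": "sr-conn-secret-id"}, "spec": {"secretName": "sr-secret-dev"}}'

curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/environments/env-2/secret-mappings \
-d '{"apiVersion": "cmf.confluent.io/v1", "kind": "EnvironmentSecretMapping", "metadata": {"name": "sr-conn-secret-id"}, "spec": {"secretName": "sr-secret-prod"}}'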
The following examples show how to configure secrets for both catalogs (Schema Registry) and databases (Kafka clusters).
Configure catalog credentials (Schema Registry)
Create a secret for Schema Registry authentication:
{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "Secret",
  "metadata": {
    "name": "sr-secret"
  },
  "spec": {
    "data": {
      "basic.auth.user.info": "admin:admin",
      "basic.auth.credentials.source": "USER_INFO"
    }
  }
}
The Secret is created via the REST API:
curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/secrets -d@/<path-to>/sr-secret.json
Map the secret to the catalog’s connectionSecretId:
{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "EnvironmentSecretMapping",
  "metadata": {
    "name": "sr-conn-secret-id"
  },
  "spec": {
    "secretName": "sr-secret"
  }
}
The mapping is created for an environment env-1 with the following REST request:
curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/environments/env-1/secret-mappings \
-d@/<path-to>/sr-secret-mapping.json
With this mapping, statements created in environment env-1 will use the following properties to configure the Schema Registry client:
// from the plain "connectionConfig"
"schema.registry.url": "http://schemaregistry:8081",
// from the "sr-secret"
"basic.auth.user.info": "admin:admin",
"basic.auth.credentials.source": "USER_INFO"
Create a database
A database references a Kafka cluster and must be created within an existing catalog. Each database contains tables that correspond to topics in the referenced Kafka cluster. The catalog’s Schema Registry instance is used to derive table schemas from topic schemas.
To create a database, you configure it with a resource definition that contains the connection properties for the Kafka client. You may also need to create Secrets and EnvironmentSecretMappings to provide credentials.
Configure Kafka cluster
A database is configured with connection properties for the Kafka client. These properties are used to read data from and write data to topics during query execution.
A database is configured with the following resource definition:
{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "KafkaDatabase",
  "metadata": {
    "name": "kafka-db"
  },
  "spec": {
    "connectionConfig": {
      "bootstrap.servers": "kafka-1:9092"
    },
    "connectionSecretId": "kafka-1-secret-id"
  }
}
The database is associated with the catalog under which it is created, which is specified in the create database API call. The connectionConfig contains the bootstrap servers of the Kafka cluster, and the connectionSecretId is used to map environment-specific Secrets.
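A create request could look like the following sketch. The POST endpoint is an assumption based on the database update and delete paths shown later on this page, and kafka-database.json is a hypothetical file containing the definition above.
# Sketch: create the KafkaDatabase defined above within the catalog "kafka-cat".
# The POST path is assumed to mirror the PUT/DELETE database paths used later on this page.
curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/catalogs/kafka/kafka-cat/databases \
-d@/<path-to>/kafka-database.json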
Configure database credentials
Sensitive connection properties for the database, such as access credentials, should be stored in Secrets and mapped to the database’s connectionSecretId using EnvironmentSecretMapping.
Create a secret
First, create a Secret for the database:
{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "Secret",
  "metadata": {
    "name": "kafka-1-secret"
  },
  "spec": {
    "data": {
      "sasl.mechanism": "PLAIN",
      "security.protocol": "SASL_PLAINTEXT",
      "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"test\" password=\"testPw\";"
    }
  }
}
The Secret is created via the REST API:
curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/secrets -d@/<path-to>/secret.json
Map a secret to a connectionSecretId
Map the secret to the database’s connectionSecretId in an environment:
{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "EnvironmentSecretMapping",
  "metadata": {
    "name": "kafka-1-secret-id"
  },
  "spec": {
    "secretName": "kafka-1-secret"
  }
}
The name of the mapping resource (in this example, kafka-1-secret-id) must match the connectionSecretId specified in the database definition, and the secretName (kafka-1-secret) must match the name of the Secret.
The mapping is created for an environment env-1 with the following REST request:
curl -v -H "Content-Type: application/json" \
-X POST http://cmf:8080/cmf/api/v1/environments/env-1/secret-mappings \
-d@/<path-to>/mapping.json
With this mapping, statements created in environment env-1 will use the following properties to configure the Kafka clients when accessing topics in the database:
// from the plain "connectionConfig"
"bootstrap.servers": "kafka-1:9092",
// from the "kafka-1-secret"
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_PLAINTEXT",
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"test\" password=\"testPw\";"
Update a catalog
You can update a catalog’s configuration by submitting a PUT request with the updated specification. For example, you can update the Schema Registry connection configuration or connection secret ID.
Update a catalog with the REST API
curl -v -H "Content-Type: application/json" \
-X PUT "http://cmf:8080/cmf/api/v1/catalogs/kafka/kafka-cat" \
-d@/<path-to>/updated-catalog.json
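The request body (updated-catalog.json in this example) is a complete catalog definition containing the new values. The following sketch assumes you are pointing the catalog at a different Schema Registry endpoint; the URL http://schemaregistry-new:8081 is a placeholder.
{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "KafkaCatalog",
  "metadata": {
    "name": "kafka-cat"
  },
  "spec": {
    "srInstance": {
      "connectionConfig": {
        "schema.registry.url": "http://schemaregistry-new:8081"
      },
      "connectionSecretId": "sr-conn-secret-id"
    }
  }
}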
Update a database
You can update a database’s configuration by submitting a PUT request with the updated specification. For example, you can update the Kafka cluster connection configuration or connection secret ID.
Update a database with the REST API
curl -v -H "Content-Type: application/json" \
-X PUT "http://cmf:8080/cmf/api/v1/catalogs/kafka/kafka-cat/databases/kafka-db" \
-d@/<path-to>/updated-database.json
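The request body (updated-database.json in this example) is a complete database definition containing the new values. The following sketch assumes you are switching the database to a different bootstrap server; kafka-2:9092 is a placeholder.
{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "KafkaDatabase",
  "metadata": {
    "name": "kafka-db"
  },
  "spec": {
    "connectionConfig": {
      "bootstrap.servers": "kafka-2:9092"
    },
    "connectionSecretId": "kafka-1-secret-id"
  }
}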
Customize tables with ALTER TABLE
You can use ALTER TABLE statements to customize how a Kafka topic is exposed as a table in CMF.
This allows you to add computed columns, modify watermark strategies, and add table options.
Add a computed column
Add a computed column to a table that derives a value from existing columns:
> ALTER TABLE orders ADD total_value AS quantity * price;
Modify the watermark strategy
Modify the watermark for time-based processing:
> ALTER TABLE clicks MODIFY WATERMARK FOR timestamp_column AS timestamp_column - INTERVAL '5' SECOND;
Examples with the Confluent CLI
Create an ALTER TABLE statement using the CLI:
confluent --environment env-1 --compute-pool pool flink statement create alter-orders \
--catalog kafka-cat --database kafka-db \
--sql "ALTER TABLE orders ADD computed_total AS quantity * price;"
Important
ALTER TABLE statements are supported with limited functionality. The statement persists table customizations, but the underlying Kafka topics remain unchanged. These customizations apply to all queries that reference the table.
Delete a catalog
A catalog can be deleted via the Confluent CLI or the REST API.
Important
Only catalogs without databases can be deleted. Before deleting a catalog, you must delete all databases associated with it.
Delete a catalog with the Confluent CLI
confluent flink catalog delete kafka-cat
Delete a catalog with the REST API
curl -v -H "Content-Type: application/json" \
-X DELETE http://cmf:8080/cmf/api/v1/catalogs/kafka/kafka-cat
Delete a database
Important
Deleting a database will delete all table metadata associated with it. The actual topics in the Kafka cluster are not deleted.
Delete a database with the REST API
curl -v -H "Content-Type: application/json" \
-X DELETE http://cmf:8080/cmf/api/v1/catalogs/kafka/kafka-cat/databases/kafka-db
Limitations
CMF 2.1 does not support any catalog other than the built-in KafkaCatalog. An exception is the example catalog enabled with the cmf.sql.examples-catalog.enabled configuration flag.
The following limitations apply for the KafkaCatalog in CMF 2.1:
- Tables are exposed using the default inference mechanism. You can use ALTER TABLE statements to customize how a topic is exposed as a table (for example, adding computed columns or modifying watermarks), but CREATE TABLE and DROP TABLE statements are not supported.
- The catalog uses a default mechanism to translate topic and schema metadata into Flink table and connector metadata. This is the same mechanism that Confluent Cloud Flink SQL uses for inferred tables.
- The catalog uses the TopicNameStrategy to retrieve the key and value schemas of a topic. For a topic called orders, the catalog looks for two subjects called orders-key and orders-value. If these subjects are not present, the key or value schemas are read as raw bytes and exposed as single columns of type BINARY.
- Compacted Kafka topics are not exposed as tables.