Key Search with Confluent Cloud for Apache Flink
Confluent Cloud for Apache Flink® supports read-only external tables to enable key search with federated query execution on external data sources, like JDBC, REST, and MongoDB.
Note
Key Search is an Open Preview feature in Confluent Cloud.
A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent’s sole discretion.
Use the KEY_SEARCH_AGG function to run key searches over external databases in Confluent Cloud for Apache Flink.
The output of KEY_SEARCH_AGG is an array with all rows in the external table that have a matching key in the search column.
Key search supports matching on only a single search column.
Confluent Cloud for Apache Flink accepts any JSON return type from the REST server and automatically parses it, whether it’s a single JSON node or an array of JSON nodes.
Syntax
SELECT * FROM key_input,
  LATERAL TABLE(KEY_SEARCH_AGG(<external_table>, DESCRIPTOR(<input_column>), <search_column>));

-- map settings are optional
SELECT * FROM key_input,
  LATERAL TABLE(KEY_SEARCH_AGG(<external_table>, DESCRIPTOR(<input_column>), <search_column>,
    MAP['async_enabled', [boolean], 'client_timeout', [int], 'max_parallelism', [int], 'retry_count', [int]]));
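For example, the optional map settings can be passed as a fourth argument. This is a sketch with hypothetical table and column names (`orders` as the input table, `products_ext` as the external table, and `product_id` as the key column); the value types follow the syntax above.

```sql
-- Hypothetical input table `orders` and external table `products_ext`.
-- The map tunes lookup behavior: asynchronous execution, a 30-second
-- client timeout, up to 10 parallel lookups, and 3 retries.
SELECT *
FROM orders,
  LATERAL TABLE(KEY_SEARCH_AGG(products_ext, DESCRIPTOR(product_id), product_id,
    MAP['async_enabled', 'true', 'client_timeout', '30', 'max_parallelism', '10', 'retry_count', '3']));
```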
Supported providers
These key search providers are supported. Set the connector property in the CREATE TABLE statement to specify the provider for the external database table.
| Provider | `connector` value |
|---|---|
| Confluent-jdbc (Postgres, MySQL, SQL Server, Oracle) | `confluent-jdbc` |
| Couchbase | `couchbase` |
| MongoDB | `mongodb` |
| REST | `rest` |
For more information, see Key Search with External Databases.
Search table parameters
Common parameters
{CONNECTOR}
Provider name.
{CONNECTOR}.PARAMS.*
Parameters supported by external providers.
Confluent-jdbc properties
confluent-jdbc.connection
Confluent JDBC connection that includes the endpoint, username, and password.
This property is required.
confluent-jdbc.table-name
The name of the JDBC table to connect to.
This property is required.
REST properties
confluent.search.rest.server.error.code.retry.list
A comma-separated list of HTTP status codes that should be retried when
received. The default is 429,503.
This property is optional.
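For example, the retry list can be set alongside the other REST properties in the WITH clause of an external table. The table, connection, and column names in this sketch are hypothetical.

```sql
-- Hypothetical REST external table; retries requests that return 429, 500, or 503.
CREATE TABLE rest_lookup (
  q STRING NOT NULL,
  result STRING,
  PRIMARY KEY (q) NOT ENFORCED
) WITH (
  'connector' = 'rest',
  'rest.connection' = 'my_rest_connection',
  'rest.method' = 'GET',
  'rest.path' = 'search.json',
  'confluent.search.rest.server.error.code.retry.list' = '429,500,503'
);
```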
rest.connection
REST connection that includes the endpoint and credentials required to connect to the endpoint.
The REST endpoint must use the HTTPS protocol, can’t be an IP address, and can’t
end with confluent.cloud as the domain name.
REST authorization uses standard authorization, with the
<auth-scheme> <authorization-parameters> header. The header can’t be
customized.
This property is required.
rest.path
Path segments appended to the base endpoint of the connection when issuing HTTP requests to the endpoint.
The path supports a key placeholder in the segment, for example:
rest.path = "search/product/product_id={product_id}"
In this example, the placeholder is {product_id}. If the input column
name matches the placeholder, Confluent Cloud for Apache Flink replaces the placeholder with the
column’s value for each row and performs the key search.
This property is required.
rest.method
The HTTP verb used to invoke the RESTful endpoint, for example, GET.
This property is required.
MongoDB properties
mongodb.connection
MongoDB connection that includes the endpoint, username, and password.
This property is required.
mongodb.database
MongoDB database name.
This property is required.
mongodb.collection
MongoDB collection name.
This property is required.
mongodb.index
MongoDB search index name.
This property is required.
Couchbase properties
couchbase.connection
Couchbase connection that includes the endpoint, username, and password.
This property is required.
couchbase.bucket
Couchbase bucket name.
This property is required.
couchbase.scope
Couchbase scope name.
This property is required.
couchbase.collection
Couchbase collection name.
This property is required.
couchbase.index
Couchbase search index name.
This property is required.
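Unlike the other providers, Couchbase has no worked example in the Examples section below. A minimal sketch of a Couchbase external table using the properties above might look like the following; the connection, bucket, scope, collection, index, and column names are hypothetical.

```sql
-- Hypothetical Couchbase external table using the properties above.
CREATE TABLE couchbase_products (
  product_id STRING,
  name STRING
) WITH (
  'connector' = 'couchbase',
  'couchbase.connection' = 'couchbase_connection',
  'couchbase.bucket' = 'retail',
  'couchbase.scope' = 'inventory',
  'couchbase.collection' = 'products',
  'couchbase.index' = 'default'
);
```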
Examples
Key search with Postgres
This example shows how to run key search with Postgres by using Flink SQL.
This example shows the following steps:
Create a connection resource to a Postgres database.
Create a Confluent-jdbc external table.
Create an input text table.
Run a key search on movie titles.
Show the results and store them in a new table.
Run the following command to create a connection resource named “jdbc_postgres_connection” that uses your Postgres credentials.
CREATE CONNECTION jdbc_postgres_connection WITH (
  'type' = 'confluent_jdbc',
  'endpoint' = '<jdbc_url>',
  'username' = '<username>',
  'password' = '<password>',
  'environment' = '<ENV_ID>'
);
Create an external table over a Postgres table that contains preset example data, like Netflix shows. The confluent-jdbc connector also supports MySQL, SQL Server, and Oracle.
-- create jdbc table
CREATE TABLE jdbc_postgres (
  show_id STRING,
  type STRING,
  title STRING,
  cast_members STRING,
  country STRING,
  date_added DATE,
  release_year INT,
  rating STRING,
  duration STRING,
  listed_in STRING,
  description STRING
) WITH (
  'connector' = 'confluent-jdbc',
  'confluent-jdbc.connection' = 'jdbc_postgres_connection',
  'confluent-jdbc.table-name' = 'netflix_shows'
);
Run the following statements to create an input table and insert data.
-- create input table
CREATE TABLE jdbc_postgres_input_table (
  title STRING
);

INSERT INTO jdbc_postgres_input_table VALUES
  ('Squid Game'),
  ('The Irishman'),
  ('Stranger Things');
Run the following statement to run key search on movie titles.
-- key search
SELECT *
FROM jdbc_postgres_input_table,
  LATERAL TABLE(KEY_SEARCH_AGG(jdbc_postgres, DESCRIPTOR(title), title));
Run the following statement to store search results in a new table.
-- store search results in a new table
CREATE TABLE jdbc_postgres_result_table AS
  SELECT *
  FROM jdbc_postgres_input_table,
    LATERAL TABLE(KEY_SEARCH_AGG(jdbc_postgres, DESCRIPTOR(title), title));

SELECT *
FROM jdbc_postgres_result_table
CROSS JOIN UNNEST(search_results) AS T(show_id, type, title, cast_members, country, date_added, release_year, rating, duration, listed_in, description);
Key search with REST
This example shows how to run key search with any RESTful server by using Flink SQL. It assumes the RESTful server’s response format is in JSON.
This example shows the following steps:
Create a connection resource to a RESTful server.
Create a REST external table.
Create a text input table.
Run a key search on book titles.
Show the results and store them in a new table.
Run the following command to create a connection resource named “rest_connection” that encapsulates the endpoint and the credentials that will be used to access the RESTful server.
CREATE CONNECTION rest_connection WITH (
  'type' = 'rest',
  'endpoint' = '<endpoint>',
  'username' = '<username>',
  'password' = '<password>'
);
Run the following statement to create a REST external table.
-- REST key search
CREATE TABLE rest_books_key_search (
  numFound BIGINT,
  q STRING NOT NULL,
  documentation_url STRING,
  docs ARRAY<ROW<`author_name` ARRAY<STRING>, `title` STRING>>,
  PRIMARY KEY (q) NOT ENFORCED
) WITH (
  'connector' = 'rest',
  'rest.connection' = 'rest_connection',
  'rest.method' = 'GET',
  'rest.path' = 'search.json'
);
Run the following statement to create a text input table.
-- create input table
CREATE TABLE bookstore (
  bookname STRING NOT NULL,
  price INT,
  stock INT
);
Run the following statement to insert data into the table.
INSERT INTO `bookstore` VALUES
  ('harry-potter', 20, 500),
  ('wish', 10, 1000),
  ('crucial conversation', 10, 2000);
Run the following statement to run key search on book titles.
-- REST key search
SELECT *
FROM bookstore,
  LATERAL TABLE(KEY_SEARCH_AGG(rest_books_key_search, DESCRIPTOR(q), bookname));
Run the following statement to store search results in a new table.
-- store search results in a new table
CREATE TABLE books_key_search_results AS
  SELECT *
  FROM bookstore,
    LATERAL TABLE(KEY_SEARCH_AGG(rest_books_key_search, DESCRIPTOR(q), bookname));
Run the following statement to show the results.
SELECT *
FROM books_key_search_results
CROSS JOIN UNNEST(search_results) AS T(numFound, q, documentation_url, docs);
Key search with MongoDB
This example shows how to run a key search with MongoDB by using Flink SQL.
This example shows the following steps:
Create a connection resource to a MongoDB database.
Create a MongoDB external table.
Create a text input table.
Run a key search on movie titles.
Show the results and store them in a new table.
This example assumes a MongoDB Atlas cluster, as shown in Create a Cluster, that is loaded with the MongoDB sample dataset.
Run the following command to create a connection resource named “mongodb_connection” that uses your MongoDB credentials.
CREATE CONNECTION mongodb_connection WITH (
  'type' = 'mongodb',
  'endpoint' = '<atlas_endpoint>',
  'username' = '<atlas_username>',
  'password' = '<atlas_password>'
);
Run the following statement to create a MongoDB external table.
-- mongodb key search
CREATE TABLE mongodb_movies_key_search (
  title STRING,
  plot STRING
) WITH (
  'connector' = 'mongodb',
  'mongodb.connection' = 'mongodb_connection',
  'mongodb.database' = 'sample_mflix',
  'mongodb.collection' = 'movies',
  'mongodb.index' = 'default'
);
Run the following statement to create a search table.
-- create search table
CREATE TABLE movies_key_search (
  title STRING,
  year INT
);
Run the following statement to insert data into the search table.
INSERT INTO movies_key_search VALUES
  ('golden finger', 1964),
  ('license to kill', 1989),
  ('skyfall', 2012),
  ('die hard', 1988);
Run the following statement to run a key search on movie titles.
-- mongodb key search
SELECT *
FROM movies_key_search,
  LATERAL TABLE(KEY_SEARCH_AGG(`mongodb_movies_key_search`, DESCRIPTOR(title), title));
Run the following statement to store search results in a new table.
-- store search results in a new table
CREATE TABLE movies_key_search_results AS
  SELECT *
  FROM movies_key_search,
    LATERAL TABLE(KEY_SEARCH_AGG(`mongodb_movies_key_search`, DESCRIPTOR(title), title));
Run the following statement to show the results.
SELECT *
FROM movies_key_search_results
CROSS JOIN UNNEST(search_results) AS T(title, plot);