Key Search with Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink® supports read-only external tables to enable key search with federated query execution on external data sources, like JDBC, REST, and MongoDB.
Note
Key Search is an Open Preview feature in Confluent Cloud.
A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent's sole discretion.
Use the KEY_SEARCH_AGG function to run key searches over external databases in Confluent Cloud for Apache Flink.
The output of KEY_SEARCH_AGG is an array with all rows in the external table that have a matching key in the search column.
Key search supports only a single column.
Confluent Cloud for Apache Flink accepts any JSON return type from the REST server and automatically parses it, whether it’s a single JSON node or an array of JSON nodes.
Syntax¶
SELECT * FROM key_input,
LATERAL TABLE(KEY_SEARCH_AGG(<external_table>, DESCRIPTOR(<input_column>), <search_column>))
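As a sketch of this syntax, the following query searches a hypothetical external table named products for each key in an input table named orders; the table and column names are illustrative only. Each output row carries a search_results array holding the matching external rows, which you can flatten with CROSS JOIN UNNEST, as shown in the examples later on this page.

```sql
-- Hypothetical sketch: look up each orders.product_id in the
-- external "products" table (names are illustrative, not real).
SELECT *
FROM orders,
  LATERAL TABLE(KEY_SEARCH_AGG(products, DESCRIPTOR(product_id), product_id));
```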
Supported providers¶
These key search providers are supported:
- Confluent-jdbc: Postgres, MySQL, SQL Server, Oracle
- REST
- MongoDB
- Couchbase
Search table parameters¶
Common parameters¶
{CONNECTOR}¶
Provider name.
{CONNECTOR}.PARAMS.*¶
Parameters supported by external providers.
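As an illustration of how these placeholders appear in a table definition, the following sketch uses the confluent-jdbc provider; the table name and option values are hypothetical. The connector option names the provider, and provider-specific options are prefixed with the provider name.

```sql
-- Sketch only: {CONNECTOR} here is confluent-jdbc, and the
-- provider-prefixed options correspond to {CONNECTOR}.PARAMS.*.
CREATE TABLE jdbc_example (
  id STRING,
  name STRING
) WITH (
  'connector' = 'confluent-jdbc',
  'confluent-jdbc.connection' = 'my-jdbc-connection',
  'confluent-jdbc.table-name' = 'my_table'
);
```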
Confluent-jdbc properties¶
confluent-jdbc.connection¶
Confluent JDBC connection that includes the endpoint, username, and password.
This property is required.
REST properties¶
confluent.search.rest.server.error.code.retry.list¶
A comma-separated list of HTTP status codes that are retried when received. The default is 429,503.
This property is optional.
rest.connection¶
REST connection that includes the endpoint and credentials required to connect to the endpoint.
The REST endpoint must use the HTTPS protocol, can't be an IP address, and can't end with confluent.cloud as the domain name.
REST authorization uses standard authorization, with the <auth-scheme> <authorization-parameters> header. The header can't be customized.
This property is required.
rest.path¶
REST path segments under the base endpoint in the connection for issuing HTTP requests toward the endpoint.
The path supports a key placeholder in the segment, for example:
rest.path = "search/product/product_id={product_id}"
In this example, the placeholder is {product_id}. If the input column name matches the placeholder, Confluent Cloud for Apache Flink replaces the placeholder with the column's value for each row and performs the key search.
This property is required.
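Putting this together, a REST external table that uses a path placeholder might look like the following sketch; the table name, columns, and connection name are illustrative, not from a real deployment.

```sql
-- Sketch: for each input row, the {product_id} placeholder in
-- rest.path is replaced with that row's matching column value.
CREATE TABLE rest_products (
  product_id STRING NOT NULL,
  name STRING,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'connector' = 'rest',
  'rest.connection' = 'my-rest-connection',
  'rest.method' = 'GET',
  'rest.path' = 'search/product/product_id={product_id}'
);
```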
MongoDB properties¶
mongodb.connection¶
MongoDB connection that includes the endpoint, username, and password.
This property is required.
Examples¶
Key search with Postgres¶
This example shows how to run key search with Postgres by using Flink SQL.
This example shows the following steps:
- Create a connection resource to a Postgres database.
- Create a Confluent-jdbc external table.
- Create an input text table.
- Run a key search on movie titles.
- Show the results and store them in a new table.
Run the following command to create a connection resource named jdbc-postgres-connection that uses your Postgres credentials.
confluent flink connection create jdbc-postgres-connection \
  --cloud AWS \
  --region us-west-2 \
  --type confluent_jdbc \
  --endpoint ${jdbc_url} \
  --username ${username} \
  --password ${password} \
  --environment ${ENV_ID}
Create a Postgres table with preset example data, like Netflix shows. Confluent-jdbc also supports MySQL, SQL Server, and Oracle.
-- create jdbc table
CREATE TABLE jdbc_postgres (
  show_id STRING,
  type STRING,
  title STRING,
  cast_members STRING,
  country STRING,
  date_added DATE,
  release_year INT,
  rating STRING,
  duration STRING,
  listed_in STRING,
  description STRING
) WITH (
  'connector' = 'confluent-jdbc',
  'confluent-jdbc.connection' = 'jdbc-postgres-connection',
  'confluent-jdbc.table-name' = 'netflix_shows'
);
Run the following statements to create an input table and insert data.
-- create input table
CREATE TABLE jdbc_postgres_input_table (
  title STRING
);

INSERT INTO jdbc_postgres_input_table VALUES
  ('Squid Game'),
  ('The Irishman'),
  ('Stranger Things');
Run the following statement to run key search on movie titles.
-- key search
SELECT *
FROM jdbc_postgres_input_table,
  LATERAL TABLE(KEY_SEARCH_AGG(jdbc_postgres, DESCRIPTOR(title), title));
Run the following statement to store search results in a new table.
-- store search results in a new table
CREATE TABLE jdbc_postgres_result_table AS
SELECT *
FROM jdbc_postgres_input_table,
  LATERAL TABLE(KEY_SEARCH_AGG(jdbc_postgres, DESCRIPTOR(title), title));

SELECT *
FROM jdbc_postgres_result_table
CROSS JOIN UNNEST(search_results)
  AS T(show_id, type, title, cast_members, country, date_added,
       release_year, rating, duration, listed_in, description);
Key search with REST¶
This example shows how to run key search with any RESTful server by using Flink SQL. It assumes the RESTful server’s response format is in JSON.
This example shows the following steps:
- Create a connection resource to a RESTful server.
- Create a REST external table.
- Create a text input table.
- Run a key search on book titles.
- Show the results and store them in a new table.
Run the following command to create a connection resource named rest-connection that encapsulates the endpoint and the credentials that will be used to access the RESTful server.
confluent flink connection create rest-connection \
  --cloud AZURE \
  --region westus2 \
  --type rest \
  --endpoint ${endpoint} \
  --username ${username} \
  --password ${password} \
  --environment ${ENV_ID}
Run the following statement to create a REST external table.
-- REST key search
CREATE TABLE rest_books_key_search (
  numFound BIGINT,
  q STRING NOT NULL,
  documentation_url STRING,
  docs ARRAY<ROW<`author_name` ARRAY<STRING>, `title` STRING>>,
  PRIMARY KEY (q) NOT ENFORCED
) WITH (
  'connector' = 'rest',
  'rest.connection' = 'rest-connection',
  'rest.method' = 'GET',
  'rest.path' = 'search.json'
);
Run the following statement to create a text input table.
-- create search table
CREATE TABLE bookstore (
  bookname STRING NOT NULL,
  price INT,
  stock INT
);
Run the following statement to insert data into the table.
INSERT INTO `bookstore` VALUES
  ('harry-potter', 20, 500),
  ('wish', 10, 1000),
  ('crucial conversation', 10, 2000);
Run the following statement to run key search on book titles.
-- REST key search
SELECT *
FROM bookstore,
  LATERAL TABLE(KEY_SEARCH_AGG(rest_books_key_search, DESCRIPTOR(q), bookname));
Run the following statement to store search results in a new table.
-- store search results in a new table
CREATE TABLE books_key_search_results AS
SELECT *
FROM bookstore,
  LATERAL TABLE(KEY_SEARCH_AGG(rest_books_key_search, DESCRIPTOR(q), bookname));
Run the following statement to show the results.
SELECT *
FROM books_key_search_results
CROSS JOIN UNNEST(search_results) AS T(numFound, q, documentation_url, docs);
Key search with MongoDB¶
This example shows how to run a key search with MongoDB by using Flink SQL.
This example shows the following steps:
- Create a connection resource to a MongoDB database.
- Create a MongoDB external table.
- Create a text input table.
- Run a key search on movie titles.
- Show the results and store them in a new table.
It assumes an Atlas cluster, as shown in Create a Cluster, loaded with the MongoDB sample dataset.
Run the following command to create a connection resource named mongodb-connection that uses your MongoDB credentials.
confluent flink connection create mongodb-connection \
  --cloud AZURE \
  --region westus2 \
  --type mongodb \
  --endpoint ${atlas_endpoint} \
  --username ${atlas_username} \
  --password ${atlas_password} \
  --environment ${ENV_ID}
Run the following statement to create a MongoDB external table.
-- mongodb key search
CREATE TABLE mongodb_movies_key_search (
  title STRING,
  plot STRING
) WITH (
  'connector' = 'mongodb',
  'mongodb.connection' = 'mongodb-connection',
  'mongodb.database' = 'sample_mflix',
  'mongodb.collection' = 'movies',
  'mongodb.index' = 'default'
);
Run the following statement to create a search table.
-- create search table
CREATE TABLE movies_key_search (
  title STRING,
  year INT
);
Run the following statement to insert data into the search table.
INSERT INTO movies_key_search VALUES
  ('golden finger', 1964),
  ('license to kill', 1989),
  ('skyfall', 2012),
  ('die hard', 1988);
Run the following statement to run a key search on movie titles.
-- mongodb key search
SELECT *
FROM movies_key_search,
  LATERAL TABLE(KEY_SEARCH_AGG(`mongodb_movies_key_search`, DESCRIPTOR(title), title));
Run the following statement to store search results in a new table.
-- store search results in a new table
CREATE TABLE movies_key_search_results AS
SELECT *
FROM movies_key_search,
  LATERAL TABLE(KEY_SEARCH_AGG(`mongodb_movies_key_search`, DESCRIPTOR(title), title));
Run the following statement to show the results.
SELECT *
FROM movies_key_search_results
CROSS JOIN UNNEST(search_results) AS T(title, plot);