Text Search with Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink® supports read-only external tables to enable full-text search over external databases, like MongoDB, Couchbase, and Elasticsearch.
Note
Text Search is an Open Preview feature in Confluent Cloud.
A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent’s’ sole discretion.
Use the TEXT_SEARCH_AGG function to run text searches over external databases in Confluent Cloud for Apache Flink.
The output of TEXT_SEARCH_AGG is an array with all rows in the external table that have matching text in the search column.
Syntax¶
SELECT * FROM text_input,
LATERAL TABLE(TEXT_SEARCH_AGG(<external_table>, DESCRIPTOR(<input_column>), <search_column>, <LIMIT>))
Supported providers¶
- MongoDB
- Couchbase
- Elasticsearch
Search table parameters¶
Common parameters¶
{CONNECTOR}¶
Provider name.
{CONNECTOR}.PARAMS.*¶
Parameters supported by external providers.
MongoDB properties¶
mongodb.connection¶
MongoDB connection that includes the endpoint, username, and passwords.
This property is required.
Couchbase properties¶
couchbase.connection¶
Couchbase connection that includes the endpoint, username, and password.
This property is required.
Elasticsearch properties¶
Elasticsearch does not support username and password credentials.
elastic.connection¶
Elastic connection that includes the endpoint and the api key. For more information, see Manage Connections.
This property is required.
Examples¶
Full text search with MongoDB¶
This example shows how to run text search with MongoDB by using Flink SQL.
This example shows the following steps:
- Create a connection resource to a MongoDB database.
- Create a MongoDB external table.
- Create a text input table.
- Text search on movie plots.
- Show the results and store them in a new table.
It assumes an Atlas cluster as shown in Create a Cluster and MongoDB sample dataset.
Run the following command to create a connection resource named mongodb-connection that uses your MongoDB credentials.
confluent flink connection create mongodb-connection \ --cloud AZURE \ --region westus2 \ --type mongodb \ --endpoint ${atlas_endpoint} \ --username ${atlas_username} \ --password ${atlas_password} \ --environment ${ENV_ID}
Create a MongoDB external table.
-- mongodb full text search CREATE TABLE mongodb_movies_full_text_search ( title STRING, plot STRING ) WITH ( 'connector' = 'mongodb', 'mongodb.connection' = 'mongodb-connection', 'mongodb.database' = 'sample_mflix', 'mongodb.collection' = 'movies', 'mongodb.index' = 'default' );
Create a search table.
-- create search table CREATE TABLE movies_full_text_search( plot STRING );
Insert data into the search table.
INSERT INTO movies_full_text_search VALUES ('A woman, with the aid of her police officer sweetheart'), ('A District Attorney''s outspoken stand on abortion gets him in trouble'), ('A tipsy doctor encounters his patient sleepwalking on a building ledge');
Run full text search.
-- full text search SELECT * FROM movies_full_text_search, LATERAL TABLE(TEXT_SEARCH_AGG(`mongodb_movies_full_text_search`, DESCRIPTOR(plot), plot, 3));
Store search results in a new table.
-- store search results in a new table CREATE TABLE movies_full_text_search_results AS SELECT * FROM movies_full_text_search, LATERAL TABLE(TEXT_SEARCH_AGG(`mongodb_movies_full_text_search`, DESCRIPTOR(plot), plot, 3)); SELECT * FROM movies_full_text_search_results CROSS JOIN UNNEST(search_results) AS T(title, plot);
Full text search with Couchbase¶
This example shows the following steps:
- Create a connection resource to a Couchbase database.
- Create a Couchbase external table.
- Create a text input table.
- Full text search on color descriptions.
- Select the results or stored in a new table.
Import a Couchbase example dataset by following the steps in Run a Vector Search with a Couchbase SDK.
Create a new search index to include all fields in the dataset and enable full text search.
Run the following command to create a connection resource named couchbase-connection that uses your Couchbase credentials.
confluent flink connection create couchbase-connection \ --cloud AZURE \ --region westus2 \ --type couchbase \ --endpoint ${couchbase_endpoint} \ --username ${couchbase_username} \ --password ${couchbase_password} \ --environment ${ENV_ID}
Create a Couchbase external table.
-- couchbase full text search CREATE TABLE couchbase_color_full_text_search ( color STRING, brightness DOUBLE, description STRING ) WITH ( 'connector' = 'couchbase', 'couchbase.connection' = 'couchbase-connection', 'couchbase.bucket' = 'color-vector-sample', 'couchbase.scope' = 'color', 'couchbase.collection' = 'rgb', 'couchbase.index' = 'rgb-vector' );
Create a text input table.
CREATE TABLE text_input_couchbase_full_text_search (input STRING);
Insert data into the input table.
INSERT INTO text_input_couchbase_full_text_search VALUES ('associated with growth nature and positivity'), ('a mix of blue and green'), ('the color of the sky on a clear summer day');
Run full text search.
SELECT * FROM text_input_couchbase_full_text_search, LATERAL TABLE(TEXT_SEARCH_AGG(couchbase_color_full_text_search, DESCRIPTOR(description), input, 3));
Store search results in a new table.
-- store search results in a new table CREATE TABLE text_input_couchbase_results AS SELECT * FROM text_input_couchbase_full_text_search, LATERAL TABLE(TEXT_SEARCH_AGG(couchbase_color_full_text_search, DESCRIPTOR(description), input, 3)); SELECT * FROM text_input_couchbase_results CROSS JOIN UNNEST(search_results) AS T(color, brightness, description);