Text Search with Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® supports read-only external tables to enable full-text search over external databases, like MongoDB, Couchbase, and Elasticsearch.

Note

Text Search is an Open Preview feature in Confluent Cloud.

A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent’s sole discretion.

Use the TEXT_SEARCH_AGG function to run text searches over external databases in Confluent Cloud for Apache Flink.

The output of TEXT_SEARCH_AGG is an array, returned in the search_results column, that contains the rows in the external table with matching text in the search column, up to the specified limit.

Syntax

SELECT * FROM text_input,
  LATERAL TABLE(TEXT_SEARCH_AGG(<external_table>, DESCRIPTOR(<input_column>), <search_column>, <LIMIT>))
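
As a reading aid, the following sketch fills in the placeholders with hypothetical names (queries, docs_ext, body, and query_text do not appear elsewhere on this page). Judging by the Couchbase example later on this page, the DESCRIPTOR argument appears to name the column of the external table to search, and the third argument the column of the input table that holds the search text. Treat that reading as an assumption rather than a specification.

```sql
-- Hypothetical tables: `queries` holds the search text,
-- `docs_ext` is a read-only external table backed by a search index.
SELECT *
FROM queries,
  LATERAL TABLE(
    TEXT_SEARCH_AGG(
      docs_ext,           -- <external_table>
      DESCRIPTOR(body),   -- <input_column>: assumed to be the external-table column to search
      query_text,         -- <search_column>: assumed to be the input-table column with the search text
      5                   -- <LIMIT>: assumed to cap the number of matches per input row
    )
  );
```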

Supported providers

  • MongoDB
  • Couchbase
  • Elasticsearch

Search table parameters

Common parameters

{CONNECTOR}

Provider name.

{CONNECTOR}.PARAMS.*

Parameters supported by external providers.

MongoDB properties

mongodb.connection

MongoDB connection that includes the endpoint, username, and password.

This property is required.

mongodb.database

MongoDB database name.

This property is required.

mongodb.collection

MongoDB collection name.

This property is required.

mongodb.index

MongoDB full text search index name.

This property is required.

Couchbase properties

couchbase.connection

Couchbase connection that includes the endpoint, username, and password.

This property is required.

couchbase.bucket

Couchbase bucket name.

This property is required.

couchbase.scope

Couchbase scope name.

This property is required.

couchbase.collection

Couchbase collection name.

This property is required.

couchbase.index

Couchbase search index name.

This property is required.

Elasticsearch properties

Elasticsearch connections do not support username and password credentials. Use an API key instead.

elastic.connection

Elastic connection that includes the endpoint and the API key. For more information, see Manage Connections.

This property is required.

elastic.index

Elasticsearch search index name.

This property is required.
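
This page has no Elasticsearch walkthrough, so here is a minimal sketch of an external table definition built from the two properties above. The connector value 'elastic', the connection name elastic-connection, the index name products, and the column names are assumptions for illustration. Check them against your own setup before use.

```sql
-- Sketch only: connector value, connection name, index name, and columns are assumed.
CREATE TABLE elastic_products_full_text_search (
  name STRING,
  description STRING
) WITH (
  'connector' = 'elastic',                      -- assumed provider name
  'elastic.connection' = 'elastic-connection',  -- hypothetical connection created with an endpoint and API key
  'elastic.index' = 'products'                  -- hypothetical Elasticsearch index
);
```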

Examples

Full text search with MongoDB

This example shows how to run a text search with MongoDB by using Flink SQL.

The example covers the following steps:

  • Create a connection resource to a MongoDB database.
  • Create a MongoDB external table.
  • Create a text input table.
  • Run a text search on movie plots.
  • Show the results and store them in a new table.

It assumes an Atlas cluster that is set up as shown in Create a Cluster and loaded with the MongoDB sample dataset.

  1. Run the following command to create a connection resource named mongodb-connection that uses your MongoDB credentials.

    confluent flink connection create mongodb-connection \
      --cloud AZURE \
      --region westus2 \
      --type mongodb \
      --endpoint ${atlas_endpoint} \
      --username ${atlas_username} \
      --password ${atlas_password} \
      --environment ${ENV_ID}
    
  2. Create a MongoDB external table.

    -- mongodb full text search
    CREATE TABLE mongodb_movies_full_text_search (
        title STRING,
        plot STRING
    ) WITH (
        'connector' = 'mongodb',
        'mongodb.connection' = 'mongodb-connection',
        'mongodb.database' = 'sample_mflix',
        'mongodb.collection' = 'movies',
        'mongodb.index' = 'default'
    );
    
  3. Create a text input table.

    -- create text input table
    CREATE TABLE movies_full_text_search(
      plot STRING
    );
    
  4. Insert data into the text input table.

    INSERT INTO movies_full_text_search VALUES
      ('A woman, with the aid of her police officer sweetheart'),
      ('A District Attorney''s outspoken stand on abortion gets him in trouble'),
      ('A tipsy doctor encounters his patient sleepwalking on a building ledge');
    
  5. Run full text search.

    -- full text search
    SELECT * FROM movies_full_text_search,
      LATERAL TABLE(TEXT_SEARCH_AGG(`mongodb_movies_full_text_search`, DESCRIPTOR(plot), plot, 3));
    
  6. Store search results in a new table.

    -- store search results in a new table
    CREATE TABLE movies_full_text_search_results AS
      SELECT * FROM movies_full_text_search,
        LATERAL TABLE(TEXT_SEARCH_AGG(`mongodb_movies_full_text_search`, DESCRIPTOR(plot), plot, 3));
    
    SELECT * FROM movies_full_text_search_results
      CROSS JOIN UNNEST(search_results)
      AS T(title, plot);
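
Because both the results table and the unnested rows contain a plot column, it can help to project the columns explicitly. The following sketch assumes the results table has the input plot column plus the search_results array, as the statements above suggest. The aliases r, query_text, matched_title, and matched_plot are hypothetical.

```sql
-- Sketch: name the input text and the matched fields separately.
SELECT
  r.plot  AS query_text,     -- search text from the input table
  T.title AS matched_title,  -- title of a matching movie
  T.plot  AS matched_plot    -- plot of a matching movie
FROM movies_full_text_search_results AS r
  CROSS JOIN UNNEST(r.search_results) AS T(title, plot);
```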
    

Full text search with Couchbase

This example shows the following steps:

  • Create a connection resource to a Couchbase database.
  • Create a Couchbase external table.
  • Create a text input table.
  • Run a full-text search on color descriptions.
  • Show the results or store them in a new table.

Import a Couchbase example dataset by following the steps in Run a Vector Search with a Couchbase SDK.

Create a new search index that includes all fields in the dataset and enables full-text search.

  1. Run the following command to create a connection resource named couchbase-connection that uses your Couchbase credentials.

    confluent flink connection create couchbase-connection \
      --cloud AZURE \
      --region westus2 \
      --type couchbase \
      --endpoint ${couchbase_endpoint} \
      --username ${couchbase_username} \
      --password ${couchbase_password} \
      --environment ${ENV_ID}
    
  2. Create a Couchbase external table.

    -- couchbase full text search
    CREATE TABLE couchbase_color_full_text_search (
      color STRING,
      brightness DOUBLE,
      description STRING
    ) WITH (
      'connector' = 'couchbase',
      'couchbase.connection' = 'couchbase-connection',
      'couchbase.bucket' = 'color-vector-sample',
      'couchbase.scope' = 'color',
      'couchbase.collection' = 'rgb',
      'couchbase.index' = 'rgb-vector'
    );
    
  3. Create a text input table.

    CREATE TABLE text_input_couchbase_full_text_search (input STRING);
    
  4. Insert data into the input table.

    INSERT INTO text_input_couchbase_full_text_search VALUES
      ('associated with growth nature and positivity'),
      ('a mix of blue and green'),
      ('the color of the sky on a clear summer day');
    
  5. Run full text search.

    SELECT * FROM text_input_couchbase_full_text_search,
      LATERAL TABLE(TEXT_SEARCH_AGG(couchbase_color_full_text_search, DESCRIPTOR(description), input, 3));
    
  6. Store search results in a new table.

    -- store search results in a new table
    CREATE TABLE text_input_couchbase_results AS
      SELECT * FROM text_input_couchbase_full_text_search,
        LATERAL TABLE(TEXT_SEARCH_AGG(couchbase_color_full_text_search, DESCRIPTOR(description), input, 3));
    
    SELECT * FROM text_input_couchbase_results
      CROSS JOIN UNNEST(search_results)
      AS T(color, brightness, description);