Key Search with Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® supports read-only external tables to enable key search with federated query execution on external data sources, like JDBC, REST, and MongoDB.

Note

Egress and ingress data transfers from Flink are offered at no cost until 3/31/2026.

Use the KEY_SEARCH_AGG function to run key searches over external databases in Confluent Cloud for Apache Flink.

The output of KEY_SEARCH_AGG is an array with all rows in the external table that have a matching key in the search column.

Key search supports only a single search column.

Confluent Cloud for Apache Flink accepts any JSON return type from the REST server and automatically parses it, whether it’s a single JSON node or an array of JSON nodes.
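For example, a REST endpoint might return either a single JSON node, like {"product_id": "p-100", "name": "widget"}, or an array of nodes, as in the following hypothetical response. The field names here are illustrative only; both forms are parsed without extra configuration.

```json
[
  {"product_id": "p-100", "name": "widget"},
  {"product_id": "p-101", "name": "gadget"}
]
```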

Syntax

SELECT * FROM key_input,
  LATERAL TABLE(KEY_SEARCH_AGG(<external_table>, descriptor(<input_column>), <search_column>))

-- map settings are optional
SELECT * FROM key_input,
  LATERAL TABLE(KEY_SEARCH_AGG(<external_table>, descriptor(<input_column>), <search_column>,
    map['async_enabled', [boolean], 'client_timeout', [int], 'max_parallelism', [int], 'retry_count', [int], 'retry_error_list', [string]]));

Map options

The KEY_SEARCH_AGG function supports several configurable options by using a map parameter. Each option controls a specific aspect of the key search operation.

Configuration

You can control how calls to the external table execute with these optional parameters:

  • async_enabled: Whether calls to external tables are asynchronous and non-blocking. The default is true.

  • client_timeout: Time, in seconds, after which the request to the external table endpoint times out. The default is 30 seconds.

  • debug: Return a detailed stack trace in the API response. The default is false. Confluent Cloud for Apache Flink implements data masking for error messages to remove any secrets or customer input, but the stack trace may contain the prompt itself or some part of the response string.

  • max_parallelism: Maximum number of parallel requests that the function can make. Can be used only when async_enabled is true. The default is 10.

  • retry_count: Maximum number of times a failed request to the external table is retried. The default is 3.

  • retry_error_list: Comma-separated list of error codes that trigger a retry when the external table request fails. Only errors matching entries in this list cause the operation to be retried.
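For example, a key search that lowers the timeout and retries only on throttling and unavailability errors might look like the following sketch. The table name, column names, and error codes are placeholders, and option values are shown as string literals; see the syntax section for the expected value types.

```sql
SELECT * FROM key_input,
  LATERAL TABLE(KEY_SEARCH_AGG(products_external, DESCRIPTOR(product_id), product_id,
    MAP['async_enabled', 'true',
        'client_timeout', '10',
        'max_parallelism', '5',
        'retry_count', '2',
        'retry_error_list', '429,503']));
```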

Supported providers

These key search providers are supported. Set the connector property in the CREATE TABLE statement to specify the provider for the external database table.

  • Confluent JDBC (currently supports Postgres, MySQL, SQL Server, and Oracle): confluent-jdbc

  • Couchbase: couchbase

  • MongoDB: mongodb

  • REST (supports any REST endpoint that uses JSON format): rest

For more information, see Key Search with External Databases.

Search table parameters

Common parameters

{CONNECTOR}

Provider name.

{CONNECTOR}.PARAMS.*

Parameters supported by external providers.

confluent-jdbc properties

confluent-jdbc.connection

Confluent JDBC connection that includes the endpoint, username, and password.

This property is required.

confluent-jdbc.table-name

The name of the JDBC table to connect to.

This property is required.

REST properties

rest.connection

REST connection that includes the endpoint and credentials required to connect to the endpoint.

The REST endpoint must use the HTTPS protocol, can’t be an IP address, and can’t end with confluent.cloud as the domain name.

REST authorization uses the standard Authorization header, in the form <auth-scheme> <authorization-parameters>. The header can’t be customized.

This property is required.

rest.endpoint_suffix

Add a suffix to the endpoint without any encoding.

This property is optional.

rest.headers

HTTP headers to be included in all REST requests. This property takes a JSON blob that has key/value pairs.

This property is optional.

rest.headers = '{"header1":"value","header2":"value"}'

rest.path

Path segments, appended to the base endpoint defined in the connection, that are used when issuing HTTP requests to the endpoint.

The path supports a key placeholder in the segment, for example:

rest.path = 'search/product/product_id={product_id}'

In this example, the placeholder is {product_id}. If the input column name matches the placeholder, Confluent Cloud for Apache Flink replaces the placeholder with the column’s value for each row and performs the key search.

This property is required.

rest.method

HTTP verb used to invoke the REST endpoint.

This property is required.

MongoDB properties

mongodb.connection

MongoDB connection that includes the endpoint, username, and password.

This property is required.

mongodb.database

MongoDB database name.

This property is required.

mongodb.collection

MongoDB collection name.

This property is required.

mongodb.index

MongoDB search index name.

This property is required.

Couchbase properties

couchbase.connection

Couchbase connection that includes the endpoint, username, and password.

This property is required.

couchbase.bucket

Couchbase bucket name.

This property is required.

couchbase.scope

Couchbase scope name.

This property is required.

couchbase.collection

Couchbase collection name.

This property is required.

couchbase.index

Couchbase search index name.

This property is required.
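Putting these properties together, a Couchbase external table definition might look like the following sketch. The connection, bucket, scope, collection, and index names are placeholders, not values from this document.

```sql
-- Hypothetical Couchbase external table; all names are placeholders.
CREATE TABLE couchbase_hotels_key_search (
  name STRING,
  city STRING
) WITH (
  'connector' = 'couchbase',
  'couchbase.connection' = 'couchbase_connection',
  'couchbase.bucket' = 'travel-sample',
  'couchbase.scope' = 'inventory',
  'couchbase.collection' = 'hotel',
  'couchbase.index' = 'default'
);
```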

Examples

Key search with Postgres

This example shows how to run key search with Postgres by using Flink SQL.

This example shows the following steps:

  • Create a connection resource to a Postgres database.

  • Create a confluent-jdbc external table.

  • Create an input text table.

  • Run a key search on movie titles.

  • Show the results and store them in a new table.

  1. Run the following command to create a connection resource named “jdbc_postgres_connection” that uses your Postgres credentials.

    CREATE CONNECTION jdbc_postgres_connection
      WITH (
        'type' = 'confluent_jdbc',
        'endpoint' = '<jdbc_url>',
        'username' = '<username>',
        'password' = '<password>',
        'environment' = '<ENV_ID>'
      );
    
  2. Create a Postgres table with preset example data, like Netflix shows. The confluent-jdbc connector also supports MySQL, SQL Server, and Oracle.

    -- create jdbc table
    CREATE TABLE jdbc_postgres (
        show_id STRING,
        type STRING,
        title STRING,
        cast_members STRING,
        country STRING,
        date_added DATE,
        release_year INT,
        rating STRING,
        duration STRING,
        listed_in STRING,
        description STRING
    ) WITH (
        'connector' = 'confluent-jdbc',
        'confluent-jdbc.connection' = 'jdbc_postgres_connection',
        'confluent-jdbc.table-name' = 'netflix_shows'
    );
    
  3. Run the following statements to create an input table and insert data.

    -- create input table
    CREATE TABLE jdbc_postgres_input_table (
        title STRING
    );
    
    INSERT INTO jdbc_postgres_input_table VALUES
      ('Squid Game'),
      ('The Irishman'),
      ('Stranger Things');
    
  4. Run the following statement to run key search on movie titles.

    -- key search
    SELECT * FROM jdbc_postgres_input_table,
      LATERAL TABLE(KEY_SEARCH_AGG(jdbc_postgres, DESCRIPTOR(title), title));
    
  5. Run the following statement to store search results in a new table.

    -- store search results in a new table
    CREATE TABLE jdbc_postgres_result_table AS
      SELECT * FROM jdbc_postgres_input_table,
        LATERAL TABLE(KEY_SEARCH_AGG(jdbc_postgres, DESCRIPTOR(title), title));
    
    SELECT * FROM jdbc_postgres_result_table
      CROSS JOIN UNNEST(search_results)
      AS T(show_id, type, title, cast_members, country, date_added, release_year, rating, duration, listed_in, description)
    

Key search with REST

This example shows how to run key search with any RESTful server by using Flink SQL. It assumes the RESTful server’s response format is in JSON.

This example shows the following steps:

  • Create a connection resource to a RESTful server.

  • Create a REST external table.

  • Create a text input table.

  • Run a key search on book titles.

  • Show the results and store them in a new table.

  1. Run the following command to create a connection resource named “rest_connection” that encapsulates the endpoint and the credentials that will be used to access the RESTful server.

    CREATE CONNECTION rest_connection
      WITH (
        'type' = 'rest',
        'endpoint' = '<endpoint>',
        'username' = '<username>',
        'password' = '<password>'
      );
    
  2. Run the following statement to create a REST external table.

    -- REST key search
    CREATE TABLE rest_books_key_search (
      numFound BIGINT,
      q STRING NOT NULL,
      documentation_url STRING,
      docs ARRAY<ROW<`author_name` ARRAY<STRING>, `title` STRING>>,
      PRIMARY KEY (q) NOT ENFORCED
    ) WITH (
      'connector' = 'rest',
      'rest.connection' = 'rest_connection',
      'rest.method' = 'GET',
      'rest.path' = 'search.json'
    );
    
  3. Run the following statement to create a text input table.

    -- create search table
    CREATE TABLE bookstore (
      bookname STRING NOT NULL,
      price INT,
      stock INT
    );
    
  4. Run the following statement to insert data into the table.

    INSERT INTO `bookstore` VALUES
      ('harry-potter', 20, 500),
      ('wish', 10, 1000),
      ('crucial conversation', 10, 2000)
    
  5. Run the following statement to run key search on book titles.

    -- REST key search
    SELECT * FROM bookstore,
      LATERAL TABLE (KEY_SEARCH_AGG(rest_books_key_search, DESCRIPTOR(q), bookname))
    
  6. Run the following statement to store search results in a new table.

    -- store search results in a new table
    CREATE TABLE books_key_search_results AS
      SELECT * FROM bookstore,
        LATERAL TABLE (KEY_SEARCH_AGG(rest_books_key_search, DESCRIPTOR(q), bookname))
    
  7. Run the following statement to show the results.

    SELECT * FROM books_key_search_results
      CROSS JOIN UNNEST(search_results)
      AS T(numFound, q, documentation_url, docs)
    

Key search with MongoDB

This example shows how to run a key search with MongoDB by using Flink SQL.

This example shows the following steps:

  • Create a connection resource to a MongoDB database.

  • Create a MongoDB external table.

  • Create a text input table.

  • Run a key search on movie titles.

  • Show the results and store them in a new table.

It assumes a MongoDB Atlas cluster, as described in Create a Cluster, with the MongoDB sample dataset loaded.

  1. Run the following command to create a connection resource named “mongodb_connection” that uses your MongoDB credentials.

    CREATE CONNECTION mongodb_connection
      WITH (
        'type' = 'mongodb',
        'endpoint' = '<atlas_endpoint>',
        'username' = '<atlas_username>',
        'password' = '<atlas_password>'
      );
    
  2. Run the following statement to create a MongoDB external table.

    -- mongodb key search
    CREATE TABLE mongodb_movies_key_search (
      title STRING,
      plot STRING
    ) WITH (
      'connector' = 'mongodb',
      'mongodb.connection' = 'mongodb_connection',
      'mongodb.database' = 'sample_mflix',
      'mongodb.collection' = 'movies',
      'mongodb.index' = 'default'
    );
    
  3. Run the following statement to create a search table.

    -- create search table
    CREATE TABLE movies_key_search(
      title STRING,
      year INT
    );
    
  4. Run the following statement to insert data into the search table.

    INSERT INTO movies_key_search VALUES
      ('golden finger', 1964),
      ('license to kill', 1989),
      ('skyfall', 2012),
      ('die hard', 1988)
    
  5. Run the following statement to run a key search on movie titles.

    -- mongodb key search
    SELECT * FROM movies_key_search,
      LATERAL TABLE(KEY_SEARCH_AGG(`mongodb_movies_key_search`, descriptor(title), title))
    
  6. Run the following statement to store search results in a new table.

    -- store search results in a new table
    CREATE TABLE movies_key_search_results AS SELECT * FROM movies_key_search,
      LATERAL TABLE(KEY_SEARCH_AGG(`mongodb_movies_key_search`, descriptor(title), title))
    
  7. Run the following statement to show the results.

    SELECT * FROM movies_key_search_results
      CROSS JOIN UNNEST(search_results)
      AS T(title, plot)