Search Functions in Confluent Cloud for Apache Flink
Confluent Cloud for Apache Flink® supports read-only external tables that let you enrich streaming data with results from external systems. Search functions run a lookup against an external table and return matching rows as a lateral join. Use them to combine fast-moving streaming data with slowly changing reference data held in key-value stores, full-text indexes, or vector databases.
The following built-in search functions are available:
KEY_SEARCH_AGG: Perform exact key lookups in external databases like JDBC, REST APIs, MongoDB, and Couchbase.
TEXT_SEARCH_AGG: Execute full-text searches in external databases like MongoDB, Couchbase, and Elasticsearch.
VECTOR_SEARCH_AGG: Run semantic similarity searches using vector embeddings in databases like MongoDB, Pinecone, Elasticsearch, and Couchbase.
For the underlying concept and the supported provider catalog, see External Tables. For the join semantics, see Lookup Joins.
KEY_SEARCH_AGG
Run a key search over an external table.
- Syntax
KEY_SEARCH_AGG(<external_table>, descriptor(<input_column>), <search_column>);

-- map settings are optional
KEY_SEARCH_AGG(<external_table>, descriptor(<input_column>), <search_column>,
  map['async_enabled', [boolean], 'client_timeout', [int], 'max_parallelism', [int], 'retry_count', [int], 'retry_error_list', [string]]);
- Description
Use the KEY_SEARCH_AGG function to run key searches over external databases in Confluent Cloud for Apache Flink.
The KEY_SEARCH_AGG function uses a combination of serialized table properties and configuration settings to interact with external databases. It's designed to handle the deserialization of table properties and manage the runtime environment for executing search queries.
The output of KEY_SEARCH_AGG is an array with all rows in the external table that have a matching key in the search column.
<input_column>       | Search result
---------------------|------------------------------------------------------------
<input_column_key>   | array[row1<column1, column2…>, row2<column1, column2…>, …]
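A minimal sketch of a key lookup, assuming a streaming table named orders and an external table named customers_ext that both have a customer_id column (all three names are hypothetical). It follows the document's own VECTOR_SEARCH_AGG example in passing the external table name as a string literal. Because the key column has the same name on both sides, the DESCRIPTOR argument and the third argument both read customer_id.

```sql
-- Hypothetical names: `orders` (streaming table) and `customers_ext`
-- (external table created beforehand with a supported connector).
-- Both tables have a `customer_id` column that serves as the lookup key.
SELECT *
FROM orders,
  LATERAL TABLE(KEY_SEARCH_AGG('customers_ext', DESCRIPTOR(customer_id), customer_id));
```

Each result row is expected to carry the order's columns plus the array of matching external rows described above.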
- Configuration
External-table calls accept these optional parameters that control async behavior, parallelism, timeouts, retry policy, and debug output:
async_enabled: Whether calls to the external table are asynchronous and non-blocking. The default is true.
client_timeout: Time, in seconds, after which a request to the external table endpoint times out. The default is 30 seconds.
debug: Whether to return a detailed stack trace in the API response. The default is false. Confluent Cloud for Apache Flink masks error messages to remove secrets and customer input, but the stack trace can still contain the request input itself or part of the response string.
max_parallelism: Maximum number of parallel requests that the function can make. Applies only when async_enabled is true. The default is 10.
retry_count: Maximum number of times Flink retries a failed request to the external table. The default is 3.
retry_error_list: Comma-separated list of error codes that trigger a retry. Flink retries a failed request only when its error code matches an entry in this list.
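A sketch of overriding some defaults for a key lookup, reusing the hypothetical names orders and customers_ext (each with a customer_id column). The map keeps to integer-valued settings so that all of its values share one type, and async_enabled is left at its default of true, which max_parallelism depends on. The specific values are illustrative, not recommendations.

```sql
-- Hypothetical tables: `orders` and `customers_ext`, both with `customer_id`.
SELECT *
FROM orders,
  LATERAL TABLE(KEY_SEARCH_AGG(
    'customers_ext', DESCRIPTOR(customer_id), customer_id,
    map['client_timeout', 60,    -- time out after 60 seconds instead of 30
        'max_parallelism', 20,   -- allow up to 20 parallel requests instead of 10
        'retry_count', 5]));     -- raise the retry limit from 3 to 5
```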
TEXT_SEARCH_AGG
Run a text search over an external table.
- Syntax
TEXT_SEARCH_AGG(<external_table>, descriptor(<input_column>), <search_column>, <limit>);

-- map settings are optional
TEXT_SEARCH_AGG(<external_table>, descriptor(<input_column>), <search_column>, <limit>,
  map['async_enabled', [boolean], 'client_timeout', [int], 'max_parallelism', [int], 'retry_count', [int], 'retry_error_list', [string]]);
- Description
Use the TEXT_SEARCH_AGG function to run full-text searches over external databases in Confluent Cloud for Apache Flink.
The TEXT_SEARCH_AGG function uses a combination of serialized table properties and configuration settings to interact with external databases. It’s designed to handle the deserialization of table properties and manage the runtime environment for executing search queries.
The output of TEXT_SEARCH_AGG is an array with the rows in the external table whose text in the search column matches the input, up to <limit> rows.
<input_column>       | Search result
---------------------|------------------------------------------------------------
<input_column_text>  | array[row1<column1, column2…>, row2<column1, column2…>, …]
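A minimal full-text search sketch, assuming a streaming table named support_questions and an external full-text index registered as kb_articles_ext, both with a content column (all names hypothetical). As in the document's own example, the external table name is passed as a string literal, and <limit> is set to 5 on the assumption that it caps how many matching rows are returned per input row.

```sql
-- Hypothetical names: `support_questions` (streaming table) and
-- `kb_articles_ext` (external full-text index); both have a `content` column.
SELECT *
FROM support_questions,
  LATERAL TABLE(TEXT_SEARCH_AGG('kb_articles_ext', DESCRIPTOR(content), content, 5));
```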
- Configuration
External-table calls accept these optional parameters that control async behavior, parallelism, timeouts, retry policy, and debug output:
async_enabled: Whether calls to the external table are asynchronous and non-blocking. The default is true.
client_timeout: Time, in seconds, after which a request to the external table endpoint times out. The default is 30 seconds.
debug: Whether to return a detailed stack trace in the API response. The default is false. Confluent Cloud for Apache Flink masks error messages to remove secrets and customer input, but the stack trace can still contain the request input itself or part of the response string.
max_parallelism: Maximum number of parallel requests that the function can make. Applies only when async_enabled is true. The default is 10.
retry_count: Maximum number of times Flink retries a failed request to the external table. The default is 3.
retry_error_list: Comma-separated list of error codes that trigger a retry. Flink retries a failed request only when its error code matches an entry in this list.
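For troubleshooting, a sketch that makes text-search calls synchronous and turns on detailed stack traces, reusing the hypothetical support_questions and kb_articles_ext tables. The debug option is listed in the configuration above but not in the syntax template, so passing it in the same map is an assumption. Both values are booleans, so the map has a single value type. Because stack traces can expose request or response content, this is meant for testing rather than production.

```sql
-- Hypothetical tables: `support_questions` and `kb_articles_ext`.
SELECT *
FROM support_questions,
  LATERAL TABLE(TEXT_SEARCH_AGG(
    'kb_articles_ext', DESCRIPTOR(content), content, 5,
    map['async_enabled', false,   -- block on each external call
        'debug', true]));         -- assumed: return detailed stack traces
```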
VECTOR_SEARCH_AGG
Run a vector search over an external table.
- Syntax
VECTOR_SEARCH_AGG(<external_table>, descriptor(<input_column>), <embedding_column>, <limit>);

-- map settings are optional
VECTOR_SEARCH_AGG(<external_table>, descriptor(<input_column>), <embedding_column>, <limit>,
  map['async_enabled', [boolean], 'client_timeout', [int], 'max_parallelism', [int], 'retry_count', [int], 'retry_error_list', [string]]);
Note
Vector Search is an Open Preview feature in Confluent Cloud.
A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent's sole discretion.
- Description
Use the VECTOR_SEARCH_AGG function to run semantic similarity searches over external databases in Confluent Cloud for Apache Flink. This function is commonly used together with AI_EMBEDDING to embed the input column inline.
The VECTOR_SEARCH_AGG function uses a combination of serialized table properties and configuration settings to interact with external databases. It’s designed to handle the deserialization of table properties and manage the runtime environment for executing search queries.
The output of VECTOR_SEARCH_AGG is an array with the rows in the external table whose vectors are most similar to the input vector, up to <limit> rows.
<input_column>         | Search result
-----------------------|------------------------------------------------------------
<input_column_vector>  | array[row1<column1, column2…>, row2<column1, column2…>, …]
- Configuration
External-table calls accept these optional parameters that control async behavior, parallelism, timeouts, retry policy, and debug output:
async_enabled: Whether calls to the external table are asynchronous and non-blocking. The default is true.
client_timeout: Time, in seconds, after which a request to the external table endpoint times out. The default is 30 seconds.
debug: Whether to return a detailed stack trace in the API response. The default is false. Confluent Cloud for Apache Flink masks error messages to remove secrets and customer input, but the stack trace can still contain the request input itself or part of the response string.
max_parallelism: Maximum number of parallel requests that the function can make. Applies only when async_enabled is true. The default is 10.
retry_count: Maximum number of times Flink retries a failed request to the external table. The default is 3.
retry_error_list: Comma-separated list of error codes that trigger a retry. Flink retries a failed request only when its error code matches an entry in this list.
- Example
After you have registered the AI inference model by using the CREATE MODEL statement, you can start running vector searches. The following example assumes an Elasticsearch vector search endpoint, set up as shown in the Elasticsearch Quick Start Guide, and an API key, created as shown in Kibana API Keys.
Run the following statement to create a connection resource named elastic_connection that uses your Elastic credentials.
CREATE CONNECTION elastic_connection WITH (
  'type' = 'elastic',
  'endpoint' = '<ELASTICSEARCH_ENDPOINT>',
  'api-key' = '<ELASTIC_API_KEY>'
);
For self-hosted Elasticsearch, you can use basic authentication with username and password:
CREATE CONNECTION elastic_connection WITH (
  'type' = 'elastic',
  'endpoint' = '<ELASTICSEARCH_ENDPOINT>',
  'username' = '<ELASTIC_USERNAME>',
  'password' = '<ELASTIC_PASSWORD>'
);
Run the following statements to create the tables and run the vector search.
-- Create the external table.
CREATE TABLE elastic (
  vector ARRAY<FLOAT>,
  text STRING
) WITH (
  'connector' = 'elastic',
  'elastic.connection' = 'elastic_connection',
  'elastic.index' = 'vector-search-index'
);

-- Create the embedding output table.
CREATE TABLE embedding_output (text STRING, embedding ARRAY<FLOAT>);

-- Insert mock data.
INSERT INTO embedding_output VALUES ('hello world', ARRAY[1, 5, -20]);

-- Run the vector search against the external table's `vector` column.
SELECT * FROM embedding_output,
  LATERAL TABLE(VECTOR_SEARCH_AGG('elastic', DESCRIPTOR(vector), embedding, 3));
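To keep the search results rather than only view them, one option is a CREATE TABLE AS SELECT statement over the same query. This sketch reuses the elastic and embedding_output tables created above, with the DESCRIPTOR argument naming the vector column of the external table; the vector_search_results table name is hypothetical. In Confluent Cloud, such a statement is expected to keep running and write results as new rows arrive in embedding_output.

```sql
-- Hypothetical sink table name: `vector_search_results`.
-- Stores each input row together with its array of (up to 3) nearest matches.
CREATE TABLE vector_search_results AS
SELECT *
FROM embedding_output,
  LATERAL TABLE(VECTOR_SEARCH_AGG('elastic', DESCRIPTOR(vector), embedding, 3));
```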
For more examples, see Vector Search with Confluent Cloud for Apache Flink.