Vector Search with Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink® supports read-only external tables to search over external vector databases, like MongoDB, Pinecone, Elasticsearch, and Couchbase.
Note
Vector Search is an Open Preview feature in Confluent Cloud.
A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent's sole discretion.
Use the VECTOR_SEARCH_AGG function in conjunction with AI model inference to enable LLM-RAG use cases on Confluent Cloud.
The output of VECTOR_SEARCH_AGG is an array with all rows in the external table that have a matching vector in the search column.
Syntax¶
SELECT * FROM embedding_input,
LATERAL TABLE(VECTOR_SEARCH_AGG(<external_table>, DESCRIPTOR(<input_column>), <search_column>, <LIMIT>));
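As an illustration of this syntax, the following sketch searches a hypothetical external vector table named products, using query vectors from the embedding column of an input table named queries, and returns the top 5 matches per row. All table and column names here are placeholders, not part of the API.

```sql
-- Hypothetical example: products is an external vector table with a
-- product_embedding column, and queries holds precomputed query embeddings.
CREATE TABLE search_output AS SELECT * FROM queries,
LATERAL TABLE(VECTOR_SEARCH_AGG(products, DESCRIPTOR(product_embedding), embedding, 5));

-- The matches arrive as an array column named search_results,
-- which you can flatten into one row per match with CROSS JOIN UNNEST.
SELECT * FROM search_output CROSS JOIN UNNEST(search_results) AS T(product_embedding, score);
```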
Supported providers¶
These vector search providers are supported:
- Couchbase
- Elasticsearch
- MongoDB
- Pinecone
Search table parameters¶
Specify the following properties in the CREATE TABLE statement when you create a search table.
Common parameters¶
Couchbase properties¶
couchbase.connection¶
Couchbase connection that includes the endpoint, username and password. For more information, see Manage Connections.
This property is required.
Elasticsearch properties¶
Elasticsearch connections do not support username and password credentials; use an API key instead.
elastic.connection¶
Elastic connection that includes the endpoint and the API key. For more information, see Manage Connections.
This property is required.
elastic.index¶
The name of the Elasticsearch index to search.
MongoDB properties¶
Only Atlas ANN (approximate nearest neighbor) search is supported. MongoDB filters are not supported.
mongodb.connection¶
MongoDB connection that includes the endpoint, username, and password. For more information, see Manage Connections.
This property is required.
mongodb.numcandidates¶
The number of nearest-neighbor candidates to consider during the search. This value must be less than or equal to 10000, and it must not be less than the number of documents to return (the limit).
This property is required.
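For instance, the following sketch shows a search table whose numcandidates value satisfies both constraints for searches that return up to 100 documents. The connection, database, collection, and index names are placeholders for your own resources.

```sql
CREATE TABLE mongodb_docs (
  text STRING,
  text_embedding ARRAY<FLOAT>
) WITH (
  'connector' = 'mongodb',
  'mongodb.connection' = 'my-mongodb-connection',
  'mongodb.database' = 'my_database',
  'mongodb.collection' = 'my_collection',
  'mongodb.index' = 'my_vector_index',
  -- Must be >= the search limit and <= 10000.
  'mongodb.numcandidates' = '100'
);
```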
Pinecone properties¶
pinecone.connection¶
Pinecone connection that includes the endpoint and the API key. For more information, see Manage Connections.
This property is required.
Examples¶
Vector database search with Couchbase¶
This example shows how to run a vector search with Couchbase by using Flink SQL.
Import a Couchbase example dataset by following the steps in Run a Vector Search with a Couchbase SDK.
Create a new search index to include all fields in the dataset.
Run the following command to create a connection resource named “azureopenai-cli-connection” that uses your Azure API key.
confluent flink connection create azureopenai-cli-connection \
  --cloud AZURE \
  --region westus2 \
  --type azureopenai \
  --endpoint ${EMBEDDING_ENDPOINT} \
  --api-key ${YOUR_AZURE_API_KEY} \
  --environment ${ENV_ID}
Run the following command to create a connection resource named “couchbase-connection” that uses your Couchbase credentials.
confluent flink connection create couchbase-connection \
  --cloud AZURE \
  --region westus2 \
  --type couchbase \
  --endpoint ${couchbase_endpoint} \
  --username ${couchbase_username} \
  --password ${couchbase_password} \
  --environment ${ENV_ID}
This example shows the following steps:
- Create an input text table.
- Convert the text table to the OpenAI embedding format.
- Create a Couchbase external table.
- Run a vector search embedding on the external table.
Note that Couchbase doesn’t return the original embedding column, so the embedding column in the search result is NULL.
-- Create test tables.
CREATE TABLE text_input (input STRING);
CREATE TABLE embedding_output (name STRING, embedding ARRAY<FLOAT>);
-- Create embedding model
CREATE MODEL openaiembed
INPUT (input STRING)
OUTPUT (embedding ARRAY<FLOAT>)
WITH(
'task' = 'classification',
'provider'= 'azureopenai',
'azureopenai.input_format'='OPENAI-EMBED',
'azureopenai.connection' = 'azureopenai-cli-connection'
);
-- Create Couchbase external table
CREATE TABLE couchbase_color (
brightness DOUBLE,
description STRING,
embedding_vector_dot ARRAY<FLOAT>
) WITH (
'connector' = 'couchbase',
'couchbase.connection' = 'couchbase-connection',
'couchbase.bucket' = 'color-vector-sample',
'couchbase.scope' = 'color',
'couchbase.collection' = 'rgb',
'couchbase.index' = 'rgb-vector'
);
-- Insert data for search.
INSERT INTO embedding_output SELECT * FROM text_input,
LATERAL TABLE(ML_PREDICT('openaiembed', input));
INSERT INTO text_input VALUES
('associated with growth nature and positivity'),
('a mix of blue and green'),
('the color of the sky on a clear summer day');
-- Run the vector search.
SELECT * FROM embedding_output,
LATERAL TABLE(VECTOR_SEARCH_AGG(couchbase_color, DESCRIPTOR(embedding_vector_dot), embedding, 3));
-- Or flatten the result
CREATE TABLE couchbase_search_result AS SELECT * FROM embedding_output,
LATERAL TABLE(VECTOR_SEARCH_AGG(couchbase_color, DESCRIPTOR(embedding_vector_dot), embedding, 3));
SELECT * FROM couchbase_search_result CROSS JOIN UNNEST(search_results) AS T(brightness, description, embedding_vector_dot, score);
Vector database search with Elastic¶
This example shows how to run a vector search by using Elastic.
Create an Elasticsearch index by following this openai-KNN-RAG notebook.
Run the following command to create a connection resource named “azureopenai-cli-connection” that uses your Azure API key.
confluent flink connection create azureopenai-cli-connection \
  --cloud AZURE \
  --region westus2 \
  --type azureopenai \
  --endpoint ${EMBEDDING_ENDPOINT} \
  --api-key ${YOUR_AZURE_API_KEY} \
  --environment ${ENV_ID}
Run the following command to create a connection resource named “elastic-connection” that uses your Elastic credentials.
confluent flink connection create elastic-connection \
  --cloud AWS \
  --region us-west-2 \
  --type elastic \
  --endpoint ${ELASTICSEARCH_ENDPOINT} \
  --api-key ${ELASTIC_API_KEY} \
  --environment ${ENV_ID}
This example shows the following steps:
- Create an input vector table.
- Convert text table to OpenAI embedding format.
- Run vector search.
-- Create test tables
CREATE TABLE text_input (input STRING);
CREATE TABLE embedding_output (name STRING, embedding ARRAY<FLOAT>);
-- Create embedding model
CREATE MODEL openaiembed
INPUT (input STRING)
OUTPUT (embedding ARRAY<FLOAT>)
WITH(
'task' = 'classification',
'provider'= 'azureopenai',
'azureopenai.input_format'='OPENAI-EMBED',
'azureopenai.connection' = 'azureopenai-cli-connection'
);
-- Create external table
CREATE TABLE elastic (
vector ARRAY<FLOAT>,
text STRING
) WITH (
'connector' = 'elastic',
'elastic.connection' = 'elastic-connection',
'elastic.index' = 'wikipedia_vector_index'
);
-- Insert data
INSERT INTO text_input VALUES
('How big is the Atlantic ocean?'),
('What is the biggest continent?'),
('What is the tallest building in the world?');
INSERT INTO embedding_output SELECT * FROM text_input,
LATERAL TABLE(ML_PREDICT('openaiembed', input));
-- Vector search
SELECT * FROM embedding_output,
LATERAL TABLE(VECTOR_SEARCH_AGG(elastic, DESCRIPTOR(vector), embedding, 3));
-- Or flatten the result.
CREATE TABLE elastic_search_result AS SELECT * FROM embedding_output,
LATERAL TABLE(VECTOR_SEARCH_AGG(elastic, DESCRIPTOR(vector), embedding, 3));
SELECT * FROM elastic_search_result CROSS JOIN UNNEST(search_results) AS T(vector, text, score);
Vector database search with MongoDB¶
This example shows how to run a vector search over the MongoDB Sample Mflix Dataset. It assumes an Atlas cluster as shown in Create a Cluster, a MongoDB API key as shown in API Key Authentication, and an Azure OpenAI connection similar to the connection shown in Connection resource.
Run the following command to create a connection resource named “azureopenai-cli-connection” that uses your Azure API key.
confluent flink connection create azureopenai-cli-connection \
  --cloud AZURE \
  --region westus2 \
  --type azureopenai \
  --endpoint ${EMBEDDING_ENDPOINT} \
  --api-key ${YOUR_AZURE_API_KEY} \
  --environment ${ENV_ID}
Run the following command to create a connection resource named “mongodb-connection” that uses your MongoDB credentials. The Atlas endpoint resembles the following:
mongodb+srv://cluster0.iwuir3o.mongodb.net
confluent flink connection create mongodb-connection \
  --cloud AZURE \
  --region westus2 \
  --type mongodb \
  --endpoint ${atlas_endpoint} \
  --username ${atlas_username} \
  --password ${atlas_password} \
  --environment ${ENV_ID}
This example shows the following steps:
- Create an Atlas embedding table.
- Create an input text table.
- Convert the text table to the OpenAI embedding format.
- Create a MongoDB external table.
- Run a vector search embedding on the external table.
-- Create test tables.
CREATE TABLE text_input (input STRING);
CREATE TABLE embedding_output (name STRING, embedding ARRAY<FLOAT>);
-- Create the embedding model.
CREATE MODEL openaiembed
INPUT (input STRING)
OUTPUT (embedding ARRAY<FLOAT>)
WITH(
'task' = 'classification',
'provider'= 'azureopenai',
'azureopenai.input_format'='OPENAI-EMBED',
'azureopenai.connection' = 'azureopenai-cli-connection'
);
-- Create an external table.
CREATE TABLE mongodb (
title STRING,
plot STRING,
plot_embedding ARRAY<FLOAT>
) WITH (
'connector' = 'mongodb',
'mongodb.connection' = 'mongodb-connection',
'mongodb.database' = 'sample_mflix',
'mongodb.collection' = 'movies_embeddings',
'mongodb.index' = 'idx_plot_embedding',
'mongodb.numcandidates' = '100'
);
-- Insert data for search.
INSERT INTO embedding_output SELECT * FROM text_input,
LATERAL TABLE(ML_PREDICT('openaiembed', input));
INSERT INTO text_input VALUES
('goldfinger'),
('license to kill'),
('skyfall'),
('die hard');
-- Run the vector search.
SELECT * FROM embedding_output,
LATERAL TABLE(VECTOR_SEARCH_AGG(mongodb, DESCRIPTOR(plot_embedding), embedding, 3));
-- Or flatten the result.
CREATE TABLE mongodb_result AS SELECT * FROM embedding_output,
LATERAL TABLE(VECTOR_SEARCH_AGG(mongodb, DESCRIPTOR(plot_embedding), embedding, 3));
SELECT * FROM mongodb_result CROSS JOIN UNNEST(search_results) AS T(title, plot, score);
Vector search with Pinecone¶
The following example assumes a Pinecone API key as shown in Pinecone Quick Start and an OpenAI connection as shown in Connection resource.
Follow this Pinecone notebook to create an index of LangChain docs.
Run the following command to create a connection resource named “azureopenai-cli-connection” that uses your Azure API key.
confluent flink connection create azureopenai-cli-connection \
  --cloud AZURE \
  --region westus2 \
  --type azureopenai \
  --endpoint ${EMBEDDING_ENDPOINT} \
  --api-key ${YOUR_AZURE_API_KEY} \
  --environment ${ENV_ID}
Run the following command to create a connection resource named “pinecone-connection” that uses your Pinecone credentials.
confluent flink connection create pinecone-connection \
  --cloud AZURE \
  --region westus2 \
  --type pinecone \
  --endpoint ${pinecone_query_endpoint} \
  --api-key ${pinecone_api_key} \
  --environment ${ENV_ID}
Run the following statements to create the tables.
CREATE TABLE text_input (input STRING);
CREATE TABLE embedding_output (question STRING, embedding ARRAY<FLOAT>);
-- Create the search table.
CREATE TABLE pinecone (
text STRING,
embeddings ARRAY<FLOAT>
) WITH (
'connector' = 'pinecone',
'pinecone.connection' = 'pinecone-connection'
);
Run the following statements to create and run the embedding model.
-- Create the embedding model.
CREATE MODEL openaiembed
INPUT (input STRING)
OUTPUT (embedding ARRAY<FLOAT>)
WITH(
'task' = 'classification',
'provider'= 'azureopenai',
'azureopenai.input_format'='OPENAI-EMBED',
'azureopenai.connection' = 'azureopenai-cli-connection'
);
-- Insert testing data.
INSERT INTO embedding_output SELECT * FROM text_input,
LATERAL TABLE(ML_PREDICT('openaiembed', input));
INSERT INTO text_input VALUES
('what is LangChain?'),
('how do I use the LLMChain in LangChain?'),
('what is a pipeline in LangChain?'),
('how to partially format prompt templates');
Run the following statements to execute the vector search.
-- Run the vector search.
SELECT * FROM embedding_output,
LATERAL TABLE(VECTOR_SEARCH_AGG(pinecone, DESCRIPTOR(embeddings), embedding, 3));
-- Or flatten the result.
CREATE TABLE pinecone_result AS SELECT * FROM embedding_output,
LATERAL TABLE(VECTOR_SEARCH_AGG(pinecone, DESCRIPTOR(embeddings), embedding, 3));
SELECT * FROM pinecone_result CROSS JOIN UNNEST(search_results) AS T(text, embeddings, score);