Run an AI Model with Confluent Cloud
Confluent Intelligence supports AI model inference and enables using models as resources in Flink SQL, just like tables and functions. You can use a SQL statement to create a model resource and pass it on for inference in streaming queries. The SQL interface is available in Cloud Console and the Flink SQL shell.
The CREATE MODEL statement registers an AI or ML model in your Flink environment for real-time prediction and inference. You must have the endpoint and API keys of your model before you can use it in a query.
The following examples get you started with AI model inference.
Prerequisites
Access to Confluent Cloud.
Access to a Flink compute pool.
Sufficient permissions to create models. For more information, see RBAC for model inference.
Get the endpoint and access keys for your model
Before you can use a model with Confluent Cloud for Apache Flink®, you must host the model with a supported cloud provider. To connect to your model with Flink SQL, you provide the following details:
The model endpoint
The API keys that authorize Confluent Cloud to invoke your model
The following procedures explain how to look up these details from the supported providers.
Anthropic
To connect to a Claude model that’s hosted by Anthropic, you must have the following information for your Flink SQL statements:
The model endpoint: For the Claude inference endpoints, see API Overview in the Claude API Docs. For conversational interactions, the endpoint is https://api.anthropic.com/v1/messages.
Your API key: To create or look up a Claude API key, use the Claude Console. For more information, see Getting API Keys in the Claude API Docs.
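Before registering the model, you can optionally verify the endpoint and key with a direct request. The following is a sketch; the model name is an example, so substitute any model your key can access.

```shell
# Smoke test for the Claude messages endpoint. Requires your API key in
# the ANTHROPIC_API_KEY environment variable; the model name is an example.
curl https://api.anthropic.com/v1/messages \
  -H "x-api-key: $ANTHROPIC_API_KEY" \
  -H "anthropic-version: 2023-06-01" \
  -H "content-type: application/json" \
  -d '{"model": "claude-3-5-haiku-20241022", "max_tokens": 32, "messages": [{"role": "user", "content": "ping"}]}'
```

A JSON response with a `content` field confirms that the endpoint and key are usable from Flink SQL.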
AWS Bedrock
In your web browser, navigate to the AWS Bedrock site.
Pick a Foundation Model. You may need to request access and accept a EULA if your account hasn't done so before.
At the bottom of the page, get the modelId of the model from the API request example.
The model endpoint resembles https://bedrock-runtime.<REGION>.amazonaws.com/model/<MODEL_ID>/invoke.
Get your AWS access key (AWS_ACCESS_KEY_ID) and secret (AWS_SECRET_ACCESS_KEY). If you're using temporary credentials, you also need an AWS session token (AWS_SESSION_TOKEN), which you can get from the AWS access portal. If you use temporary credentials, the query may not work after the credentials expire.
Set the INPUT_FORMAT associated with your chosen model from the list above. If not set, Confluent Cloud for Apache Flink usually chooses the correct one automatically based on the name of the model.
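One way to mint temporary credentials is the STS get-session-token operation; this is a sketch that assumes the AWS CLI is installed and configured, and the duration value is an example.

```shell
# Request temporary credentials valid for 1 hour (3600 seconds).
# The response fields AccessKeyId, SecretAccessKey, and SessionToken map to
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN.
aws sts get-session-token --duration-seconds 3600
```

Remember that statements using these credentials stop working once the session token expires.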
AWS Sagemaker
In your web browser, navigate to the AWS Sagemaker Studio site.
Select Train a Model, or Models -> Deployable Models -> Create to upload a trained model. Alternatively, select Jumpstart -> pick a model -> Deploy to choose a public pretrained model.
Click Deployments -> Endpoints -> Create endpoint, or create it by deploying the model.
The full endpoint URL is listed on the endpoint page.
To authenticate with Sagemaker, get your AWS access key (AWS_ACCESS_KEY_ID) and secret (AWS_SECRET_ACCESS_KEY). If you’re using temporary credentials, you need an AWS session token (AWS_SESSION_TOKEN), which you can get from the AWS access portal. If you use temporary credentials, the query may not work after the credentials expire.
Sagemaker models don’t have standard input or output formats. If you did not train the model yourself, there is no easy way to determine the expected inputs and outputs without testing the model or reading its documentation.
You may also need to set the input_content_type and output_content_type parameters.
Depending on how the endpoint is deployed, you may have to set additional header parameters, like inference_component_name.
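You can also confirm an endpoint's name and status from the command line. This is a sketch assuming the AWS CLI; the endpoint name is a placeholder.

```shell
# List all SageMaker endpoints in the current region, then inspect one.
# The EndpointName value is the name used in the endpoint URL.
aws sagemaker list-endpoints
aws sagemaker describe-endpoint --endpoint-name <ENDPOINT_NAME>
```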
Azure OpenAI
For non-OpenAI LLM models on Azure, see the Azure ML tab.
These model endpoints are created in Azure AI Foundry.
You must sign up for AI Foundry in your subscription.
On the left sidebar, select Deployments under Management -> Create New Deployment.
The name you choose here is YOUR_DEPLOYMENT_NAME in the endpoint URL below.
Get the API Key from the Resource in the Cognitive Services Hub.
Select the resource group account from the list. This is YOUR_RESOURCE_NAME in the next step.
On the resource page, choose Keys and Endpoints from the left sidebar. The endpoint URL on that page is not the full endpoint. For chat models, the full URL resembles https://<YOUR_RESOURCE_NAME>.openai.azure.com/openai/deployments/<YOUR_DEPLOYMENT_NAME>/chat/completions?api-version=2024-02-01.
For non-chat URL formats, see Azure OpenAI Service REST API reference.
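As an alternative to the portal, you can read the keys with the Azure CLI. This is a sketch; the resource and resource group names are placeholders.

```shell
# Look up the API keys for the Azure OpenAI resource.
# Either key1 or key2 from the output works as the api-key value.
az cognitiveservices account keys list \
  --name <YOUR_RESOURCE_NAME> \
  --resource-group <YOUR_RESOURCE_GROUP>
```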
Azure ML
This provider supports both Azure AI Foundry generative AI models and Azure Machine Learning for predictive models.
Azure AI Foundry (Generative AI models)
Select a project or create a new one.
Navigate to Components -> Deployments.
Navigate to Create -> Pay-as-you-go. Usually, these models are charged per-use.
Navigate to Create -> Realtime Endpoint. There are more model choices, but they have ongoing costs for as long as the model is deployed.
Click the deployment name to get the API Key.
This page also lists a “Target”, which is not the full endpoint URL. The full endpoint URL resembles https://<DEPLOYMENT_NAME>.<REGION>.inference.ai.azure.com/<MODEL_PATH>.
Usually, the MODEL_PATH resembles v1/chat/completions. You can find the whole endpoint in the “How to deploy” documentation for each model.
Many of the Azure AI Foundry models use the OPENAI-CHAT input format, even if they are not OpenAI models.
Azure Machine Learning (Predictive Models)
Navigate to Train a Model or Models -> Register to upload one from disk.
Navigate to Endpoints -> Create and select a real-time endpoint.
This page also links to the OpenAI and Pay-as-you-go serverless AI model options mentioned above.
After creating the endpoint, click through to get the REST endpoint, which is the endpoint you need for CREATE MODEL.
Generate an API key by using the Azure CLI with the following command, substituting your endpoint, resource group, and workspace names.
az ml online-endpoint get-credentials --name <ENDPOINT_NAME> --resource-group <RESOURCE_GROUP> --workspace-name <WORKSPACE_NAME>
Fireworks AI
To connect to a model that you host with Fireworks AI, you must provide the following information in your Flink SQL statements:
The model endpoint: For the endpoints where Fireworks AI hosts its models, see the Fireworks AI documentation. For example, for the endpoint for text models, see Text Models. For the endpoint for embeddings models, see Embeddings & Reranking.
Your API key: To create an API key, use the Create API Key operation, or use the Settings page for your account in the Fireworks dashboard.
The model version: You provide a value for the model_version parameter in Flink SQL when you register your model. Use the value that Fireworks AI provides for the baseModel response parameter when you deploy a model. To look up this value for a model that you've deployed, use the Get Deployment operation, or use the Fireworks dashboard.
Google AI
To connect to a model that you host with Google AI, you must provide the following information in your Flink SQL statements:
The model endpoint, which is https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent.
Your API key. You can create an API key at https://aistudio.google.com/app/apikey.
Gemini models are also supported through the Vertex AI provider, which you might prefer due to its integrated Google Cloud billing.
OpenAI
To connect to a model that’s hosted by OpenAI, you must have the following information for your Flink SQL statements:
The model endpoint. Usually, the endpoint is https://api.openai.com/v1/chat/completions.
Your API key. You can create an API key at https://platform.openai.com/api-keys.
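You can verify the endpoint and key with a direct request before registering the model. This is a sketch; the model name is an example, so substitute one your account can access.

```shell
# Smoke test for the chat completions endpoint. Requires your API key in
# the OPENAI_API_KEY environment variable; the model name is an example.
curl https://api.openai.com/v1/chat/completions \
  -H "Authorization: Bearer $OPENAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"model": "gpt-4o-mini", "messages": [{"role": "user", "content": "ping"}]}'
```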
Vertex AI
Navigate to the Vertex AI dashboard.
In the navigation menu, click Deploy and Use -> Online prediction.
Navigate to Endpoints -> Create (Upload or Train Model).
Choose your endpoint and click Sample Request to get the endpoint information.
The endpoint resembles https://<REGION>-aiplatform.googleapis.com/v1/projects/<PROJECT_ID>/locations/<REGION>/endpoints/<ENDPOINT_ID>:predict.
Vertex AI doesn’t use API Keys. Instead, you must have a service account with the aiplatform.endpoints.predict IAM permission for the model resource. Also, you must create a service account key for this service account.
Because no default IAM role contains only this specific permission, you should create a custom role to maintain least privilege. It is also possible to scope service account permissions such that they have access only to a single model resource.
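The service account setup described above can be scripted with gcloud. This is a sketch: the project ID is a placeholder, the account and role names are hypothetical, and the custom role is one way to grant only the predict permission.

```shell
# Create a dedicated service account for Flink model inference.
gcloud iam service-accounts create flink-model-invoker \
  --project <PROJECT_ID>

# Create a custom role containing only the predict permission.
gcloud iam roles create flinkEndpointPredict \
  --project <PROJECT_ID> \
  --permissions aiplatform.endpoints.predict

# Bind the custom role to the service account.
gcloud projects add-iam-policy-binding <PROJECT_ID> \
  --member "serviceAccount:flink-model-invoker@<PROJECT_ID>.iam.gserviceaccount.com" \
  --role "projects/<PROJECT_ID>/roles/flinkEndpointPredict"

# Create a JSON key for the service account.
gcloud iam service-accounts keys create key.json \
  --iam-account flink-model-invoker@<PROJECT_ID>.iam.gserviceaccount.com
```

To restrict access further, you can bind the role on the individual endpoint resource instead of the project.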
Create a connection resource
A connection resource enables you to connect to model providers in a way that protects your secrets, so Flink statements can make calls to these services securely.
Run the CREATE CONNECTION statement to create a new connection resource.
The following examples show how to use connection resources in the CREATE MODEL statements that register models with Confluent Cloud for Apache Flink.
Connections must be created in the same cloud region as the corresponding models.
For details on reusable connections, see Reuse Confluent Cloud Connections With External Services and Manage Connections with External Services in Confluent Cloud.
Text inference example with Claude hosted by Anthropic
In a Confluent Cloud Console workspace or the Flink SQL shell, run the following statement to create a connection resource named anthropic-connection that uses your Claude API key.
CREATE CONNECTION `anthropic-connection` WITH (
  'type' = 'anthropic',
  'endpoint' = 'https://api.anthropic.com/v1/messages',
  'api-key' = '<your-claude-api-key>'
);
Run the following statement to register the Claude model with Confluent Cloud for Apache Flink.
CREATE MODEL `anthropic-model`
INPUT (`text` VARCHAR(2147483647))
OUTPUT (`output` VARCHAR(2147483647))
WITH (
  'provider' = 'anthropic',
  'anthropic.connection' = 'anthropic-connection',
  'anthropic.params.max_tokens' = '2048',
  'task' = 'text_generation'
);
Run the following statements to create the table for input text.
CREATE TABLE `anthropic-input-table` (`id` INT, `prompt` VARCHAR(1000));

INSERT INTO `anthropic-input-table` VALUES
  (1, 'What is the capital of France?'),
  (2, 'What is the capital of Germany?'),
  (3, 'What is the capital of Italy?');
Run the following statement to invoke the Claude model.
SELECT * FROM `anthropic-input-table`, LATERAL TABLE(AI_COMPLETE(`anthropic-model`, prompt));
Text embedding examples
In a Confluent Cloud Console workspace or the Flink SQL shell, run the following statement to create the table for input text.
CREATE TABLE `text_input` (
`id` STRING,
`input` STRING
);
INSERT INTO text_input VALUES
('1', 'alien'),
('2', 'goldfinger'),
('3', 'license to kill'),
('4', 'aliens');
AWS Bedrock
For simplicity, this example uses the AWS Bedrock “amazon.titan-embed-text-v1” model.
Run the following statement to create a connection resource named bedrock_connection that uses your AWS credentials.
CREATE CONNECTION bedrock_connection WITH (
  'type' = 'bedrock',
  'endpoint' = 'https://bedrock-runtime.<REGION>.amazonaws.com/model/<MODEL_ID>/invoke',
  'aws-access-key' = '<your-aws-access-key>',
  'aws-secret-key' = '<your-aws-secret-key>',
  'aws-session-token' = '<your-aws-session-token>'
);
Run the following statement to register your model with Confluent Cloud for Apache Flink.
CREATE MODEL bedrock_embed
INPUT (text STRING)
OUTPUT (response ARRAY<FLOAT>)
WITH (
  'bedrock.connection' = 'bedrock_connection',
  'bedrock.input_format' = 'AMAZON-TITAN-EMBED',
  'provider' = 'bedrock',
  'task' = 'embedding'
);
Run the following statement to invoke your model.
SELECT * FROM text_input, LATERAL TABLE(AI_EMBEDDING('bedrock_embed', input));
Azure OpenAI
In a Confluent Cloud Console workspace or the Flink SQL shell, run the following statement to create a connection resource named azureopenai_connection that uses your Azure API key.
CREATE CONNECTION azureopenai_connection WITH (
  'type' = 'azureopenai',
  'endpoint' = 'https://<your-resource-name>.openai.azure.com/openai/deployments/<your-deployment-name>/embeddings?api-version=2024-06-01',
  'api-key' = '<your-azure-api-key>'
);
Run the following statement to register your model with Confluent Cloud for Apache Flink.
CREATE MODEL azure_embed
INPUT (text STRING)
OUTPUT (response ARRAY<FLOAT>)
WITH (
  'azureopenai.connection' = 'azureopenai_connection',
  'provider' = 'azureopenai',
  'task' = 'embedding'
);
Run the following statement to invoke your model.
SELECT * FROM text_input, LATERAL TABLE(AI_EMBEDDING('azure_embed', input));
Fireworks AI
In a Confluent Cloud Console workspace or the Flink SQL shell, run the following statement to create a connection resource named fireworksai-connection that uses your Fireworks AI API key.
CREATE CONNECTION `fireworksai-connection` WITH (
  'type' = 'fireworksai',
  'endpoint' = 'https://api.fireworks.ai/inference/v1/embeddings',
  'api-key' = '<your-fireworksai-api-key>'
);
Run the following statement to register your model with Confluent Cloud for Apache Flink.
CREATE MODEL `fireworksai-embed`
INPUT (`text` VARCHAR(2147483647))
OUTPUT (`embedding` ARRAY<FLOAT>)
WITH (
  'provider' = 'fireworksai',
  'fireworksai.connection' = 'fireworksai-connection',
  'task' = 'embedding',
  'fireworksai.model_version' = '<your-fireworks-deployment-base-name>'
);
Run the following statement to invoke your model.
SELECT * FROM `text_input`, LATERAL TABLE(AI_EMBEDDING(`fireworksai-embed`, input));
Google AI
In a Confluent Cloud Console workspace or the Flink SQL shell, run the following statement to create a connection resource named googleai_connection that uses your Google Cloud API key.
CREATE CONNECTION googleai_connection WITH (
  'type' = 'googleai',
  'endpoint' = 'https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent',
  'api-key' = '<your-gcp-api-key>'
);
Run the following statement to register your model with Confluent Cloud for Apache Flink.
CREATE MODEL google_text_cli
INPUT (`text` VARCHAR(2147483647))
OUTPUT (`output` VARCHAR(2147483647))
WITH (
  'googleai.connection' = 'googleai_connection',
  'googleai.system_prompt' = 'translate text to chinese',
  'provider' = 'googleai',
  'task' = 'text_generation'
);
Run the following statement to invoke your AI model.
SELECT * FROM google_input, LATERAL TABLE(AI_COMPLETE('google_text_cli', text));
Sentiment analysis example with OpenAI LLM
In a Confluent Cloud Console workspace or the Flink SQL shell, run the following statement to create the table for input text.
CREATE TABLE text_stream (
  id BIGINT,
  text STRING
);

INSERT INTO text_stream SELECT 1 id, 'The mitochondria are the powerhouse of the cell' text;
INSERT INTO text_stream SELECT 2 id, 'Happy Birthday! You are great!' text;
INSERT INTO text_stream SELECT 3 id, 'Today was a bad day in the stock market.' text;
Run the following statement to create a connection resource named “openai_connection” that uses your OpenAI API key.
CREATE CONNECTION openai_connection WITH (
  'type' = 'openai',
  'endpoint' = 'https://api.openai.com/v1/chat/completions',
  'api-key' = '<your-api-key>'
);
Run the following statement to create the OpenAI model with a system prompt for sentiment analysis.
CREATE MODEL sentimentmodel
INPUT (text STRING)
OUTPUT (sentiment STRING)
COMMENT 'sentiment analysis model'
WITH (
  'provider' = 'openai',
  'task' = 'classification',
  'openai.connection' = 'openai_connection',
  'openai.model_version' = 'gpt-3.5-turbo',
  'openai.system_prompt' = 'Analyze the sentiment of the text and return only POSITIVE, NEGATIVE, or NEUTRAL.'
);
Run the inference statement on the table and model.
SELECT id, text, sentiment FROM text_stream, LATERAL TABLE(ML_PREDICT('sentimentmodel', text));
Create a managed AI model
Creating a managed AI model using one of the Confluent managed models is very similar to creating a remote AI model. The most significant difference is that you specify the following options in the CREATE MODEL statement:
'provider' = 'confluent',
'confluent.model' = '<model-name>'
For the supported managed AI models, see confluent.model.
Note
Managed AI models are an Early Access Program feature in Confluent Cloud.
An Early Access feature is a component of Confluent Cloud introduced to gain feedback. This feature should be used only for evaluation and non-production testing purposes or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions.
Early Access Program features are intended for evaluation use in development and testing environments only, and not for production use. Early Access Program features are provided: (a) without support; (b) “AS IS”; and (c) without indemnification, warranty, or condition of any kind. No service level commitment will apply to Early Access Program features. Early Access Program features are considered to be a Proof of Concept as defined in the Confluent Cloud Terms of Service. Confluent may discontinue providing preview releases of the Early Access Program features at any time in Confluent’s sole discretion.
If you would like to participate in the Early Access Program, sign up here.
The following code example shows how to create a managed embedding.
CREATE MODEL `managed_model_embedding`
INPUT (text STRING)
OUTPUT (embedding ARRAY<FLOAT>)
WITH (
'provider' = 'confluent',
'task' = 'embedding',
'confluent.model'='BAAI/bge-large-en-v1.5'
);
For more information, see Run a Managed AI Model.