Run an AI Model with Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink®️ supports AI model inference and enables using models as resources in Flink SQL, just like tables and functions. You can use a SQL statement to create a model resource and pass it on for inference in streaming queries. The SQL interface is available in Cloud Console and the Flink SQL shell.
The CREATE MODEL statement registers an AI or ML model in your Flink environment for real-time prediction and inference.
You need to create a model before you can use it in your queries.
The following examples get you started with AI model inference. Currently, you need the Confluent CLI to create and run AI with Flink.
Create an AI model¶
You need an AI model on one of the cloud providers. The following steps show how to start creating models.
In your web browser, navigate to the AWS Bedrock site.
Pick a Foundation Model. You may need to request access and accept a EULA, if your account hasn’t before.
At the bottom of the page, get the
modelId
of the model from the API request example.The model endpoint should be
https://bedrock-runtime.<REGION>.amazonaws.com/model/<MODEL_ID>/invoke
.Bedrock doesn’t use API keys. Instead, you must get your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. If you’re using temporary credentials, you need an AWS_SESSION_TOKEN, which you can get from the AWS access portal. If you use temporary credentials, the query may not work after the credentials expire.
Set the INPUT_FORMAT associated with your chosen model from the list above. If not set, Confluent Cloud for Apache Flink usually chooses the correct one automatically based on the name of the model.
In your web browser, navigate to the AWS Sagemaker Studio site.
Select Train a Model or Models->Deployable Models->Create to upload a trained model. Also, you can select Jumpstart->pick a model->Deploy to choose a public pretrained model.
Click Deployments -> Endpoints -> Create endpoint, or create it by deploying the model.
The full endpoint URL is listed on the endpoint page.
Sagemaker doesn’t use API keys. Instead, you must get your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. If you’re using temporary credentials, you need an AWS_SESSION_TOKEN, which you can get from the AWS access portal. If you use temporary credentials, the query may not work after the credentials expire.
Sagemaker models don’t have standard input/output formats. If you did not train the model yourself, there is no easy way to determine the expected inputs and outputs without testing the model or reading its documentation.
You may also need to set the input_content_type and output_content_type parameters.
Depending on how the endpoint is deployed, you may have to set additional header parameters, like inference_component_name.
For non-OpenAI LLM models on Azure, see the “Azure ML” tab.
These model endpoints are created in Azure’s OpenAI Studio (https://oai.azure.com/portal)
Select “Deployments” under “Management” on the left sidebar -> “Create New Deployment” The name you choose here will be YOUR_DEPLOYMENT_NAME in the endpoint url below. (Not included in these instructions: Signing up for all this in your Subscription.)
After creating the deployment, to get the API Key you have to go find the Resource (which is NOT as of 2024-04-19 in OpenAI Studio). Go to https://portal.azure.com/#view/Microsoft_Azure_ProjectOxford/CognitiveServicesHub/~/OpenAI
Select the resource group account from the list. (This is YOUR_RESOURCE_NAME below.)
On the resource page, choose Keys and Endpoints from the left sidebar. The endpoint URL on that page is not the full endpoint. For chat models, the full URL resembles
https://<YOUR_RESOURCE_NAME>.openai.azure.com/openai/deployments/<YOUR_DEPLOYMENT_NAME>/chat/completions?api-version=2024-02-01
.For non-chat URL formats, see Azure OpenAI Service REST API reference.
This provider supports both Azure AI Studio generative AI models and Azure Machine Learning for predictive models.
Azure AI Studio (Generative AI models)
Select a project or create a new one.
Navigate to Components->Deployments.
Navigate to Create->Pay-as-you-go. Usually, these models are charged per-use.
Navigate to Create->Realtime Endpoint. There are more model choices, but htye have ongoing cost for as long as the model is deployed.
Click the deployment name to get the API Key.
This page also lists a “Target”, which is not the full endpoint URL. The full endpoint URL resembles
https://<DEPLOYMENT_NAME>.<REGION>.inference.ai.azure.com/<MODEL_PATH>
.Usually, the MODEL_PATH resembles
v1/chat/completions
. You can find the whole endpoint in the “How to deploy” documentation for each model.Many of the Azure AI Studio models use the OPENAI-CHAT input format, even if they are not OpenAI models.
Azure Machine Learning (Predictive Models)
Navigate to Train a Model or Models->Register to upload one from disk.
Navigate to Endpoints->Create and select a real-time endpoint.
There are also pointers from this page to do the OpenAI or Pay-as-you-go serverless AI models mentioned above.
After creating the endpoint, click through to get the REST endpoint, which is the endpoint you need for CREATE MODEL.
Generate an API key by using the Azure CLI with the following command.
az ml online-endpoint get-credentials
Generate an API Key from https://aistudio.google.com/app/apikey
.
The endpoint is https://generativelanguage.googleapis.com/v1beta/models/gemini-pro:generateContent
.
Gemini models are also supported through the Vertex AI provider, which you may prefer due to integrated Google Cloud billing.
Generate your API Key from https://platform.openai.com/api-keys.
Usually, the endpoint is
https://api.openai.com/v1/chat/completions
.Prefer using Azure OpenAI, because billing and signup are integrated into Azure.
Navigate to the Vertex AI dashboard.
In the navigation menu, click Deploy and Use -> Online prediction.
Navigate to Endpoints -> Create (Upload or Train Model).
Choose your endpoint and click Sample Request to get the endpoint information.
The endpoint resembles:
``https://<REGION>-aiplatform.googleapis.com/v1/projects/<PROJECT_ID>/locations/<REGION>/endpoints/ENDPOINT_ID:predict``.
Vertex AI doesn’t use API Keys. Instead, you must have a service account with the “aiplatform.endpoints.predict” IAM permission for the model resource. Also, you must create a service account key for this service account.
None of the default IAM roles is limited to only this permission, so it is advisable to create one. It is also possible to scope service account permissions such that they have access only to a single model resource.
Text embedding with AWS Bedrock and Azure OpenAI¶
This example uses the us-central1
region on Google Cloud, but for best performance,
you should use a regional endpoint near the location where the Flink statement
will run.
The input and output formats for the embedding model are detected automatically.
Create the table for input text.
CREATE TABLE `text_input` ( `id` STRING, `input` STRING ); INSERT INTO text_input VALUES ('1', 'alien'), ('2', 'golden finger'), ('3', 'license to kill'), ('4', 'aliens');
AWS Bedrock¶
For simplicity, this example uses the AWS Bedrock “amazon.titan-embed-text-v1” model.
Run the following command to create a connection resource named “bedrock-cli-connection” that uses your AWS credentials.
confluent flink connection create bedrock-cli-connection \ --cloud AWS \ --region us-west-2 \ --type bedrock \ --endpoint https://bedrock-runtime.us-west-2.amazonaws.com/model/amazon.titan-embed-text-v1/invoke \ --aws-access-key $AWS_ACCESS_KEY_ID \ --aws-secret-key $AWS_SECRET_ACCESS_KEY \ --aws-session-token $AWS_SESSION_TOKEN
In the Flink SQL shell, run the following statement to register your model with Confluent Cloud for Apache Flink.
CREATE MODEL bedrock_embed INPUT (text STRING) OUTPUT (response ARRAY<FLOAT>) WITH ( 'bedrock.connection'='bedrock-cli-connection', 'bedrock.input_format'='AMAZON-TITAN-EMBED', 'provider'='bedrock', 'task'='text_generation' );
Run the following statement to invoke your AI model.
SELECT * from text_input, LATERAL TABLE(ML_PREDICT('bedrock_embed', input));
Azure OpenAI¶
Run the following command to create a connection resource named “azureopenai-cli-connection” that uses your Azure API key.
confluent flink connection create azureopenai-cli-connection \ --cloud AZURE \ --region westus2 \ --type azureopenai \ --endpoint https://matrix-central.openai.azure.com/openai/deployments/matrix-central-embed/embeddings?api-version=2024-06-01 \ --api-key <your-azure-api-key>
In the Flink SQL shell, run the following statement to register your model with Confluent Cloud for Apache Flink.
CREATE MODEL azure_embed INPUT (text STRING) OUTPUT (response ARRAY<FLOAT>) WITH ( 'azureopenai.connection'='azureopenai-cli-connection', 'provider'='azureopenai', 'task'='text_generation' );
Run the following statement to invoke your AI model.
SELECT * from text_input, LATERAL TABLE(ML_PREDICT('azure_embed', input));
Google AI¶
Run the following command to create a connection resource named “googleai-cli-connection” that uses your Google Cloud API key.
confluent flink connection create googleai-cli-connection \ --cloud GCP \ --region us-central1 \ --type googleai \ --endpoint https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash-latest:generateContent \ --api-key <your-gcp-api-key>
In the Flink SQL shell, run the following statement to register your model with Confluent Cloud for Apache Flink.
CREATE MODEL google_text_cli INPUT (`text` VARCHAR(2147483647)) OUTPUT (`output` VARCHAR(2147483647)) WITH ( 'googleai.connection' = 'googleai-cli-connection', 'googleai.system_prompt' = 'translate text to chinese', 'provider' = 'googleai', 'task' = 'text_generation' );
Run the following statement to invoke your AI model.
SELECT * FROM google_input, LATERAL TABLE(ML_PREDICT('google_text_cli', text));
Sentiment analysis with OpenAI LLM¶
Create the table for input text.
CREATE TABLE text_stream ( id BIGINT, text STRING ); INSERT INTO text_stream SELECT 1 id, 'The mitochondria are the powerhouse of the cell' text; INSERT INTO text_stream SELECT 2 id, 'Happy Birthday! You are great!' text; INSERT INTO text_stream SELECT 3 id, 'Today was bad day in the stock market.' text;
Run the following Confluent CLI command to create a connection with the API key to access your OpenAI account resources.
confluent flink connection create openai-connection \ --cloud GCP \ --region us-central1 \ --type openai \ --endpoint https://api.openai.com/v1/chat/completions \ --api-key <your-api-key>
Run the following code to create the OpenAI model with a system prompt for sentiment analysis.
CREATE MODEL sentimentmodel INPUT(text STRING) OUTPUT(sentiment STRING) COMMENT 'sentiment analysis model' WITH ( 'provider' = 'openai', 'task' = 'classification', 'openai.connection' = 'openai-connection', 'openai.model_version' = 'gpt-3.5-turbo', 'openai.system_prompt' = 'Analyze the sentiment of the text and return only POSITIVE, NEGATIVE, or NEUTRAL.' );
Run the inference statement on the table and model.
SELECT id, text, sentiment FROM text_stream, LATERAL TABLE(ML_PREDICT('sentimentmodel', text));