Run an AI Model with Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® supports AI model inference and enables using models as resources in Flink SQL, just like tables and functions. You can use a SQL statement to create a model resource and pass it on for inference in streaming queries. The SQL interface is available in Cloud Console and the Flink SQL shell.

The CREATE MODEL statement registers an AI or ML model in your Flink environment for real-time prediction and inference.

You need to create a model before you can use it in your queries.

The following examples get you started with AI model inference. Currently, you need the Confluent CLI to create connections and run AI models with Flink.

Prerequisites

  • Access to Confluent Cloud.
  • Access to a Flink compute pool.
  • Sufficient permissions to create models. For more information, see RBAC for model inference.

Create an AI model

You need an AI model hosted by one of the supported cloud providers. The following steps show how to get started with a model on AWS Bedrock.

  1. In your web browser, navigate to the AWS Bedrock site.

  2. Pick a Foundation Model. You may need to request access and accept a EULA if your account hasn't used the model before.

  3. At the bottom of the page, get the modelId of the model from the API request example.

    The model endpoint should be https://bedrock-runtime.<REGION>.amazonaws.com/model/<MODEL_ID>/invoke.

  4. Bedrock doesn’t use API keys. Instead, you must get your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. If you’re using temporary credentials, you need an AWS_SESSION_TOKEN, which you can get from the AWS access portal. If you use temporary credentials, the query may not work after the credentials expire.

  5. Set the INPUT_FORMAT associated with your chosen model. If not set, Confluent Cloud for Apache Flink usually chooses the correct format automatically, based on the name of the model.
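Putting steps 3 and 4 together, the endpoint you pass when creating the connection can be assembled from the region and modelId. The following is a minimal shell sketch; the region and model ID shown are the example values used later in this topic.

```shell
# Assemble the Bedrock invoke endpoint from a region and a model ID.
# These values match the Bedrock example below; substitute your own.
REGION="us-west-2"
MODEL_ID="amazon.titan-embed-text-v1"
ENDPOINT="https://bedrock-runtime.${REGION}.amazonaws.com/model/${MODEL_ID}/invoke"
echo "${ENDPOINT}"
```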


Create a connection resource

Note

Connection resources are an Early Access Program feature in Confluent Cloud. This feature should be used only for evaluation and non-production testing purposes or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions. To participate in this Early Access Program, contact your Confluent account manager.

Early Access Program features are intended for evaluation use in development and testing environments only, and not for production use. Early Access Program features are provided: (a) without support; (b) “AS IS”; and (c) without indemnification, warranty, or condition of any kind. No service level commitment will apply to Early Access Program features. Early Access Program features are considered to be a Proof of Concept as defined in the Confluent Cloud Terms of Service. Confluent may discontinue providing preview releases of the Early Access Program features at any time in Confluent’s sole discretion.

A connection resource enables you to connect to model providers in a way that protects your secrets, so Flink statements can make calls to these services securely.

Run the confluent flink connection create command to create a new connection resource.

The following examples show how to use connection resources in the CREATE MODEL statements that register models with Confluent Cloud for Apache Flink.

  • Set the ENV_ID environment variable to the identifier of your Confluent Cloud environment.
  • Connections must be created in the same cloud region as the corresponding models.
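For example, you can export ENV_ID once so the connection commands below can reference it. This is a sketch; the ID shown is a placeholder, so substitute your own environment ID, which you can find with the `confluent environment list` command.

```shell
# Placeholder environment ID; find yours with `confluent environment list`.
export ENV_ID="env-xxxxxx"
```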

Text embedding with AWS Bedrock and Azure OpenAI

These examples use specific regions (us-west-2 on AWS, westus2 on Azure, and us-central1 on Google Cloud), but for best performance, you should use a regional endpoint near the location where the Flink statement will run.

The input and output formats for the embedding model are detected automatically.

  • Create the table for input text.

    CREATE TABLE `text_input` (
      `id` STRING,
      `input` STRING
    );
    
    INSERT INTO text_input VALUES
      ('1', 'alien'),
      ('2', 'golden finger'),
      ('3', 'license to kill'),
      ('4', 'aliens');
    

AWS Bedrock

For simplicity, this example uses the AWS Bedrock “amazon.titan-embed-text-v1” model.

  1. Run the following command to create a connection resource named “bedrock-cli-connection” that uses your AWS credentials.

    confluent flink connection create bedrock-cli-connection \
      --cloud AWS \
      --region us-west-2 \
      --environment ${ENV_ID} \
      --type bedrock \
      --endpoint https://bedrock-runtime.us-west-2.amazonaws.com/model/amazon.titan-embed-text-v1/invoke \
      --aws-access-key $AWS_ACCESS_KEY_ID \
      --aws-secret-key $AWS_SECRET_ACCESS_KEY \
      --aws-session-token $AWS_SESSION_TOKEN
    
  2. In the Flink SQL shell, run the following statement to register your model with Confluent Cloud for Apache Flink.

    CREATE MODEL bedrock_embed
    INPUT (text STRING)
    OUTPUT (response ARRAY<FLOAT>)
    WITH (
      'bedrock.connection'='bedrock-cli-connection',
      'bedrock.input_format'='AMAZON-TITAN-EMBED',
      'provider'='bedrock',
      'task'='embedding'
    );
    
  3. Run the following statement to invoke your AI model.

    SELECT * FROM text_input, LATERAL TABLE(ML_PREDICT('bedrock_embed', input));
    

Azure OpenAI

  1. Run the following command to create a connection resource named “azureopenai-cli-connection” that uses your Azure API key.

    confluent flink connection create azureopenai-cli-connection \
      --cloud AZURE \
      --region westus2 \
      --environment ${ENV_ID} \
      --type azureopenai \
      --endpoint https://matrix-central.openai.azure.com/openai/deployments/matrix-central-embed/embeddings?api-version=2024-06-01 \
      --api-key <your-azure-api-key>
    
  2. In the Flink SQL shell, run the following statement to register your model with Confluent Cloud for Apache Flink.

    CREATE MODEL azure_embed
    INPUT (text STRING)
    OUTPUT (response ARRAY<FLOAT>)
    WITH (
      'azureopenai.connection'='azureopenai-cli-connection',
      'provider'='azureopenai',
      'task'='embedding'
    );
    
  3. Run the following statement to invoke your AI model.

    SELECT * FROM text_input, LATERAL TABLE(ML_PREDICT('azure_embed', input));
    

Google AI

Unlike the previous examples, this one registers a Gemini text-generation model with a system prompt that translates the input text to Chinese.

  1. Run the following command to create a connection resource named “googleai-cli-connection” that uses your Google Cloud API key.

    confluent flink connection create googleai-cli-connection \
    --cloud GCP \
    --region us-central1 \
    --environment ${ENV_ID} \
    --type googleai \
    --endpoint https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash-latest:generateContent \
    --api-key <your-gcp-api-key>
    
  2. In the Flink SQL shell, run the following statement to register your model with Confluent Cloud for Apache Flink.

    CREATE MODEL google_text_cli
    INPUT (`text` VARCHAR(2147483647))
    OUTPUT (`output` VARCHAR(2147483647))
    WITH (
      'googleai.connection' = 'googleai-cli-connection',
      'googleai.system_prompt' = 'translate text to chinese',
      'provider' = 'googleai',
      'task' = 'text_generation'
    );
    
  3. Run the following statement to invoke your AI model.

    SELECT * FROM text_input, LATERAL TABLE(ML_PREDICT('google_text_cli', input));
    

Sentiment analysis with OpenAI LLM

  1. Create the table for input text.

    CREATE TABLE text_stream (
      id BIGINT, text STRING
    );
    
    INSERT INTO text_stream VALUES
      (1, 'The mitochondria are the powerhouse of the cell'),
      (2, 'Happy Birthday! You are great!'),
      (3, 'Today was a bad day in the stock market.');
    
  2. Run the following Confluent CLI command to create a connection with the API key to access your OpenAI account resources.

    confluent flink connection create openai-connection \
    --cloud GCP \
    --region us-central1 \
    --environment ${ENV_ID} \
    --type openai \
    --endpoint https://api.openai.com/v1/chat/completions \
    --api-key <your-api-key>
    
  3. Run the following code to create the OpenAI model with a system prompt for sentiment analysis.

    CREATE MODEL sentimentmodel
    INPUT(text STRING)
    OUTPUT(sentiment STRING)
    COMMENT 'sentiment analysis model'
    WITH (
      'provider' = 'openai',
      'task' = 'classification',
      'openai.connection' = 'openai-connection',
      'openai.model_version' = 'gpt-3.5-turbo',
      'openai.system_prompt' = 'Analyze the sentiment of the text and return only POSITIVE, NEGATIVE, or NEUTRAL.'
    );
    
  4. Run the inference statement on the table and model.

    SELECT id, text, sentiment FROM text_stream, LATERAL TABLE(ML_PREDICT('sentimentmodel', text));