AI Model Inference and Machine Learning Functions in Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® provides built-in functions for invoking remote AI/ML models in Flink SQL queries. These functions simplify developing and deploying AI by providing a unified platform for data processing and AI/ML tasks.

For enriching streams with lookups against key-value stores, full-text indexes, or vector databases, see Search Functions. For machine-learning preprocessing utilities, see ML Preprocessing Functions.

ML_PREDICT

Run a remote AI/ML model for tasks like predicting outcomes, generating text, and classification.

Syntax
ML_PREDICT(`model_name[$version_id]`, column);

-- map settings are optional
ML_PREDICT(`model_name[$version_id]`, column, map['async_enabled', [boolean], 'client_timeout', [int], 'max_parallelism', [int], 'retry_count', [int]]);
Description

The ML_PREDICT function performs predictions using pre-trained machine learning models.

The first argument to the ML_PREDICT table function is the model name. The other arguments are the columns used for prediction. They are defined in the model resource INPUT for AI models and can vary in length or type.

Before using ML_PREDICT, you must register the model by using the CREATE MODEL statement.

For more information, see Run an AI Model.

Configuration

You can control how calls to the remote model execute with these optional parameters.

  • async_enabled: Whether calls to remote models are asynchronous and non-blocking. The default is true.

  • client_timeout: Time, in seconds, after which the request to the model endpoint times out. The default is 30 seconds.

  • debug: Return a detailed stack trace in the API response. The default is false. Confluent Cloud for Apache Flink implements data masking for error messages to remove any secrets or customer input, but the stack trace can contain the prompt itself or some part of the response string.

  • retry_count: Maximum number of times the remote model request is retried if the request to the model fails. The default is 3.

  • max_parallelism: Maximum number of parallel requests that the function can make. Can be used only when async_enabled is true. The default is 10.
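As an illustration of how these settings combine, the following Python sketch renders a set of overrides as the map[...] argument used in the syntax above. The helper name render_config_map is hypothetical and is not part of any Confluent client. Rejecting max_parallelism when async_enabled is false is an assumption about how the documented restriction might be enforced.

```python
from typing import Dict, Union

ConfigValue = Union[bool, int]

# Defaults as listed in the configuration section above.
DEFAULTS: Dict[str, ConfigValue] = {
    "async_enabled": True,
    "client_timeout": 30,
    "debug": False,
    "retry_count": 3,
    "max_parallelism": 10,
}


def render_config_map(overrides: Dict[str, ConfigValue]) -> str:
    """Render overrides as a Flink SQL map[...] literal (hypothetical helper)."""
    for key in overrides:
        if key not in DEFAULTS:
            raise ValueError(f"unknown ML_PREDICT setting: {key}")
    effective = {**DEFAULTS, **overrides}
    # max_parallelism is documented as usable only when async_enabled is true.
    # Raising an error here is an assumption made for this sketch.
    if "max_parallelism" in overrides and not effective["async_enabled"]:
        raise ValueError("max_parallelism requires async_enabled = true")
    parts = []
    for key, value in overrides.items():
        # Booleans are rendered as the SQL literals true/false, not True/False.
        literal = str(value).lower() if isinstance(value, bool) else str(value)
        parts.append(f"'{key}', {literal}")
    return "map[" + ", ".join(parts) + "]"


print(render_config_map({"client_timeout": 60}))
```

Only the overridden keys are rendered, so any setting you omit keeps its documented default.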

Example

After you have registered the AI model by using the CREATE MODEL statement, run the model by using the ML_PREDICT function in a SQL query.

The following example runs a model named embeddingmodel on the data in a table named text_stream.

SELECT id, text, embedding FROM text_stream, LATERAL TABLE(ML_PREDICT('embeddingmodel', text));

The following examples call the ML_PREDICT function with different configurations.

-- Specify the timeout.
SELECT * FROM `db1`.`tb1`, LATERAL TABLE(ML_PREDICT('md1', key, map['client_timeout', 60 ]));

-- Specify all configuration parameters.
SELECT * FROM `db1`.`tb1`, LATERAL TABLE(ML_PREDICT('md1', key, map['async_enabled', true, 'client_timeout', 60, 'max_parallelism', 20, 'retry_count', 5]));

ML_DETECT_ANOMALIES

Identify outliers in a data stream.

Syntax
ML_DETECT_ANOMALIES(
 data_column,
 timestamp_column,
 JSON_OBJECT('p' VALUE 1, 'q' VALUE 1, 'd' VALUE 1, 'minTrainingSize' VALUE 10));
Description

The ML_DETECT_ANOMALIES function uses an ARIMA model to identify outliers in time-series data.

Your data must include:

  • A timestamp column.

  • A target column representing some quantity of interest at each timestamp.

For more information, see Detect Anomalies in Data.

Parameters

For anomaly detection parameters, see ARIMA model parameters.

Example
SELECT
    ML_DETECT_ANOMALIES(
     total_orderunits,
     summed_ts,
     JSON_OBJECT('p' VALUE 1, 'q' VALUE 1, 'd' VALUE 1, 'minTrainingSize' VALUE 10))
    OVER (
        ORDER BY summed_ts
        RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS anomalies
FROM test_table;

ML_DETECT_ANOMALIES_ROBUST

Identify outliers in a data stream by using univariate or multivariate detection.

Syntax
ML_DETECT_ANOMALIES_ROBUST(
  value_or_vector,
  event_time,
  JSON_OBJECT(
    'window'            VALUE 60,
    'threshold'         VALUE 3.0,
    'imputeOutliers'    VALUE TRUE,
    'majorityRule'      VALUE TRUE,
    'majorityThreshold' VALUE 0.6,
    'expertWindows'     VALUE '[30,60,120]'
  )
)

 -- Univariate detection
 ML_DETECT_ANOMALIES_ROBUST(latency_ms, ts, config_json)

 -- Multivariate detection
 ML_DETECT_ANOMALIES_ROBUST(ROW(cpu, heap_used, gc_pause_ms), ts, config_json)
Description

The ML_DETECT_ANOMALIES_ROBUST function is a built-in machine learning function that detects anomalies in one or more metrics over time by using robust statistics instead of forecasting models.

Your input data must include:

  • A timestamp column, for ordering the time series.

  • A single numeric column, for univariate detection, or

  • A vector of numeric columns, passed as a ROW(...) expression, for multivariate detection.

If your stream has multiple records with the same timestamp or irregular spacing, consider first aggregating into regular intervals with TUMBLE and then applying ML_DETECT_ANOMALIES_ROBUST on the windowed results.

For more information, see Detect Anomalies in Data with MAD.
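The pre-aggregation suggested above can be pictured with a small Python sketch. It is only an analogy for what a TUMBLE window does inside the query: the function name tumble_average, the epoch-second timestamps, and the choice of averaging are assumptions made for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def tumble_average(events: List[Tuple[int, float]], interval_s: int) -> List[Tuple[int, float]]:
    """Group (timestamp_seconds, value) pairs into fixed windows and average each one."""
    buckets: Dict[int, List[float]] = defaultdict(list)
    for ts, value in events:
        # Align each timestamp to the start of its fixed-size window.
        window_start = ts - (ts % interval_s)
        buckets[window_start].append(value)
    # One output row per non-empty window, ordered by window start.
    return [(start, sum(vals) / len(vals)) for start, vals in sorted(buckets.items())]


events = [(0, 1.0), (30, 3.0), (60, 5.0), (61, 7.0), (200, 9.0)]
print(tumble_average(events, 60))
```

Duplicate and irregular timestamps collapse into one value per interval. Empty intervals produce no row, so gaps in the input remain gaps in the output.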

Configuration
  • value_or_vector (DOUBLE or ROW): The metric to evaluate: a single numeric value for univariate detection, or a ROW of numeric values for multivariate detection.

  • event_time (TIMESTAMP): The timestamp of the data stream.

  • config_json (JSON): A JSON object that has the following fields:

    • window (INT): The window size in seconds.

    • threshold (DOUBLE): The threshold for the anomaly detection.

    • imputeOutliers (BOOLEAN): Whether to impute outliers.

Expert mode

For advanced scenarios, ML_DETECT_ANOMALIES_ROBUST supports an expert mode, in which a panel of MAD detectors with different window sizes vote on each point, and you decide how many must agree before an anomaly is raised.

In expert mode, the primary expert still uses window, and all experts share the same threshold. For multivariate inputs, previously flagged anomalies are excluded automatically from future context for each expert, to keep the panel robust.

Expert mode is controlled by the following parameters.

  • majorityRule (BOOLEAN): Whether to enable expert-panel (majority-vote) mode. The default is FALSE.

    • When FALSE: a single MAD detector that uses the window parameter as its window size drives all anomaly decisions.

    • When TRUE: a panel of experts is built, each with its own window size.

  • majorityThreshold (DOUBLE): The threshold for the majority rule in expert mode. The supported range of values is 0.0 to 1.0. The default is 0.5.

    The majorityThreshold parameter defines the fraction of ready experts that must vote “anomaly” for the point to be flagged.

    If ready is the number of experts that have enough history to evaluate the current point, and votes is the number of those experts that say it is an anomaly, the point is flagged if votes / ready > majorityThreshold. With the default value of 0.5, strictly more than half of ready experts must agree.

  • expertWindows (ARRAY<INT>): Sizes, in number of observations, for each expert’s sliding window. The default, when majorityRule is TRUE, is a panel derived from the window parameter, roughly:

    window / 4, window / 2, window, 2 * window, 5 * window.

    All values are clamped to a reasonable range.

    The type is either:

    • A JSON array, for example, JSON_ARRAY(20, 40, 80)

    • A string representation, like “[20,40,80]” or “20,40,80”

    Include both short windows, for fast response, and long windows, for stability.

    Keep values positive and reasonably small compared to your history size.
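The voting rule and the default panel described above can be sketched in Python as follows. This is an illustration of the documented formulas, not the function's implementation. The integer division, the floor of 1, and returning None when no expert is ready are assumptions, and the actual clamping range is not specified here.

```python
from typing import List, Optional


def default_expert_windows(window: int) -> List[int]:
    """Approximate default panel: window/4, window/2, window, 2*window, 5*window."""
    # Integer division and a minimum of 1 are assumptions made for this sketch.
    return [max(1, window // 4), max(1, window // 2), window, 2 * window, 5 * window]


def majority_flag(votes: List[Optional[bool]], majority_threshold: float = 0.5) -> Optional[bool]:
    """Combine expert votes. None marks an expert without enough history yet."""
    ready = [v for v in votes if v is not None]
    if not ready:
        # Assumption: with no ready experts the result is unknown, like NULL during warm-up.
        return None
    anomaly_votes = sum(1 for v in ready if v)
    # Flag the point only if the fraction of ready experts voting "anomaly"
    # is strictly greater than the threshold.
    return anomaly_votes / len(ready) > majority_threshold


print(default_expert_windows(60))
print(majority_flag([True, True, False, None]))
```

Because the comparison is strict, a 1-to-1 split between two ready experts does not flag the point at the default threshold of 0.5.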

Output

The ML_DETECT_ANOMALIES_ROBUST function returns a single ROW field named anomaly, with the following schema, which matches the schema of the ML_DETECT_ANOMALIES function.

  • timestamp (TIMESTAMP): The event time of the evaluated point.

  • actual_value (DOUBLE): The original metric value.

    • Scalar mode: the original metric value.

    • Vector mode: NULL, because the input is multidimensional.

  • forecast_value (DOUBLE): The robust center (median) of the metric over the current window.

    • Scalar mode: the robust center (median) of the metric over the current window.

    • Vector mode: the distance of the current vector from the robust center vector.

  • lower_bound (DOUBLE): The lower bound of the accepted range.

    • Scalar mode: computed with the formula forecast_value - threshold * robust_sigma.

    • Vector mode: 0.0, representing the lower bound on allowed distance.

  • upper_bound (DOUBLE): The upper bound of the accepted range.

    • Scalar mode: computed with the formula forecast_value + threshold * robust_sigma.

    • Vector mode: the distance threshold. Values above this indicate an anomaly.

  • is_anomaly (BOOLEAN): TRUE if the current point exceeds the robust threshold; otherwise FALSE. During warm-up, this field is NULL.

  • rmse (DOUBLE): The root mean squared error (RMSE) of the metric.

  • aic (DOUBLE): The Akaike information criterion (AIC) of the metric.

In multivariate mode, the fields you typically examine are:

  • anomaly.forecast_value for robust distance

  • anomaly.upper_bound for distance threshold

  • anomaly.is_anomaly for final decision
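For scalar mode, the bound formulas listed in the output schema can be sketched in Python as follows. The definition of robust_sigma is not given in this section, so the sketch assumes the conventional estimate of 1.4826 times the median absolute deviation (MAD). Evaluating the current point against a window of prior values only is also an assumption.

```python
import statistics
from typing import Sequence, Tuple

# Assumption: conventional factor that scales MAD to a normal standard deviation.
MAD_TO_SIGMA = 1.4826


def robust_bounds(window_values: Sequence[float], threshold: float) -> Tuple[float, float, float]:
    """Return (forecast_value, lower_bound, upper_bound) for scalar mode."""
    center = statistics.median(window_values)  # robust center (median)
    mad = statistics.median([abs(v - center) for v in window_values])
    robust_sigma = MAD_TO_SIGMA * mad
    return center, center - threshold * robust_sigma, center + threshold * robust_sigma


def is_anomaly(value: float, window_values: Sequence[float], threshold: float = 3.0) -> bool:
    """Flag the value if it falls outside the robust bounds."""
    _, lower, upper = robust_bounds(window_values, threshold)
    return value < lower or value > upper


history = [10.0, 11.0, 9.0, 10.0, 12.0, 10.0, 11.0]
print(robust_bounds(history, 3.0))
print(is_anomaly(30.0, history))
```

Because the median and MAD change little when a few extreme values enter the window, a single spike does not widen the bounds the way it would with a mean and standard deviation.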

Example
SELECT
    ML_DETECT_ANOMALIES_ROBUST(latency_ms, ts, config_json)
    OVER (
        ORDER BY ts
        RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS anomaly
FROM test_table;

ML_EVALUATE

Aggregate a table and return model evaluation metrics.

Syntax
SELECT ML_EVALUATE(`model_name`, label, col1, col2, ...) FROM `eval_data_table`;

Description

The ML_EVALUATE function is a table aggregation function that takes an entire table and returns a single row of model evaluation metrics. If run on all versions of a model, the function returns one row for each model version. After comparing the metrics for different versions, you can update the default version for deployment with the model that has the best evaluation metrics.

Internally, the ML_EVALUATE function runs ML_PREDICT and processes the results.

Before using ML_EVALUATE, you must register the model by using the CREATE MODEL statement.

The first argument to the ML_EVALUATE table function is the model name. The second argument is the true label that the output of the model should be evaluated against. Its type depends on the model OUTPUT type and the model task. The other arguments are the columns used for prediction. They are defined in the model resource INPUT for AI models and can vary in length or type.

The return type of the ML_EVALUATE function is Map<String, Double> for all types of tasks. The metric keys in the map depend on the task type.

Metrics

The metric columns returned by ML_EVALUATE depend on the task type of the specified model.

Classification

Classification models choose a group to place their inputs in and return one of N possible values. A classification model that returns only 2 possible values is called a binary classifier. If it returns more than 2 values, it is referred to as multi-class.

Classification models return these metrics:

  • Accuracy: Fraction of correct predictions across all classes.

  • F1 Score: Harmonic mean of precision and recall.

  • Precision: (Class X Correctly Predicted) / (# of Class X Predicted)

  • Recall: (Class X Correctly Predicted) / (# of actual Class X)
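The definitions above can be made concrete with a short Python sketch that computes them for one class of interest. The function name and sample labels are hypothetical, and how ML_EVALUATE combines per-class precision and recall for multi-class models is not specified here.

```python
from typing import Dict, Sequence


def classification_metrics(actual: Sequence[str], predicted: Sequence[str], target_class: str) -> Dict[str, float]:
    """Accuracy over all classes, plus precision, recall, and F1 for target_class."""
    pairs = list(zip(actual, predicted))
    correct = sum(1 for a, p in pairs if a == p)
    true_pos = sum(1 for a, p in pairs if a == target_class and p == target_class)
    predicted_pos = sum(1 for _, p in pairs if p == target_class)
    actual_pos = sum(1 for a, _ in pairs if a == target_class)
    # Guard against division by zero when the class is never predicted or never present.
    precision = true_pos / predicted_pos if predicted_pos else 0.0
    recall = true_pos / actual_pos if actual_pos else 0.0
    # F1 is the harmonic mean of precision and recall.
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return {"Accuracy": correct / len(pairs), "Precision": precision, "Recall": recall, "F1": f1}


actual = ["spam", "spam", "ham", "ham", "spam"]
predicted = ["spam", "ham", "ham", "spam", "spam"]
print(classification_metrics(actual, predicted, "spam"))
```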

Clustering

Clustering models group the model examples into K groups. Metrics are a measure of how compact the clusters are.

Clustering models return these metrics:

  • Davies Bouldin Index: A measure of how separated clusters are and how compact they are.

  • Intra-Cluster Variance (Mean Squared Distance): Average squared distance of each training point to the centroid of the cluster it was assigned to.

  • Silhouette Score: Compares how similar each point is to its own cluster with how dissimilar it is to other clusters.
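As an illustration of the intra-cluster variance definition above, the following Python sketch computes the mean squared distance of each point to the centroid of its assigned cluster. The function name and sample points are hypothetical, and the sketch recomputes centroids from the assignments, which may differ from how a deployed model stores them.

```python
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple

Point = Tuple[float, ...]


def intra_cluster_variance(points: Sequence[Point], labels: Sequence[int]) -> float:
    """Mean squared Euclidean distance from each point to its cluster centroid."""
    members: Dict[int, List[Point]] = defaultdict(list)
    for point, label in zip(points, labels):
        members[label].append(point)
    # Centroid of a cluster: the per-dimension mean of its points.
    centroids = {
        label: tuple(sum(dim) / len(pts) for dim in zip(*pts))
        for label, pts in members.items()
    }
    total = 0.0
    for point, label in zip(points, labels):
        total += sum((x - c) ** 2 for x, c in zip(point, centroids[label]))
    return total / len(points)


points = [(0.0, 0.0), (2.0, 0.0), (10.0, 0.0), (12.0, 0.0)]
print(intra_cluster_variance(points, [0, 0, 1, 1]))
```

Lower values indicate tighter clusters. The measure says nothing about how far apart the clusters are, which is why it is reported alongside the other two metrics.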

Embedding

Embedding models return these metrics:

  • Mean Cosine Similarity

  • Mean Jaccard Similarity

  • Mean Euclidean Distance

Regression

Regression models predict a continuous output variable based on one or more input features.

Regression models return these metrics:

  • Mean Absolute Error: The average of the absolute differences between the predicted and actual values.

  • Mean Squared Error: The average of the squared differences between the predicted and actual values.
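The two error definitions above translate directly into code. The following Python sketch uses hypothetical function names and sample values.

```python
from typing import Sequence


def mean_absolute_error(actual: Sequence[float], predicted: Sequence[float]) -> float:
    """Average of the absolute differences between predicted and actual values."""
    return sum(abs(p - a) for a, p in zip(actual, predicted)) / len(actual)


def mean_squared_error(actual: Sequence[float], predicted: Sequence[float]) -> float:
    """Average of the squared differences between predicted and actual values."""
    return sum((p - a) ** 2 for a, p in zip(actual, predicted)) / len(actual)


actual = [3.0, 5.0, 2.0]
predicted = [2.0, 5.0, 4.0]
print(mean_absolute_error(actual, predicted))
print(mean_squared_error(actual, predicted))
```

Squaring penalizes large errors more heavily, so a single large miss raises the mean squared error much more than the mean absolute error.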

Text generation

Text generation models generate text based on a prompt.

Text generation models return these metrics:

  • Mean BLEU

  • Mean ROUGE

  • Mean Semantic Similarity

Example metrics

The following list shows example metrics for each task type.

  • Classification: {Accuracy=0.9999991465990892, Precision=0.9996998081063332, Recall=0.0013025368892873059, F1=0.0013025368892873059}

  • Clustering: {Mean Davies-Bouldin Index=0.9999991465990892}

  • Embedding: {Mean Cosine Similarity=0.9999991465990892, Mean Jaccard Similarity=0.9996998081063332, Mean Euclidean Distance=0.0013025368892873059}

  • Regression: {MAE=0.9999991465990892, MSE=0.9996998081063332, RMSE=0.0013025368892873059, MAPE=0.0013025368892873059, R²=0.0043025368892873059}

  • Text generation: {Mean BLEU=0.9999991465990892, Mean ROUGE=0.9996998081063332, Mean Semantic Similarity=0.0013025368892873059}

Example

After you have registered the AI model by using the CREATE MODEL statement, evaluate the model by using the ML_EVALUATE function in a SQL query.

The following example statement registers a remote OpenAI model for a classification task.

CREATE MODEL `my_remote_model`
INPUT (f1 INT, f2 STRING)
OUTPUT (output_label STRING)
WITH(
  'task' = 'classification',
  'type' = 'remote',
  'provider' = 'openai',
  'openai.endpoint' = 'https://api.openai.com/v1/chat/completions',
  'openai.api_key' = '<api-key>'
);

The following statements show how to run the ML_EVALUATE function on various versions of my_remote_model using data in a table named eval_data.

-- Model evaluation with all versions
SELECT ML_EVALUATE(`my_remote_model$all`, label, f1, f2) FROM `eval_data`;

-- Model evaluation with default version
SELECT ML_EVALUATE(`my_remote_model`, label, f1, f2) FROM `eval_data`;

-- Model evaluation with specific version 2
SELECT ML_EVALUATE(`my_remote_model$2`, label, f1, f2) FROM `eval_data`;

ML_FORECAST

Perform continuous forecasting on a table.

Syntax
ML_FORECAST(
 data_column,
 timestamp_column,
 JSON_OBJECT('p' VALUE 1, 'q' VALUE 1, 'd' VALUE 1, 'minTrainingSize' VALUE 10));
Description

The ML_FORECAST function uses an ARIMA model to perform time-series forecasting.

Your data must include:

  • A timestamp column.

  • A target column representing some quantity of interest at each timestamp.

For more information, see Forecast Data Trends.

Parameters

For forecasting parameters, see ARIMA model parameters.

Example
SELECT
    ML_FORECAST(
     total_orderunits,
     summed_ts,
     JSON_OBJECT('p' VALUE 1, 'q' VALUE 1, 'd' VALUE 1, 'minTrainingSize' VALUE 10))
    OVER (
        ORDER BY summed_ts
        RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS forecast
FROM test_table;

AI_COMPLETE

Invoke a large language model (LLM) to generate text completions, summaries, or answers.

Syntax
AI_COMPLETE(model_name, input_prompt [, invocation_config]);
Description

The AI_COMPLETE function provides a streamlined approach for generating text, taking a single string as input and returning a single string as output. This functionality enables you to leverage LLMs to produce text based on any given prompt.

Configuration
  • model_name: Name of the model entity to call for prediction [STRING].

  • input_prompt: Input prompt to pass to the LLM for prediction [STRING].

  • invocation_config[optional]: Map to pass the configuration to manage function behavior, for example, MAP['debug', true].

Example

The following example shows how to invoke an LLM to generate text completions.

-- Create an OpenAI connection.
CREATE CONNECTION openai_connection
  WITH (
    'type' = 'openai',
    'endpoint' = 'https://api.openai.com/v1/chat/completions',
    'api-key' = '<api-key>'
  );

CREATE MODEL description_extractor
  INPUT (input STRING)
  OUTPUT (output_json STRING)
WITH(
    'provider' = 'openai',
    'openai.connection' = 'openai_connection',
    'openai.system_prompt' = 'Extract json from input free text',
    'task' = 'text_generation'
  );

CREATE TABLE claims_with_structured_description(id INT, customer_id INT, output_json STRING);

INSERT INTO claims_with_structured_description
  SELECT id, customer_id, output_json FROM claims_submitted, LATERAL TABLE(AI_COMPLETE('description_extractor', description));

AI_DETECT_PII

Detect and redact personally identifiable information (PII) in text.

Syntax
AI_DETECT_PII(
  text,
  JSON_OBJECT(
    'mode' VALUE 'redact',
    'redaction_strategy' VALUE 'mask'
  )
)
Description

You can use the AI_DETECT_PII function to scan text for personally identifiable information. By default, the function uses Piiranha, a model that is managed by Confluent and hosted in Confluent Cloud, to identify PII in your data. AI_DETECT_PII supports two modes: detection mode (the default), which returns structured results with PII types and locations, and redaction mode, which replaces detected PII with redacted values.

For more information, see Detect PII in Text.

Parameters
  • text (STRING): The text to scan for personally identifiable information. The text must not be null.

  • config (STRING): Optional JSON object with configuration parameters:

    • mode (STRING, default: 'detect'): The processing mode. Set to 'detect' or 'redact'.

    • model (STRING, default: 'piiranha-v1-pii'): The model to use for PII detection.

    • pii_types (ARRAY<STRING>, default: all types): The PII types to detect.

    • redaction_strategy (STRING, default: 'mask'): The redaction strategy. Set to 'mask', 'hash', or 'remove'.

    • score_threshold (DOUBLE, default: 0.0): The minimum confidence score for detections. Must be between 0.0 and 1.0.

Output

The AI_DETECT_PII function returns a ROW data type with the following fields:

  • has_pii (BOOLEAN): TRUE if the text contains detected PII.

  • entities (ARRAY<ROW<entity_type STRING, text STRING, start INT, end INT, score DOUBLE>>): An array of detected PII entities.

  • redacted_text (STRING): The input text with detected PII replaced. Null in detection mode.

  • metadata (STRING): A JSON string containing model metadata.
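To show how the entities array and the score_threshold and pii_types settings relate, the following Python sketch filters a list of detected entities and masks the matching character ranges. It is an illustration only. The entity type names, the exclusive end offset, the inclusive threshold comparison, and the use of one asterisk per character are all assumptions, and the actual output of the mask strategy may differ.

```python
from typing import Any, Dict, List, Optional, Sequence

Entity = Dict[str, Any]  # keys mirror the entities ROW: entity_type, text, start, end, score


def filter_entities(entities: Sequence[Entity], score_threshold: float = 0.0,
                    pii_types: Optional[Sequence[str]] = None) -> List[Entity]:
    """Keep entities at or above the threshold and, if given, of the requested types."""
    return [
        e for e in entities
        if e["score"] >= score_threshold
        and (pii_types is None or e["entity_type"] in pii_types)
    ]


def mask_text(text: str, entities: Sequence[Entity], mask_char: str = "*") -> str:
    """Replace each entity's character range with mask characters (end is exclusive here)."""
    chars = list(text)
    for e in entities:
        for i in range(e["start"], e["end"]):
            chars[i] = mask_char
    return "".join(chars)


text = "Email jane@example.com now"
entities = [
    {"entity_type": "EMAIL", "text": "jane@example.com", "start": 6, "end": 22, "score": 0.98},
    {"entity_type": "GIVENNAME", "text": "Email", "start": 0, "end": 5, "score": 0.2},
]
kept = filter_entities(entities, score_threshold=0.5)
print(mask_text(text, kept))
```

Raising score_threshold trades recall for precision: the low-confidence detection on the word "Email" is dropped, so only the address is masked.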

Example
SELECT
    message_id,
    message,
    AI_DETECT_PII(message) AS pii_result
FROM customer_messages;

AI_DETECT_ANOMALIES

Detect anomalies in time-series data using a foundation model.

Syntax
AI_DETECT_ANOMALIES(
  value,
  timestamp,
  JSON_OBJECT(
    'model' VALUE 'timesfm-2.5',
    'minContextSize' VALUE 10,
    'maxContextSize' VALUE 200,
    'confidencePercentage' VALUE 80.0
  )
) OVER (
  ORDER BY timestamp
  RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
)
Description

The AI_DETECT_ANOMALIES function monitors a stream of time-series data and flags data points that fall outside expected bounds. The function uses TimesFM, a foundation model from Google Research that is managed by Confluent and hosted in Confluent Cloud, to learn patterns from historical data and produce prediction intervals.

Your data must include:

  • A timestamp column for ordering the time series.

  • A numeric value column representing the time-series data to monitor.

For more information, see Detect Anomalies in Data.

Parameters
  • config (STRING): Optional JSON object with configuration parameters:

    • confidencePercentage (DOUBLE, default: 99.0): The symmetric confidence level for the prediction interval.

    • lowerBoundConfidencePercentage (DOUBLE, optional): The asymmetric lower bound confidence.

    • maxContextSize (INTEGER, default: 512): The maximum number of historical points used for prediction.

    • minContextSize (INTEGER, default: 20): The minimum number of data points before anomaly detection begins.

    • model (STRING, default: 'timesfm-2.5'): The model to use for anomaly detection.

    • rmseWindowSize (INTEGER, default: 5): The rolling window size for tracking RMSE.

    • upperBoundConfidencePercentage (DOUBLE, optional): The asymmetric upper bound confidence.

  • timestamp (TIMESTAMP(3)): The event time associated with the value.

  • value (DOUBLE): The numeric time-series value to monitor.

Output

The AI_DETECT_ANOMALIES function returns a ROW data type with the following fields:

  • timestamp (TIMESTAMP(3)): The timestamp of the evaluated data point.

  • actual_value (DOUBLE): The observed value at this timestamp.

  • forecast_value (DOUBLE): The model’s predicted value.

  • lower_bound (DOUBLE): The lower bound of the prediction interval.

  • upper_bound (DOUBLE): The upper bound of the prediction interval.

  • is_anomaly (BOOLEAN): TRUE if the value falls outside the bounds.

  • rmse (DOUBLE): The rolling RMSE of prediction errors.

  • aic (DOUBLE): Reserved for future use. Always null.
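The is_anomaly and rmse fields can be illustrated with a small Python sketch. It assumes that rmse is the root mean squared error over the most recent rmseWindowSize prediction errors and that a value exactly on a bound is not an anomaly. Neither detail is confirmed by this section, and the class and function names are hypothetical.

```python
import math
from collections import deque
from typing import Deque


class RollingRmse:
    """Track RMSE over the most recent window_size prediction errors."""

    def __init__(self, window_size: int = 5) -> None:
        # A bounded deque drops the oldest squared error automatically.
        self._squared_errors: Deque[float] = deque(maxlen=window_size)

    def update(self, actual: float, forecast: float) -> float:
        self._squared_errors.append((actual - forecast) ** 2)
        return math.sqrt(sum(self._squared_errors) / len(self._squared_errors))


def is_anomaly(actual: float, lower_bound: float, upper_bound: float) -> bool:
    """TRUE if the value falls outside the prediction interval."""
    return actual < lower_bound or actual > upper_bound


tracker = RollingRmse(window_size=2)
for actual, forecast in [(11.0, 10.0), (13.0, 10.0), (14.0, 10.0)]:
    print(tracker.update(actual, forecast))
```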

Example
SELECT
    event_time,
    response_ms,
    AI_DETECT_ANOMALIES(
        response_ms,
        event_time,
        JSON_OBJECT(
            'model' VALUE 'timesfm-2.5',
            'minContextSize' VALUE 10,
            'maxContextSize' VALUE 200,
            'confidencePercentage' VALUE 80.0
        )
    ) OVER (
        ORDER BY event_time
        RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS anomaly_result
FROM server_metrics;

AI_EMBEDDING

Generate vector embeddings for text or other data using a registered embedding model.

Syntax
AI_EMBEDDING(model_name, input_text [, invocation_config]);
Description

The AI_EMBEDDING function provides a straightforward interface, accepting a single string input and returning an array of floats as the embedding response. This functionality enables you to leverage large language models (LLMs) to generate embeddings for text efficiently.

Configuration
  • model_name: Name of the model entity to call for embeddings [STRING].

  • input_text: Input text to pass to the LLM for embeddings [STRING].

  • invocation_config[optional]: Map to pass the configuration to manage function behavior, for example, MAP['debug', true].

Example

The following example shows how to generate vector embeddings for text or other data using a registered embedding model.

-- Create an OpenAI connection.
CREATE CONNECTION openai_embedding_connection
  WITH (
    'type' = 'openai',
    'endpoint' = 'https://api.openai.com/v1/embeddings',
    'api-key' = '<api-key>'
  );

CREATE MODEL description_embedding
  INPUT (input STRING)
  OUTPUT (embeddings ARRAY<FLOAT>)
WITH(
    'provider' = 'openai',
    'openai.connection' = 'openai_embedding_connection',
    'task' = 'embedding'
  );

CREATE TABLE claims_embeddings(id INT, customer_id INT, embeddings ARRAY<FLOAT>);

INSERT INTO claims_embeddings
  SELECT id, customer_id, embeddings FROM claims_submitted, LATERAL TABLE(AI_EMBEDDING('description_embedding', description));

AI_FORECAST

Forecast future values in time-series data using a foundation model.

Syntax
AI_FORECAST(
  value,
  timestamp,
  JSON_OBJECT(
    'model' VALUE 'timesfm-2.5',
    'minContextSize' VALUE 10,
    'maxContextSize' VALUE 200,
    'horizon' VALUE 5
  )
) OVER (
  ORDER BY timestamp
  RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
)
Description

The AI_FORECAST function analyzes a stream of time-series data and produces multi-step-ahead forecasts with uncertainty quantiles. The function uses TimesFM, a foundation model from Google Research that is managed by Confluent and hosted in Confluent Cloud, to learn patterns from historical data and predict future values.

Your data must include:

  • A timestamp column for ordering the time series.

  • A numeric value column representing the time-series data to forecast.

For more information, see Forecast Data Trends.

Parameters
  • value (DOUBLE): The numeric time-series value to forecast.

  • timestamp (TIMESTAMP(3)): The event time associated with the value.

  • config (STRING): Optional JSON object with configuration parameters:

    • horizon (INTEGER, default: 5): The number of future time steps to forecast.

    • maxContextSize (INTEGER, default: 512): The maximum number of historical data points used for prediction.

    • minContextSize (INTEGER, default: 20): The minimum number of data points required before forecasting begins.

    • model (STRING, default: 'timesfm-2.5'): The model to use for forecasting.

    • rmseWindowSize (INTEGER, default: 5): The rolling window size for tracking RMSE.

Output

The AI_FORECAST function returns a ROW data type with the following fields:

  • forecast (ARRAY<ROW<timestamp TIMESTAMP(3), mean DOUBLE, q10 DOUBLE, q50 DOUBLE, q90 DOUBLE>>): An array of forecasted data points, one per horizon step. Each entry contains the predicted timestamp, mean forecast, and three quantiles (10th, 50th, and 90th percentile).

  • metadata (STRING): A JSON string containing model metadata, including the model name, requested horizon, frequency, and input lengths.

Example
SELECT
    event_time,
    cpu_percent,
    AI_FORECAST(
        cpu_percent,
        event_time,
        JSON_OBJECT(
            'model' VALUE 'timesfm-2.5',
            'minContextSize' VALUE 10,
            'maxContextSize' VALUE 200,
            'horizon' VALUE 5
        )
    ) OVER (
        ORDER BY event_time
        RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS forecast_result
FROM cpu_metrics;

AI_SENTIMENT

Analyze sentiment toward specific aspects in text using aspect-based sentiment analysis.

Syntax
AI_SENTIMENT(
  text,
  ARRAY['aspect1', 'aspect2'],
  JSON_OBJECT(
    'batchSize' VALUE 32,
    'model' VALUE 'deberta'
  )
)
Description

You can use the AI_SENTIMENT function to analyze text and determine sentiment toward specific aspects. The function uses DeBERTa, a transformer model designed for aspect-based sentiment analysis that is managed by Confluent and hosted in Confluent Cloud, to classify sentiment for each aspect you specify as positive, negative, or neutral with a confidence score.

For more information, see Analyze Sentiment in Text.

Parameters
  • aspects (ARRAY<STRING>): The aspects to evaluate sentiment for. The array must not be null or empty.

  • config (STRING): Optional JSON object with configuration parameters:

    • batchSize (INTEGER, default: 32): The number of aspect-text pairs to process per batch. Must be greater than 0.

    • model (STRING, default: 'deberta'): The model to use for sentiment analysis.

  • text (STRING): The text to analyze for sentiment. The text must not be null or blank.

Output

The AI_SENTIMENT function returns a ROW data type with the following fields:

  • sentiment (ARRAY<ROW<aspect STRING, label STRING, score DOUBLE>>): An array of sentiment results, one per aspect. Each entry contains the aspect name, sentiment label as a string value ('positive', 'negative', or 'neutral'), and confidence score as a double value from 0.0 to 1.0.

  • metadata (STRING): A JSON string containing model metadata.

Example
SELECT
    review_id,
    review_text,
    AI_SENTIMENT(
        review_text,
        ARRAY['battery', 'screen', 'performance'],
        JSON_OBJECT('model' VALUE 'deberta')
    ) AS sentiment_result
FROM reviews;

AI_RUN_AGENT

Execute a streaming agent against real-time context data, enabling dynamic reasoning and tool invocation within SQL queries.

Syntax
AI_RUN_AGENT(agent_name, prompt, request_id
  [, system_table] [, invocation_config]);
Configuration
  • agent_name: Name of the registered agent to execute [STRING].

  • prompt: Task prompt for the agent [STRING].

  • request_id: Request ID for the request [STRING].

  • system_table: Optional. Name of a system log table to capture agent workflow logs [STRING]. The table must use the required schema. For more information, see Monitor Streaming Agents.

  • invocation_config: Optional. Configuration map for agent execution, for example, MAP['debug', 'true'] to enable more descriptive errors [MAP].

Return Type

The function returns a ROW<status STRING, response STRING>.

  • status: Execution status, either SUCCESS or FAILED.

  • response: The final response from the agent.

Example
SELECT * FROM ml_catalog.db.my_table,
  LATERAL TABLE(AI_RUN_AGENT('claim_intake_agent', 'Extract structured data from this claim description', request_id));

Execute an agent with a system log table:

SELECT `prompt`, `status`, `response`
FROM `customer_queries`,
LATERAL TABLE(
  AI_RUN_AGENT(
    'my_agent', `prompt`, `id`,
    'agent_system_table'));

AI_TOOL_INVOKE

Invoke a registered tool, either externally by using an MCP server or locally by using a UDF, as part of an AI workflow.

Syntax
AI_TOOL_INVOKE(model_name, input_prompt, remote_udf_descriptor, mcp_tool_descriptor [, invocation_config]);
Description

The AI_TOOL_INVOKE function enables large language models (LLMs) to access various tools. The LLM decides which tools should be accessed, then the AI_TOOL_INVOKE function invokes the tools, gets the responses, and returns the responses to the LLM. The function returns a map that includes all the tools that were accessed, along with their responses and the status of the call, indicating whether it was a SUCCESS or FAILURE.

The following models are supported:

  • Anthropic

  • AzureOpenAI

  • Gemini

  • OpenAI

Note

The AI_TOOL_INVOKE function is available for preview.

A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent’s sole discretion.

Configuration
  • model_name: Name of the model entity to call [STRING].

  • input_prompt: Input prompt to pass to the LLM [STRING].

  • remote_udf_descriptor: Map to pass UDF names as key and function description as value [MAP<String, String>]. A maximum of 3 UDFs can be passed.

  • mcp_tool_descriptor: Map to pass MCP tool names as key and tool description as value [MAP<String, String>]. A maximum of 5 tools can be passed. If the MCP server already has a description for the tool, the value is passed to the LLM as “Additional description”. If the server doesn’t have a description, the value is used as the description. You can leave the value empty, in which case no changes are made to the description provided by the server.

  • invocation_config[optional]: Map to pass the config to manage function behavior, for example, MAP['debug', true, 'on_error', 'continue'].
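The size limits on the two descriptor maps can be checked before a query is submitted. The following Python sketch is a hypothetical client-side check, not part of the function itself. It only encodes the documented maximums of 3 UDFs and 5 MCP tools.

```python
from typing import Dict

MAX_UDFS = 3       # documented maximum for remote_udf_descriptor
MAX_MCP_TOOLS = 5  # documented maximum for mcp_tool_descriptor


def check_tool_descriptors(udfs: Dict[str, str], mcp_tools: Dict[str, str]) -> None:
    """Raise ValueError if either descriptor map exceeds its documented limit."""
    if len(udfs) > MAX_UDFS:
        raise ValueError(f"at most {MAX_UDFS} UDFs can be passed, got {len(udfs)}")
    if len(mcp_tools) > MAX_MCP_TOOLS:
        raise ValueError(f"at most {MAX_MCP_TOOLS} MCP tools can be passed, got {len(mcp_tools)}")


# Passes silently: two UDFs and two MCP tools are within the limits.
check_tool_descriptors(
    {"udf_1": "udf_1 description", "udf_2": "udf_2 description"},
    {"tool_1": "tool_1_description", "tool_2": "tool_2_description"},
)
```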

Example

The following example shows how to invoke a UDF and a registered external tool or API as part of an AI workflow.

When you create an MCP server connection, specify the following options:

  • endpoint: Defines the base URL for all non-SSE communications with the MCP server, including other HTTP calls and general data exchange.

  • sse-endpoint: Specifies the explicit URL endpoint used to establish a Server-Sent Events (SSE) connection with the MCP server. If omitted, the client defaults to constructing the SSE endpoint by appending /sse to the domain specified in endpoint.

  • transport-type: Specifies the transport type to use for the connection. Valid values are SSE and STREAMABLE_HTTP. The default is SSE.

-- Create an MCP server connection.
CREATE CONNECTION claims_mcp_server
  WITH (
    'type' = 'mcp_server',
    'endpoint' = 'https://mcp.deepwiki.com',
    'sse-endpoint' = 'https://mcp.deepwiki.com/sse',
    'api-key' = '<api-key>'
  );
-- Create a model that uses the MCP server connection.
CREATE MODEL tool_invoker
  INPUT (input_message STRING)
  OUTPUT (tool_calls STRING)
  WITH(
    'provider' = 'openai',
    'openai.connection' = 'openai_connection',
    'openai.system_prompt' = 'Select the best tools to complete the task',
    'mcp.connection' = 'claims_mcp_server'
  );

-- Create a table that contains the input prompts.
CREATE TABLE claims_verified (
  id int,
  customer_id int
);

-- Run the AI_TOOL_INVOKE function.
SELECT
  id,
  customer_id,
  AI_TOOL_INVOKE(
    'tool_invoker',
    customer_id,
    MAP['udf_1', 'udf_1 description', 'udf_2', 'udf_2 description'],
    MAP['tool_1', 'tool_1_description', 'tool_2', 'tool_2_description']
  ) AS verified_result
FROM claims_verified;

Other built-in functions