AI Model Inference Functions in Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink®️ provides the ML_PREDICT and ML_EVALUATE built-in functions to invoke remote AI/ML models in Flink SQL queries. These functions simplify developing and deploying AI applications by providing a unified platform for both data processing and AI/ML tasks.

For more information, see Run an AI Model.

Confluent Cloud for Apache Flink also supports read-only external tables to enable search with federated query execution on external vector databases, like MongoDB, Pinecone, and Elasticsearch.

ML_EVALUATE

Aggregate a table and return model evaluation metrics.

Syntax
ML_EVALUATE(`model_name`, label, col1, col2, ...)
Description

The ML_EVALUATE function is a table aggregation function that takes an entire table and returns a single row of model evaluation metrics. If run on all versions of a model, the function returns one row for each model version. After comparing the metrics for different versions, you can update the default version for deployment with the model that has the best evaluation metrics.
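For example, after evaluating the versions, you can promote the best-performing one. The following sketch assumes a model named `my_remote_model` and that version 2 scored best; check the ALTER MODEL documentation for the exact option name supported in your environment.

```
-- Hypothetical example: make version 2 the default version used for deployment.
ALTER MODEL `my_remote_model` SET ('default_version' = '2');
```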

Before using ML_EVALUATE, you must register the model by using the CREATE MODEL statement.

The first argument to the ML_EVALUATE table function is the model name. The second argument is the true label that the output of the model should be evaluated against. Its type depends on the model OUTPUT type and the model task. The other arguments are the columns used for prediction. They are defined in the model resource INPUT for AI models and may vary in length or type.

The metric columns returned by ML_EVALUATE depend on the task type of the specified model.

Internally, the ML_EVALUATE function runs ML_PREDICT and processes the results.

Example

After you have registered the AI model by using the CREATE MODEL statement, run the model by using the ML_EVALUATE function in a SQL query.

The following example statement registers a remote OpenAI model for a classification task.

CREATE MODEL `my_remote_model`
INPUT (f1 INT, f2 STRING)
OUTPUT (output_label STRING)
WITH(
  'task' = 'classification',
  'provider' = 'openai',
  'openai.endpoint' = 'https://api.openai.com/v1/llm/v1/chat',
  'openai.api_key' = '<api-key>'
);
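The evaluation examples that follow assume a table named eval_data that holds the model inputs together with the true labels. A minimal sketch of such a table, with column names chosen for illustration:

```
-- Hypothetical evaluation table: f1 and f2 match the model INPUT,
-- and label holds the ground-truth value to evaluate against.
CREATE TABLE eval_data (
  f1 INT,
  f2 STRING,
  label STRING
);
```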

The following statements show how to run the ML_EVALUATE function on various versions of my_remote_model using data in a table named eval_data, which contains the model inputs and a label column holding the true values.

-- Model evaluation with all versions
SELECT ML_EVALUATE(`my_remote_model$all`, label, f1, f2) FROM `eval_data`;

-- Model evaluation with the default version
SELECT ML_EVALUATE(`my_remote_model`, label, f1, f2) FROM `eval_data`;

-- Model evaluation with a specific version (version 2)
SELECT ML_EVALUATE(`my_remote_model$2`, label, f1, f2) FROM `eval_data`;

ML_PREDICT

Run a remote AI/ML model for tasks like predicting outcomes, generating text, and classification.

Syntax
ML_PREDICT(`model_name[$version_id]`, (select_list)[, ROW<threshold DOUBLE, keep_columns BOOLEAN>])
Description

The ML_PREDICT function performs predictions using pre-trained machine learning models.

The first argument to the ML_PREDICT table function is the model name. The other arguments are the columns used for prediction. They are defined in the model resource INPUT for AI models and may vary in length or type.

Before using ML_PREDICT, you must register the model by using the CREATE MODEL statement.

Example

After you have registered the AI model by using the CREATE MODEL statement, run the model by using the ML_PREDICT function in a SQL query.

The following example runs a model named embeddingmodel on the data in a table named text_stream.

SELECT id, text, embedding FROM text_stream, LATERAL TABLE(ML_PREDICT('embeddingmodel', text));
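To target a specific model version instead of the default, append the version to the model name with `$`, as in the ML_EVALUATE examples. A hypothetical variant, assuming version 2 of embeddingmodel exists:

```
-- Run version 2 of the model explicitly instead of the default version.
SELECT id, text, embedding
FROM text_stream,
LATERAL TABLE(ML_PREDICT('embeddingmodel$2', text));
```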