CREATE MODEL Statement in Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink®️ enables real-time inference and prediction with AI and ML models. The Flink SQL interface is available in Cloud Console and the Flink SQL shell.
Get started using AI models with Run an AI Model.
The following providers are supported:
- AWS Bedrock
- AWS Sagemaker
- Azure Machine Learning (Azure ML)
- Azure OpenAI
- Google AI
- OpenAI
- Vertex AI
Syntax¶
CREATE MODEL [IF NOT EXISTS] [[catalogname].[database_name]].model_name
[INPUT (input_column_list)]
[OUTPUT (outpt_column_list)]
[COMMENT model_comment]
WITH(model_option_list)
Description¶
Create a new AI model.
If a model with the same name exists already, a new version of the model is created. For more information, see version.
If the IF NOT EXISTS option is specified and a model with the same name exists already, the statement is ignored.
To view the currently registered models, use the SHOW MODELS statement.
To view the WITH options that were used to create the model, run the SHOW CREATE MODEL statement.
To view the versions, inputs, and outputs of the model, run the Models statement.
To change the name or options of an existing model, use the ALTER MODEL statement.
To delete a model from the current environment, use the DROP MODEL statement.
Task types¶
Confluent Cloud for Apache Flink supports these types of analysis for AI model inference:
- Classification: Categorize input data into predefined classes or labels. This task is used in applications like spam detection, where emails are classified as “spam” or “not spam”, and image recognition.
- Clustering: Group a set of objects so that objects in the same group, called a “cluster”, are more similar to each other than to those in other groups. This task is a form of unsupervised learning, because it doesn’t rely on predefined categories. Applications include customer segmentation in marketing and gene sequence analysis in biology.
- Embedding: Transform high-dimensional data into lower-dimensional vectors while preserving the relative distances between data points. This is crucial for tasks like natural language processing (NLP), where words or sentences are converted into vectors, enabling models to understand semantic similarities. Embeddings are used in recommendation systems, search engines, and more.
- Regression: Regression models predict a continuous output variable based on one or more input features. This task is used in scenarios like predicting house prices based on features like size, location, and number of bedrooms, or forecasting stock prices. Regression analysis helps in understanding the relationships between variables and forecasting.
- Text generation: Generate human-like text based on input data. Applications include chatbots, content creation, and language translation.
When you register an AI or ML model, you specify the task type by using the task property.
Examples¶
The following code example shows how to run an AI model. The model must be
created with the model provider and registered by using the CREATE MODEL
statement with <model-name>
.
SELECT * FROM my_table, LATERAL TABLE(ML_PREDICT('<model-name>', column1, column2));
All of the CREATE MODEL statements require a connection resource that you create by using the confluent flink connection create command, which resembles:
# Example command to create a connection for AWS Bedrock.
confluent flink connection create bedrock-cli-connection \
--cloud AWS \
--region us-west-2 \
--type bedrock \
--endpoint https://bedrock-runtime.us-west-2.amazonaws.com/model/amazon.titan-embed-text-v1/invoke \
--aws-access-key $AWS_ACCESS_KEY_ID \
--aws-secret-key $AWS_SECRET_ACCESS_KEY \
--aws-session-token $AWS_SESSION_TOKEN
Classification task¶
The following example shows how to create an OpenAI classification model. For more information, see Sentiment analysis with OpenAI LLM.
CREATE MODEL sentimentmodel
INPUT(text STRING)
OUTPUT(sentiment STRING)
COMMENT 'sentiment analysis model'
WITH (
'provider' = 'openai',
'task' = 'classification',
'openai.connection' = '<cli-connection>',
'openai.model_version' = 'gpt-3.5-turbo',
'openai.system_prompt' = 'Analyze the sentiment of the text and return only POSITIVE, NEGATIVE, or NEUTRAL.'
);
Clustering task¶
The following example shows how to create an Azure ML clustering model.
It requires that a K-Means model has been trained and deployed on Azure.
Replace <ENDPOINT>
and <REGION>
with your values.
CREATE MODEL clusteringmodel
INPUT (vectors ARRAY<FLOAT>, other_feature INT, other_feature2 STRING)
OUTPUT (cluster_num INT)
WITH (
'task' = 'clustering',
'provider' = 'azureml',
'azureml.connection' = '<cli-connection>'
);
Embedding task¶
The following example shows how to create an AWS Bedrock text embedding
model. Replace <REGION>
with your value. For more information, see
Text embedding with AWS Bedrock and Azure OpenAI.
CREATE MODEL embeddingmodel
INPUT (text STRING)
OUTPUT (embedding ARRAY<FLOAT>)
WITH (
'task' = 'embedding',
'provider' = 'bedrock',
'bedrock.connection' = '<cli-connection>'
);
Text generation task¶
The following example shows how to create an OpenAI text generation task for translating from English to Spanish.
CREATE MODEL translatemodel
INPUT(english STRING)
OUTPUT(spanish STRING)
COMMENT 'spanish translation model'
WITH (
'provider' = 'openai',
'task' = 'text_generation',
'openai.connection' = '<cli-connection>',
'openai.model_version' = 'gpt-3.5-turbo',
'openai.system_prompt' = 'Translate to spanish'
);
For more examples, see Run an AI Model.
Model versioning¶
A model can have multiple versions. A version is an integer number that starts at 1. The default version for a new model is 1. Currently, the maximum number of supported versions is 10.
New versions are created by the CREATE MODEL statement for the same model name. A new version increments the current maximum version by 1.
To view the versions of a model, use the DESCRIBE MODEL statement.
Only model options are versioned, which that means input/output format and comments don’t change across versions. The statement fails if input format, output format, or comments change. For model options, model task changes are not permitted.
The following code example shows the result of running CREATE MODEL twice with the same model name.
CREATE MODEL `my-model` ...
-- Output
`my-model` with version 1 created. Default version: 1
CREATE MODEL `my-model` ...
-- Output
`my-model` with version 2 created. Default version: 1
By default, version 1 is the default version when a model is first created. As more versions are created by the CREATE MODEL statement, you can change the default version by using the ALTER MODEL statement.
The following example shows how to change the default version of an existing model.
ALTER MODEL <model-name> SET ('default_version'='<version>');
You can access a specific version of a model in queries by using the
<model_name>$<model_version>
syntax. If no version is specified, the
default version is used.
The following code examples show how to use a specific version of a model in a query.
-- Use version 2 of the model.
SELECT * FROM `my-table` LATERAL TABLE (ML_PREDICT('my-model$2', col1, col2));
-- Use the default version of the model.
SELECT * FROM `my-table` LATERAL TABLE (ML_PREDICT('my-model', col1, col2));
Use the <model_name>$<model_version>
syntax to delete a specific version of
a model:
-- Delete a specific version of the model.
DROP MODEL `<model-name>$<version>`;
-- Delete all versions and the model.
DROP MODEL `<model-name>$all`;
The maximum version number is the next default version. If all versions are dropped, the whole model is deleted.
To change the version of an existing model, use the ALTER MODEL statement. If no version is specified, the default version is changed.
ALTER MODEL `<model-name>$<version>` SET ('k1'='v1', 'k2'='v2');
WITH options¶
Specify the details of your AI inference model by using the WITH clause.
The following tables show the supported properties in the WITH clause.
Model Provider | Property |
---|---|
Common | |
OpenAI | |
Azure OpenAI | |
Azure ML | |
Google AI | |
Sagemaker | |
Vertex AI |
Connection resource¶
Secrets must be set by using a connection resource that you create by using
the confluent flink connection create
command. The connection resource
securely contains the provider endpoint and secrets like the API key.
For example, the following command creates a connection to OpenAI, named “openai-cli-connection”.
confluent flink connection create openai-connection \
--cloud GCP \
--region us-central1 \
--type openai \
--endpoint https://api.openai.com/v1/chat/completions \
--api-key <your-api-key>
Specify the connection by name in the {PROVIDER}.connection property of the WITH clause.
The environment, cloud, and region options in the flink connection create
command must be the same as the compute pool which uses the connection.
The flink connection create
command supports an optional --environment
option, which specifies the Confluent Cloud environment. If you don’t specify the
--environment
option, you must use the confluent environment use
command to switch to the correct default environment.
The following code example shows how to refer to the connection named
openai-cli-connection
in the WITH clause:
'openai.connection' = 'openai-cli-connection'
The maximum secret length is 4000 bytes, which is checked after the string is converted to bytes.
Common properties¶
The following properties are common to all of the model providers.
{PROVIDER}.client_timeout¶
Set the request timeout to the client endpoint.
{PROVIDER}.connection¶
Set the credentials for connecting to a model provider. Create the connection
resource by using the confluent flink connection create
command.
This property is required.
{PROVIDER}.input_format¶
Set the json, text, or binary input format used by the model. Each provider has a default value.
This property is optional.
For supported input formats, see Text generation and LLM model formats and Other formats.
{PROVIDER}.input_content_type¶
The HTTP content media type header to set when calling the model. The value is
a Media/MIME type.
The default is chosen based on input_format
.
Usually, this property is required only for Sagemaker and Bedrock models.
{PROVIDER}.output_format¶
Set the json, text, or binary output format used by the model. The default is
chosen based on input_format
.
This property is optional.
For supported output formats, see Text generation and LLM model formats and Other formats..
{PROVIDER}.output_content_type¶
The HTTP Accept
media type header to set when calling the model. The value is
a Media/MIME type.
The default is chosen based on output_format
.
Usually, this property is required only for Sagemaker and Bedrock models.
{PROVIDER}.PARAMS.*¶
Provide parameters based on the input_format
. The maximum number of
parameters you can set is 32.
This property is optional.
For more information, see Parameters.
{PROVIDER}.system_prompt¶
A system prompt passed to an LLM model to give it general behavioral instructions. The value is a string.
Not all models support a system prompt.
This property is optional.
task¶
Specify the kind of analysis to perform.
Supported values are:
- “classification”
- “clustering”
- “embedding”
- “regression”
- “text_generation”
This property is required.
OpenAI properties¶
openai.input_format¶
Set the input format used by the model. The default is OPENAI-CHAT
.
This property is optional.
openai.model_version¶
Set the version string of the requested model. The default is
gpt-3.5-turbo
.
This property is optional.
Azure OpenAI properties¶
Properties for OpenAI models deployed in Azure AI Studio. Azure OpenAI accepts all of the OpenAI parameters, but with a different endpoint.
azureopenai.input_format¶
Set the input format used by the model. The default is OPENAI-CHAT
.
This property is optional.
azureopenai.model_version¶
Set the version string of the requested model. The default is
gpt-3.5-turbo
.
This property is optional.
Azure ML properties¶
Properties for both Azure Machine Learning and LLM models from Azure AI Studio can use this provider.
azureml.input_format¶
Set the input format used by the model. The default is
AZUREML-PANDAS-DATAFRAME
.
For AI Studio LLMs, OPENAI-CHAT
is usually the correct format, even for
non-OpenAI models.
This property is optional.
azureml.deployment_name¶
Set the model name.
Bedrock properties¶
The default input_format
for Bedrock is determined automatically based on
the model endpoint, or AMAZON-TITAN-TEXT
if there is no match. If necessary,
change it to match the model for your endpoint.
Google AI properties¶
googleai.input_format¶
Set the input format used by the model. The default is GEMINI-GENERATE
.
This property is optional.
Sagemaker properties¶
sagemaker.custom_attribute¶
Set a model-dependent value that is passed through to Sagemaker in the header of the same name.
This property is optional.
sagemaker.enable_explanations¶
Enable writing explanations, if your model supports them. Passed through to Sagemaker in the header of the same name.
If your model supports writing explanations, they should be disabled, because Confluent Cloud for Apache Flink currently doesn’t support reading them.
Don’t set enable_explanations
if the model doesn’t support explanations,
because this causes Sagemaker to return an error.
This property is optional.
sagemaker.inference_component_name¶
Specify which inference component to use in the endpoint. Passed through to Sagemaker in the header of the same name.
This property is optional.
sagemaker.inference_id¶
Set an ID that is passed through to Sagemaker in the header of the same name. Used for tracking request origins.
This property is optional.
sagemaker.input_content_type¶
The HTTP content media type header to set when calling the model.
Setting this property overrides the Content-type
header for the model
request. Many Sagemaker models use this header to determine their behavior,
but set it only if choosing an appropriate input_format
is not sufficient.
This property is optional.
sagemaker.output_content_type¶
The HTTP Accept
media type header to set when calling the model.
Setting this property overrides the Accept
header for the model request.
Some Sagemaker models use this header to determine their outputs, but set it
only if choosing an appropriate output_format
is not sufficient.
This property is optional.
sagemaker.target_container_hostname¶
Allows calling a specific container when the endpoint has multiple containers. Passed through to Sagemaker in the header of the same name.
This property is optional.
sagemaker.target_model¶
Enables calling a specific model from multiple models deployed to the same endpoint. Passed through to Sagemaker in the header of the same name.
This property is optional.
sagemaker.target_variant¶
Enables calling a specific version of the model from multiple deployed variants. Passed through to Sagemaker in the header of the same name.
This property is optional.
Vertex AI properties¶
vertexai.service_key¶
Set the Service Account Key of a service account with permission to call the inference endpoint. This value is a secret.
This property is required.
vertexai.input_format¶
Set the input format used by the model. The default is TF-SERVING
.
Defaults to GEMINI-GENERATE
if the endpoint is for a published Gemini model.
This property is optional.
Supported input/output formats¶
The following input/output formats for text generation and LLM models are supported.
The following additional input/output formats are supported.
AZUREML-PANDAS-DATAFRAME | AZUREML-TENSOR | BINARY |
CSV | JSON | JSON-ARRAY |
JSON:wrapper | KSERVE-V1 | KSERVE-V2 |
MLFLOW-TENSOR | PANDAS-DATAFRAME | TEXT |
TF-SERVING | TF-SERVING-COLUMN | TRITON |
VERTEXAI-PYTORCH |
Parameters¶
The text generation and LLM formats support some or all of the following parameters.
{PROVIDER}.PARAMS.temperature¶
Controls the randomness or “creativity” of the output. Typical values are between 0.0 and 1.0.
This parameter is model-dependent. Its type is Float
.
{PROVIDER}.PARAMS.top_p¶
The probability cutoff for token selection. Usually, either temperature or top_p are specified, but not both.
This parameter is model-dependent. Its type is Float
.
{PROVIDER}.PARAMS.top_k¶
The number of possible tokens to sample from at each step.
This parameter is model-dependent. Its type is Float
.
{PROVIDER}.PARAMS.stop¶
A CSV list of strings to pass as stop sequences to the model.
{PROVIDER}.PARAMS.max_tokens¶
The maximum number of tokens for the model to return.
Its type is Int
.
Text generation and LLM model formats¶
The following formats are intended for text generation models and LLMs. They require that the model has a single STRING input and a single STRING output.
AI-21-COMPLETE¶
This format is for models using the AI21 Labs J2 Complete API, including the AI21 Labs Foundation models on AWS Bedrock.
This format does not support the top_k parameter.
AMAZON-TITAN-EMBED¶
This format is for Amazon Titan Text Embedding models.
AMAZON-TITAN-TEXT¶
The format is for Amazon’s Titan Text models. This is the default format for the AWS Bedrock provider.
This format does not support the top_k parameter.
ANTHROPIC-COMPLETIONS¶
This format is for models using the Anthropic Claude Text Completions API, including some Anthropic models on AWS Bedrock.
ANTHROPIC-MESSAGES¶
This format is for models using the Anthropic Claude Messages API, including some Anthropic models on AWS Bedrock.
Some Anthropic models accept both this and the Completions API format.
AZURE-EMBED¶
The embedding format used by other foundation models on Azure. This format is the same as OPENAI-EMBED.
BEDROCK-LLAMA¶
The format used by Llama models on AWS Bedrock.
COHERE-CHAT¶
The Cohere Chat API format.
COHERE-EMBED¶
Cohere’s Embedding API format.
COHERE-GENERATE¶
The legacy Cohere Chat API format.
This format is used by AWS Bedrock Cohere Command models.
GEMINI-GENERATE¶
The Google Gemini API format.
This is the default format for the Google AI provider, but you can also use it with Gemini models on the Google Vertex AI.
GEMINI-CHAT¶
Same as the GEMINI-GENERATE format.
MISTRAL-CHAT¶
The standard Mistral API format.
MISTRAL-COMPLETIONS¶
The legacy Mistral Completions API format used by AWS Bedrock.
OPENAI-CHAT¶
The OpenAI Chat API format. This is the default for the OpenAI and Azure OpenAI providers. It is also generally used by most non-OpenAI LLM models deployed in Azure AI Studio using the Azure ML provider.
OPENAI-EMBED¶
The OpenAI Embedding model format.
VERTEX-EMBED¶
The Embedding format for Vertex AI Gemini models.
Other formats¶
The following formats are intended for predictive models running on providers like Sagemaker, Vertex AI, and Azure ML. Usually, these models are used for tasks like classification, regression, and clustering.
Currently, none of these formats support PARAMS.
Unless specified, each input format defaults to the associated output format with the same name.
AZUREML-PANDAS-DATAFRAME¶
Azure ML’s version of the Pandas Dataframe Split format. The only difference is that this version has “input_data” as the top-level field, instead of “dataframe_split”.
This is the default format for Azure ML models.
The output format defaults to JSON-ARRAY.
AZUREML-TENSOR¶
Azure ML’s version of named input tensors. Equivalent to the “JSON:input_data” input format. the output format defaults to “JSON:outputs”.
BINARY¶
Raw binary inputs, serialized in little-endian byte order. This input format accepts multiple input columns, which are packed in order.
CSV¶
Comma separated text. This is the default format for Sagemaker models, but Sagemaker models vary widely, and most models must choose a different format.
JSON¶
The inputs are formatted as a JSON object, with field names equal to the column names of the model input schema.
The JSON format supports user-defined parameters. If you specify
'{provider}.params.some_key'='value'
in the WITH options, the key and value
are used in the JSON input as {"some_key": "value"}
.
Example:
{
"column1": "String Data",
"column2": [1,2,3,4]
}
JSON-ARRAY¶
The inputs are formatted as a JSON array, including [] brackets, but without the {} braces of a top-level JSON object. Column names are not included in the format.
If the model takes a single input array column, it will be output as the top-level array. Models with multiple inputs have their arrays nested in JSON fashion.
This format is usually appropriate for models that expect Numpy arrays.
Example:
[1,2,3,"String Data"]
JSON:wrapper¶
Similar to the default JSON behavior, but all fields are wrapped in a named top-level object. The wrapper may be any valid JSON string.
Example:
{
"wrapper": {
"column1": "String Data",
"column2: [1,2,3,4]
}
}
KSERVE-V1¶
Same as the TF-SERVING format.
MLFLOW-TENSOR¶
The format used by some MLFlow models. It is the same format as TF-SERVING-COLUMN.
PANDAS-DATAFRAME¶
The Pandas Dataframe Split format used by most MLFlow models.
The output format defaults to JSON-ARRAY.
TEXT¶
Model input values formatted as raw text. Use newlines to separate multiple inputs.
TF-SERVING¶
The Tensorflow Serving Row format. This is the default format for Vertex AI models. It is generally the correct format to use for most predictive models trained in Vertex AI.
TF-SERVING-COLUMN¶
The TensorFlow Serving Column format. It is exactly equivalent to “JSON:inputs”. The output format defaults to “JSON:outputs”.
TRITON¶
The Triton/KServeV2 format used by NVidia Triton Inference Servers.
When possible, this format serializes data in the protocol’s mixed json+binary format. Note that some Tensor datatypes, like 16-bit floats, do not have an exact equivalent in Flink SQL, but they are converted, when possible.
VERTEXAI-PYTORCH¶
Vertex AI’s format for PyTorch models. This format is the TF-SERVING format with an extra wrapper around the data.
The output format defaults to TF-SERVING.