Forecast Data Trends with Confluent Cloud for Apache Flink

Forecasting helps predict future values in time-series data by analyzing historical trends. In real-time streaming environments, continuous forecasting enables businesses to anticipate changes, optimize operations, and detect deviations as data flows in.

With Confluent Cloud for Apache Flink®, you can perform continuous forecasting and anomaly detection directly within your familiar SQL-based environment.

Confluent Cloud for Apache Flink provides the ML_FORECAST function, which enables you to perform real-time analysis and gain actionable insights from your streaming data without needing in-depth data science expertise. ML_FORECAST uses an ARIMA (Autoregressive Integrated Moving Average) model, optimized for real-time performance, to deliver accurate forecasts in a streaming context.

Your data must include:

  • A timestamp column.
  • A target column representing some quantity of interest at each timestamp.
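
At a glance, a call to ML_FORECAST is an OVER aggregation ordered by the timestamp column. The following sketch shows the general call shape, distilled from the full example later on this page; target_col, ts_col, and my_table are placeholder names.

    SELECT
        ML_FORECAST(target_col, ts_col, JSON_OBJECT('horizon' VALUE 1))
        OVER (
            ORDER BY ts_col
            RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS forecast
    FROM my_table;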

ARIMA model

ARIMA, which stands for Autoregressive Integrated Moving Average, is a powerful statistical technique used for time series analysis and forecasting. It combines three key components to model and predict future values based on past data:

  • Autoregressive (AR): This component uses past values of the time series to predict future values. It assumes that current observations are influenced by previous observations.
  • Integrated (I): This refers to the differencing of raw observations to make the time series stationary. Stationary data has consistent statistical properties over time, which is crucial for accurate forecasting.
  • Moving Average (MA): This component incorporates the relationship between an observation and a residual error from a moving average model applied to previous observations.

ARIMA models are widely used in many fields, including finance, economics, and weather forecasting, to analyze time-dependent data and make predictions. They are particularly effective for non-seasonal time series data that exhibits trends or patterns over time.

The model is typically represented as ARIMA(p,d,q), where:

  • p: The number of autoregressive terms
  • d: The degree of differencing
  • q: The order of the moving average

By adjusting these parameters, analysts can fine-tune the model to best fit their specific time series data and improve forecasting accuracy.
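
For reference, the textbook form of the model (standard ARIMA notation, not specific to Confluent's implementation) applies the AR and MA terms to the d-times differenced series:

    y'_t = c + \phi_1 y'_{t-1} + \cdots + \phi_p y'_{t-p} + \varepsilon_t + \theta_1 \varepsilon_{t-1} + \cdots + \theta_q \varepsilon_{t-q}, \qquad y'_t = (1 - B)^d y_t

where B is the backshift operator, the \phi_i are the autoregressive coefficients, the \theta_j are the moving-average coefficients, and \varepsilon_t is a white-noise error term.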

Be aware that forecasting accuracy can vary greatly depending on many parameters, and there is no guarantee that predictions made by using ARIMA or other machine learning techniques are correct.

Forecasting parameters

The following parameters are supported for forecasting.

Model configuration

Use the following parameters to configure the ML_FORECAST function.

enableStl

  • Default: FALSE

Enables STL (Seasonal-Trend decomposition using Loess). If set to TRUE, the time series is decomposed into trend, seasonal, and remainder components.

STL is a statistical method that separates a time series into three additive components: trend, seasonal, and remainder.

  • The trend component captures the long-term direction or movement in the data.
  • The seasonal component captures regular, predictable patterns that repeat over fixed periods.
  • The remainder component captures random fluctuations that aren’t explained by trend or seasonality.

This decomposition helps improve forecasting accuracy by enabling the ARIMA model to better analyze and predict the underlying patterns in your data.
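
For example, to turn on STL decomposition with an explicit seasonal period, pass both parameters in the options object. This sketch reuses the orders table from the example later on this page; the value of m is illustrative and must satisfy the validation rule described below (m < minTrainingSize / 2).

    SELECT
        ML_FORECAST(total_orderunits, summed_ts,
            JSON_OBJECT('enableStl' VALUE true, 'm' VALUE 60))
        OVER (
            ORDER BY summed_ts
            RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS forecast
    FROM orders;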

evalWindowSize

  • Default: 5
  • Validation: Must be >= 1

Keeps the last evalWindowSize squared errors for both RMSE and AIC calculations. RMSE uses the rolling sum of these errors, while AIC uses the count (n) and sum of those same errors. A smaller window makes both metrics more responsive to recent errors; a larger window smooths out short-term noise.

horizon

  • Default: 1

Number of future time periods to forecast. This value defines how many steps ahead the model predicts.

m

  • Default: If enableStl is TRUE, the seasonal period is estimated by using the ACF.
  • Validation: m >= 0 and m < (minTrainingSize / 2)

The seasonal period length. If enableStl is TRUE, the model uses an ACF (Autocorrelation Function) to estimate the seasonal period length. The ACF measures the correlation between a time series and its lagged versions at different time intervals. This helps to identify repeating patterns and seasonal cycles in the data by detecting how strongly current values relate to past values at various lags.
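
For reference, the sample autocorrelation at lag k (the standard definition; the exact internals of the estimator aren't documented on this page) is:

    \rho_k = \frac{\sum_{t=1}^{n-k} (y_t - \bar{y})(y_{t+k} - \bar{y})}{\sum_{t=1}^{n} (y_t - \bar{y})^2}

A pronounced peak in \rho_k at lag k suggests a seasonal period of length k.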

maxTrainingSize

  • Default: 512
  • Validation: maxTrainingSize must be greater than minTrainingSize and less than 10000

Maximum number of historical data points used for training. This value limits the training window to recent data.

minTrainingSize

  • Default: 128
  • Validation: minTrainingSize must be less than maxTrainingSize and greater than 1

Minimum number of data points required for training. The model doesn't train until at least this many data points are available.

p

  • Default: NULL
  • Validation: If provided, must be >= 0

Order of the AR (Autoregressive) term in the ARIMA model, which defines the number of lagged values used in the model. This value represents the number of past values that directly influence the current prediction.

If p is not provided, the model uses auto-ARIMA to determine the best order.

q

  • Default: NULL
  • Validation: If provided, must be >= 0

Order of the MA (Moving Average) term in the ARIMA model, which defines the number of lagged error terms in the model. This value represents the number of past prediction errors that influence the current prediction.

If q is not provided, the model uses auto-ARIMA to determine the best order.

d

  • Default: NULL
  • Validation: If provided, must be >= 0

Order of differencing in the ARIMA model, which defines the number of times the data is differenced to achieve stationarity. A value of 0 means no differencing, 1 means first difference, and so on.

If d is not provided, the model uses auto-ARIMA to determine the best order.

updateInterval

  • Default: 5

Frequency of model updates. This value defines how often the model retrains with new data.

Forecast results

The following fields are returned by the ML_FORECAST function.

aic

  • Type: DOUBLE

The AIC (Akaike Information Criterion) at the time the forecast was generated, calculated over the last evalWindowSize errors.

AIC is a statistical measure used to evaluate and compare the quality of different models. It balances model accuracy against complexity by penalizing models with more parameters. Lower AIC values indicate better model performance, which helps to identify the optimal model configuration that provides the best fit to the data without overfitting.
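
One common computational form, shown for reference only (the exact formula used internally isn't documented on this page), is:

    \mathrm{AIC} = 2k + n \ln\left(\frac{\mathrm{RSS}}{n}\right)

where k is the number of fitted model parameters, n is the number of errors in the evaluation window (evalWindowSize), and RSS is their sum of squared errors.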

forecast_value

  • Type: DOUBLE

The forecast value at the timestamp.

lower_bound

  • Type: DOUBLE

The lower bound of the confidence interval for the forecasted value.

rmse

  • Type: DOUBLE

The RMSE (Root Mean Square Error) at the time the forecast was generated, calculated over the last evalWindowSize errors.

RMSE measures the difference between the predicted and actual values in a time series by calculating the square root of the average of the squared differences between the predicted and actual values. A lower RMSE indicates better model performance, because it means the model’s predictions are closer to the actual values.
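
In formula form, over the last n = evalWindowSize predictions:

    \mathrm{RMSE} = \sqrt{\frac{1}{n} \sum_{i=1}^{n} \left(\hat{y}_i - y_i\right)^2}

where \hat{y}_i is the predicted value and y_i is the actual value.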

timestamp

  • Type: TIMESTAMP

The date and time for this forecast.

upper_bound

  • Type: DOUBLE

The upper bound of the confidence interval for the forecasted value.

Pricing

Forecasting is billed in CFUs, as part of your compute pool usage.

Example

  1. Create a test table that has a timestamp field for your time series data.

    CREATE TABLE orders (
        total_orderunits DOUBLE,              -- target column to forecast
        summed_ts TIMESTAMP(3) NOT NULL,      -- event-time column
        WATERMARK FOR summed_ts AS summed_ts  -- watermark with no out-of-order tolerance
    );
    
  2. Insert mock time series data into the table.

    INSERT INTO orders (total_orderunits, summed_ts) VALUES
        (102.12, TIMESTAMP '2024-11-19 10:00:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:01:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:02:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:03:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:04:00.000'),
        (102.12, TIMESTAMP '2024-11-19 10:05:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:06:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:07:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:08:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:09:00.000'),
        (102.12, TIMESTAMP '2024-11-19 10:10:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:11:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:12:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:13:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:14:00.000'),
        (102.12, TIMESTAMP '2024-11-19 10:15:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:16:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:17:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:18:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:19:00.000'),
        (102.12, TIMESTAMP '2024-11-19 10:20:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:21:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:22:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:23:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:24:00.000'),
        (102.12, TIMESTAMP '2024-11-19 10:25:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:26:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:27:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:28:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:29:00.000');
    
  3. Run the following statement to perform a forecast on the mock data in the orders table.

    SELECT
        ML_FORECAST(total_orderunits, summed_ts, JSON_OBJECT('p' VALUE 1, 'q' VALUE 1, 'd' VALUE 1, 'minTrainingSize' VALUE 10, 'enableStl' VALUE false, 'horizon' VALUE 5))
        OVER (
            ORDER BY summed_ts
            RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS forecast
    FROM orders;
    
  4. Run the following statement to perform a forecast with auto-ARIMA on the mock data in the orders table. Because p, q, and d aren't specified, the model determines the best orders automatically. The sketch after these steps shows how to access individual fields of the result.

    SELECT
        ML_FORECAST(total_orderunits, summed_ts, JSON_OBJECT('minTrainingSize' VALUE 10, 'enableStl' VALUE false))
        OVER (
            ORDER BY summed_ts
            RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS forecast
    FROM orders;
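
The forecast column returned by ML_FORECAST is a structured value containing the result fields described earlier. As a sketch, assuming standard Flink SQL dot notation for accessing row fields, you can project individual fields through a subquery:

    SELECT
        t.forecast.forecast_value,
        t.forecast.lower_bound,
        t.forecast.upper_bound
    FROM (
        SELECT
            ML_FORECAST(total_orderunits, summed_ts,
                JSON_OBJECT('minTrainingSize' VALUE 10, 'enableStl' VALUE false))
            OVER (
                ORDER BY summed_ts
                RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS forecast
        FROM orders
    ) AS t;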
    

Related content

  • AI Model Inference and Machine Learning Functions
  • Detect Anomalies
  • Run a Remote AI Model
  • CREATE MODEL statement
  • Create an AI model
