
Forecast Data Trends with Confluent Platform for Apache Flink

Forecasting is an important use case for Apache Flink®. You can deploy forecasting jobs using Confluent Manager for Apache Flink (CMF), a central management component of Confluent Platform for Apache Flink that enables you to securely manage a fleet of Flink jobs across multiple environments.

How does forecasting work?

Forecasting helps predict future values in time-series data by analyzing historical trends. In real-time streaming environments, continuous forecasting enables businesses to anticipate changes, optimize operations, and detect deviations as data flows in.

With Apache Flink®, you can perform continuous forecasting and anomaly detection directly within your familiar SQL-based environment.

Flink SQL provides the ML_FORECAST function, which enables you to perform real-time analysis and gain actionable insights from your streaming data without needing in-depth data science expertise. The ML_FORECAST function uses an ARIMA (Autoregressive Integrated Moving Average) model, optimized for real-time performance, to deliver accurate forecasts in a streaming context.

Your data must include:

  • A timestamp column.

  • A target column representing some quantity of interest at each timestamp.

ARIMA model

ARIMA, which stands for Autoregressive Integrated Moving Average, is a powerful statistical technique used for time series analysis and forecasting. It combines three key components to model and predict future values based on past data:

  • Autoregressive (AR): This component uses past values of the time series to predict future values. It assumes that current observations are influenced by previous observations.

  • Integrated (I): This refers to the differencing of raw observations to make the time series stationary. Stationary data has consistent statistical properties over time, which is crucial for accurate forecasting.

  • Moving Average (MA): This component incorporates the relationship between an observation and a residual error from a moving average model applied to previous observations.

ARIMA models are used widely in various fields, including finance, economics, and weather forecasting, to analyze time-dependent data and make predictions. They are particularly effective when dealing with non-seasonal time series data that exhibits trends or patterns over time.

The model is typically represented as ARIMA(p,d,q), where:

  • p: The number of autoregressive terms

  • d: The degree of differencing

  • q: The order of the moving average

By adjusting these parameters, analysts can fine-tune the model to best fit their specific time series data and improve forecasting accuracy.
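To build intuition for how the three components interact, the following is a minimal Python sketch (not CMF's implementation) that simulates an ARIMA(1,1,1)-style series: the first difference follows an ARMA(1,1) recursion, and the series itself is the running sum of those differences. The parameter names and starting level are illustrative assumptions.

```python
import random

def simulate_arima_1_1_1(n, phi=0.5, theta=0.3, sigma=1.0, seed=42):
    """Simulate an ARIMA(1,1,1)-style series: the first difference d_t
    follows d_t = phi*d_{t-1} + e_t + theta*e_{t-1} (the AR and MA parts),
    and the series is the cumulative sum of the differences (undoing the
    'I' differencing step)."""
    rng = random.Random(seed)
    prev_diff, prev_err = 0.0, 0.0
    level = [100.0]  # arbitrary starting level for illustration
    for _ in range(n - 1):
        e = rng.gauss(0, sigma)
        d = phi * prev_diff + e + theta * prev_err  # AR term + noise + MA term
        level.append(level[-1] + d)                 # integrate back to levels
        prev_diff, prev_err = d, e
    return level

series = simulate_arima_1_1_1(30)
```

Larger phi makes trends more persistent; larger theta lets recent shocks echo one step further.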

Be aware that forecasting accuracy can vary greatly based on many parameters, and there is no guarantee that predictions made using ARIMA and machine learning are correct.

Forecasting parameters

The following parameters are supported for forecasting.

Model configuration

Use the following parameters to configure the ML_FORECAST function.

enableStl

  • Default: FALSE

Enable STL (Seasonal-Trend decomposition using Loess smoothing) decomposition. If set to TRUE, the time series is decomposed into trend, seasonal, and remainder components.

STL is a statistical method that separates a time series into three additive components: trend, seasonal, and remainder.

  • The trend component captures the long-term direction or movement in the data.

  • The seasonal component captures regular, predictable patterns that repeat over fixed periods.

  • The remainder component captures random fluctuations that aren’t explained by trend or seasonality.

This decomposition helps improve forecasting accuracy by enabling the ARIMA model to better analyze and predict the underlying patterns in your data.
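The following is a naive additive decomposition in Python to illustrate the idea; real STL uses Loess smoothing, and this sketch is not CMF's implementation. It estimates the trend with a centered moving average (assuming an odd period), the seasonal component as per-phase means of the detrended values, and the remainder as what is left over.

```python
def decompose_additive(series, period):
    """Naive additive decomposition for illustration (STL proper uses
    Loess): trend via a centered moving average of width `period` (odd),
    seasonal via per-phase means of the detrended values, remainder as
    whatever neither component explains."""
    h = period // 2
    n = len(series)
    # Trend: centered moving average, defined for indices h .. n-h-1.
    trend = [sum(series[i - h:i + h + 1]) / period for i in range(h, n - h)]
    detrended = [series[i] - trend[i - h] for i in range(h, n - h)]
    # Seasonal: average the detrended values that share the same phase.
    phase_vals = [[] for _ in range(period)]
    for j, v in enumerate(detrended):
        phase_vals[(j + h) % period].append(v)
    seasonal = [sum(vals) / len(vals) for vals in phase_vals]
    remainder = [detrended[j] - seasonal[(j + h) % period]
                 for j in range(len(detrended))]
    return trend, seasonal, remainder
```

On a series that is exactly "linear trend plus a repeating pattern," the remainder comes out near zero, which is the signal that trend and seasonality explain the data well.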

evalWindowSize

  • Default: 5

  • Validation: Must be ≥ 1

Keeps the last evalWindowSize squared errors for both RMSE and AIC calculations. RMSE uses the rolling sum of these errors, while AIC uses the count (n) and sum of those same errors. A smaller window makes both metrics more responsive to recent errors; a larger window smooths out short-term noise.
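The rolling-window behavior can be sketched in Python (an illustration of the windowing idea, not CMF's internal code): a bounded deque holds the last N squared errors, so each new error pushes the oldest one out of the metric.

```python
import math
from collections import deque

def rolling_rmse(errors, window):
    """RMSE over a rolling window of the last `window` errors,
    mirroring how evalWindowSize bounds the metric calculation."""
    squared = deque(maxlen=window)  # oldest squared error falls out automatically
    out = []
    for e in errors:
        squared.append(e * e)
        out.append(math.sqrt(sum(squared) / len(squared)))
    return out
```

With a window of 2, a single large error dominates the metric for only two steps before it ages out, which is the "more responsive" behavior described above.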

horizon

  • Default: 1

Number of future time periods to forecast. This value defines how many steps ahead the model predicts.

m

  • Default: No fixed value. If enableStl is TRUE, the seasonal period is estimated using ACF.

  • Validation: m >= 0 and m < (minTrainingSize / 2)

The seasonal period length. If enableStl is TRUE, the model uses an ACF (Autocorrelation Function) to estimate the seasonal period length. The ACF measures the correlation between a time series and its lagged versions at different time intervals. This helps to identify repeating patterns and seasonal cycles in the data by detecting how strongly current values relate to past values at various lags.
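The following is a minimal Python sketch of the ACF idea (for intuition only; it is not CMF's estimator): compute the sample autocorrelation at each lag, then take the lag with the strongest correlation as the candidate seasonal period.

```python
def acf(series, max_lag):
    """Sample autocorrelation of the series with itself at lags 1..max_lag."""
    n = len(series)
    mean = sum(series) / n
    var = sum((x - mean) ** 2 for x in series)
    return [
        sum((series[t] - mean) * (series[t - k] - mean) for t in range(k, n)) / var
        for k in range(1, max_lag + 1)
    ]

def estimate_period(series, max_lag):
    """Pick the lag with the strongest autocorrelation as the period."""
    corr = acf(series, max_lag)
    return 1 + max(range(len(corr)), key=lambda i: corr[i])
```

On a series that repeats every 4 points, the autocorrelation peaks at lag 4, so that lag is returned as the estimated period.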

maxTrainingSize

  • Default: 512

  • Validation: maxTrainingSize must be greater than minTrainingSize and less than 10000

Maximum number of historical data points used for training. This value limits the training window to recent data.

minTrainingSize

  • Default: 128

  • Validation: minTrainingSize must be less than maxTrainingSize and greater than 1

Minimum number of data points required for training. The model won’t train if the quantity of available data is less than this value.

p

  • Default: NULL

  • Validation: If provided, must be >= 0

Order of the AR (Autoregressive) term in the ARIMA model, which defines the number of lagged values used in the model. This value represents the number of past values that directly influence the current prediction.

If p is not provided, the model uses auto-ARIMA to determine the best order.

q

  • Default: NULL

  • Validation: If provided, must be >= 0

Order of the MA (Moving Average) term in the ARIMA model, which defines the number of lagged error terms in the model. This value represents the number of past prediction errors that influence the current prediction.

If q is not provided, the model uses auto-ARIMA to determine the best order.

d

  • Default: NULL

  • Validation: If provided, must be >= 0

Order of differencing in the ARIMA model, which defines the number of times the data is differenced to achieve stationarity. A value of 0 means no differencing, 1 means first difference, and so on.

If d is not provided, the model uses auto-ARIMA to determine the best order.
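Differencing itself is a simple transformation; the following Python sketch (illustrative, not CMF's code) shows what each order of d does to the data.

```python
def difference(series, d):
    """Apply d rounds of first differencing (the 'I' in ARIMA):
    each round replaces the series with consecutive deltas."""
    for _ in range(d):
        series = [b - a for a, b in zip(series, series[1:])]
    return series
```

For example, a quadratic series becomes linear after one difference and constant after two, which is why d=2 can make quadratic trends stationary.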

updateInterval

  • Default: 5

Frequency of model updates. This value defines how often the model retrains with new data.

Forecast results

The following fields are returned by the ML_FORECAST function.

aic

  • Type: DOUBLE

The AIC (Akaike Information Criterion) at the time the forecast was generated, calculated over the last evalWindowSize errors.

AIC is a statistical measure used to evaluate and compare the quality of different models. It balances model accuracy against complexity by penalizing models with more parameters. Lower AIC values indicate better model performance, which helps to identify the optimal model configuration that provides the best fit to the data without overfitting.
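The documentation does not state the exact formula CMF uses, but a common Gaussian approximation of AIC over a window of errors looks like the following Python sketch; the parameter count k and the formula are illustrative assumptions.

```python
import math

def aic_gaussian(errors, k):
    """Common Gaussian approximation: AIC ~ n*ln(SSE/n) + 2k, where n is
    the number of errors in the window, SSE their sum of squares, and k
    the number of fitted parameters. The 2k term penalizes complexity."""
    n = len(errors)
    sse = sum(e * e for e in errors)
    return n * math.log(sse / n) + 2 * k
```

Given identical errors, a model with more parameters gets a strictly higher (worse) AIC, which is the overfitting penalty at work.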

forecast_value

  • Type: DOUBLE

The forecast value at the timestamp.

lower_bound

  • Type: DOUBLE

The lower confidence interval for the forecasted value.

rmse

  • Type: DOUBLE

The RMSE (Root Mean Square Error) at the time the forecast was generated, calculated over the last evalWindowSize errors.

RMSE measures the difference between the predicted and actual values in a time series by calculating the square root of the average of the squared differences between the predicted and actual values. A lower RMSE indicates better model performance, because it means the model’s predictions are closer to the actual values.

timestamp

  • Type: TIMESTAMP

The timestamp (a LocalDateTime) for this forecast.

upper_bound

  • Type: DOUBLE

The upper confidence interval for the forecasted value.

Example

  1. Create a test table that has a timestamp field for your time series data.

    CREATE TABLE orders (
        total_orderunits double,
        summed_ts TIMESTAMP(3) NOT NULL,
        WATERMARK FOR summed_ts AS summed_ts
    );
    
  2. Insert mock time series data into the table.

    INSERT INTO orders (total_orderunits, summed_ts) VALUES
    (102.12, TIMESTAMP '2024-11-19 10:00:00.000'),
    (103.45, TIMESTAMP '2024-11-19 10:01:00.000'),
    (101.89, TIMESTAMP '2024-11-19 10:02:00.000'),
    (104.23, TIMESTAMP '2024-11-19 10:03:00.000'),
    (102.78, TIMESTAMP '2024-11-19 10:04:00.000'),
    (102.12, TIMESTAMP '2024-11-19 10:05:00.000'),
    (103.45, TIMESTAMP '2024-11-19 10:06:00.000'),
    (101.89, TIMESTAMP '2024-11-19 10:07:00.000'),
    (104.23, TIMESTAMP '2024-11-19 10:08:00.000'),
    (102.78, TIMESTAMP '2024-11-19 10:09:00.000'),
    (102.12, TIMESTAMP '2024-11-19 10:10:00.000'),
    (103.45, TIMESTAMP '2024-11-19 10:11:00.000'),
    (101.89, TIMESTAMP '2024-11-19 10:12:00.000'),
    (104.23, TIMESTAMP '2024-11-19 10:13:00.000'),
    (102.78, TIMESTAMP '2024-11-19 10:14:00.000'),
    (102.12, TIMESTAMP '2024-11-19 10:15:00.000'),
    (103.45, TIMESTAMP '2024-11-19 10:16:00.000'),
    (101.89, TIMESTAMP '2024-11-19 10:17:00.000'),
    (104.23, TIMESTAMP '2024-11-19 10:18:00.000'),
    (102.78, TIMESTAMP '2024-11-19 10:19:00.000'),
    (102.12, TIMESTAMP '2024-11-19 10:20:00.000'),
    (103.45, TIMESTAMP '2024-11-19 10:21:00.000'),
    (101.89, TIMESTAMP '2024-11-19 10:22:00.000'),
    (104.23, TIMESTAMP '2024-11-19 10:23:00.000'),
    (102.78, TIMESTAMP '2024-11-19 10:24:00.000'),
    (102.12, TIMESTAMP '2024-11-19 10:25:00.000'),
    (103.45, TIMESTAMP '2024-11-19 10:26:00.000'),
    (101.89, TIMESTAMP '2024-11-19 10:27:00.000'),
    (104.23, TIMESTAMP '2024-11-19 10:28:00.000'),
    (102.78, TIMESTAMP '2024-11-19 10:29:00.000');
    
  3. Run the following statement to perform a forecast on the mock data in the orders table.

    SELECT
        ML_FORECAST(total_orderunits, summed_ts, JSON_OBJECT('p' VALUE 1, 'q' VALUE 1, 'd' VALUE 1, 'minTrainingSize' VALUE 10, 'enableStl' VALUE false, 'horizon' VALUE 5))
        OVER (
            ORDER BY summed_ts
            RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS forecast
    FROM orders;
    
  4. Run the following statement to perform a forecast with auto-ARIMA on the mock data in the orders table.

    SELECT
        ML_FORECAST(total_orderunits, summed_ts, JSON_OBJECT('minTrainingSize' VALUE 10, 'enableStl' VALUE false))
        OVER (
            ORDER BY summed_ts
            RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS forecast
    FROM orders;
    

Related content

  • Anomaly Detection with Confluent Manager for Apache Flink

  • Submit Flink SQL Statement with Confluent Manager for Apache Flink

  • Deploy and Manage Statements in Confluent Manager for Apache Flink

Note

This website includes content developed at the Apache Software Foundation under the terms of the Apache License v2.
