Forecast Data Trends with Confluent Cloud for Apache Flink
Forecasting helps predict future values in time-series data by analyzing historical trends. In real-time streaming environments, continuous forecasting enables businesses to anticipate changes, optimize operations, and detect deviations as data flows in.
With Confluent Cloud for Apache Flink®, you can perform continuous forecasting directly within your familiar SQL-based environment.
Confluent Cloud for Apache Flink provides forecasting on your streams through the following functions:
AI_FORECAST (Early Access Program): Forecast future values in time-series data using foundation models like TimesFM.
ML_FORECAST: Forecast trends in your data using a forecasting model based on Autoregressive Integrated Moving Average (ARIMA).
Your data must include:
A timestamp column.
A target column representing some quantity of interest at each timestamp.
Pricing
Forecasting is billed in CFUs, as part of your compute pool usage.
AI_FORECAST
The AI_FORECAST function analyzes a stream of time-series data and predicts future values with confidence intervals. The function uses a foundation model that is managed by Confluent and hosted in Confluent Cloud to learn patterns from historical data and forecast what comes next.
By default, AI_FORECAST uses TimesFM 2.5, a foundation model from Google Research designed for time-series forecasting.
During the Early Access Program, you can request additional time-series models (such as Chronos from Hugging Face) to be made available. To request additional models, contact your Confluent representative.
Important
The AI_FORECAST function is an Early Access Program feature in Confluent Cloud.
An Early Access feature is a component of Confluent Cloud introduced to gain feedback. This feature should be used only for evaluation and non-production testing purposes or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions.
Early Access Program features are intended for evaluation use in development and testing environments only, and not for production use. Early Access Program features are provided: (a) without support; (b) “AS IS”; and (c) without indemnification, warranty, or condition of any kind. No service level commitment will apply to Early Access Program features. Early Access Program features are considered to be a Proof of Concept as defined in the Confluent Cloud Terms of Service. Confluent may discontinue providing preview releases of the Early Access Program features at any time in Confluent’s sole discretion.
Note
AI_FORECAST uses models that are managed by Confluent and hosted in Confluent Cloud. You cannot use your own managed models or remote models from cloud providers with this function.
Parameters
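The AI_FORECAST function accepts the following parameters, listed alphabetically. In a call, pass them in the order value, timestamp, config, as shown in the examples.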
config
Type: STRING
Required: No
A JSON object with configuration parameters. The configuration must be constant within a query.
timestamp
Type: TIMESTAMP(3)
Required: Yes
The event time associated with the value. The timestamp must not be null. Timestamps should be evenly spaced for best results.
value
Type: DOUBLE
Required: Yes
The numeric time-series value to forecast. The value must be finite (not NaN or Infinity). Null values are silently skipped.
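If you produce the source records yourself, you can check them against these input rules before they reach Flink. The following Python sketch uses a hypothetical helper, prepare_series, that is not part of Confluent Cloud. It assumes timestamps arrive as epoch milliseconds, raises on a null timestamp or a non-finite value, skips null values the way AI_FORECAST is documented to, and reports whether the remaining points are evenly spaced.

```python
import math
from typing import Optional


def prepare_series(
    rows: list[tuple[Optional[int], Optional[float]]],
) -> tuple[list[tuple[int, float]], bool]:
    """Apply the documented AI_FORECAST input rules to (timestamp_ms, value) rows.

    Hypothetical helper: returns the kept rows and whether they are evenly spaced.
    """
    kept: list[tuple[int, float]] = []
    for ts, value in rows:
        if ts is None:
            # The timestamp must not be null.
            raise ValueError("timestamp must not be null")
        if value is None:
            # Null values are silently skipped.
            continue
        if not math.isfinite(value):
            # The value must be finite (not NaN or Infinity).
            raise ValueError(f"non-finite value at {ts}: {value}")
        kept.append((ts, value))

    # Evenly spaced means every gap between consecutive timestamps is identical.
    gaps = {b[0] - a[0] for a, b in zip(kept, kept[1:])}
    return kept, len(gaps) <= 1


rows = [(0, 1.0), (60_000, None), (120_000, 2.5), (180_000, 3.0)]
kept, even = prepare_series(rows)
print(kept)  # [(0, 1.0), (120000, 2.5), (180000, 3.0)]
print(even)  # False
```

Skipping the null value at 60,000 ms leaves a 120,000 ms gap next to a 60,000 ms gap, so the series is no longer evenly spaced, which matters because even spacing is recommended for best results.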
Configuration parameters
The following configuration parameters can be passed in the config JSON object.
horizon
Type: INTEGER
Default: 5
Validation: Must be > 0
The number of future time steps to forecast.
maxContextSize
Type: INTEGER
Default: 512
Validation: Must be >= minContextSize
The maximum number of historical data points used for prediction.
minContextSize
Type: INTEGER
Default: 20
Validation: Must be >= 2
The minimum number of data points required before forecasting begins.
model
Type: STRING
Default: timesfm-2.5
Required: No
The model to use for forecasting.
rmseWindowSize
Type: INTEGER
Default: 5
Validation: Must be > 0
The rolling window size for tracking Root Mean Square Error (RMSE) of predictions.
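Because the configuration must be constant within a query, it can help to validate it before you generate a statement from application code. The following Python sketch is a hypothetical helper that runs outside Flink: it merges overrides into the defaults listed above, applies the documented validation rules, and returns the resulting JSON object as a string. It only mirrors the documented rules and does not call any Confluent API.

```python
import json

# Defaults as documented for the AI_FORECAST config object.
DEFAULTS = {
    "horizon": 5,
    "maxContextSize": 512,
    "minContextSize": 20,
    "model": "timesfm-2.5",
    "rmseWindowSize": 5,
}


def build_forecast_config(**overrides) -> str:
    """Merge overrides into the documented defaults, validate, and return JSON."""
    unknown = set(overrides) - set(DEFAULTS)
    if unknown:
        raise ValueError(f"unknown parameters: {sorted(unknown)}")
    cfg = {**DEFAULTS, **overrides}

    # Documented validation rules.
    if cfg["horizon"] <= 0:
        raise ValueError("horizon must be > 0")
    if cfg["minContextSize"] < 2:
        raise ValueError("minContextSize must be >= 2")
    if cfg["maxContextSize"] < cfg["minContextSize"]:
        raise ValueError("maxContextSize must be >= minContextSize")
    if cfg["rmseWindowSize"] <= 0:
        raise ValueError("rmseWindowSize must be > 0")
    return json.dumps(cfg)


# The same settings as the basic CPU example.
print(build_forecast_config(minContextSize=10, maxContextSize=200, horizon=5))
# {"horizon": 5, "maxContextSize": 200, "minContextSize": 10, "model": "timesfm-2.5", "rmseWindowSize": 5}
```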
Output
The AI_FORECAST function returns a ROW data type, which is a structured record containing the following fields.
forecast
Type: ARRAY<ROW<timestamp TIMESTAMP(3), mean DOUBLE, q10 DOUBLE, q50 DOUBLE, q90 DOUBLE>>
An array of forecasted data points, one per horizon step. Each entry contains the predicted timestamp, mean forecast, and three quantiles (10th, 50th, and 90th percentile). The array length equals the horizon parameter.
metadata
Type: STRING
A JSON string containing model metadata, including the model name, requested horizon, frequency, and input lengths.
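Because q10 and q90 are the 10th and 90th percentiles, the band between them is nominally an 80% prediction interval, assuming the model's quantiles are well calibrated for your data. A common use of this output is to compare each forecast step with the actual value that arrives later. The following Python sketch assumes you have already consumed the forecast rows, for example from a sink topic, and mapped each array element into a hypothetical ForecastStep class.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class ForecastStep:
    """One element of the forecast array (field names follow the documented ROW type)."""

    timestamp: datetime
    mean: float
    q10: float
    q50: float
    q90: float


def check_actual(steps: list[ForecastStep], ts: datetime, actual: float) -> str:
    """Classify an observed value against the forecast step for the same timestamp."""
    for step in steps:
        if step.timestamp == ts:
            if actual < step.q10:
                return "below q10"
            if actual > step.q90:
                return "above q90"
            return "within q10-q90"
    return "no forecast for this timestamp"


steps = [ForecastStep(datetime(2024, 11, 19, 10, 5), 50.0, 45.0, 50.0, 55.0)]
print(check_actual(steps, datetime(2024, 11, 19, 10, 5), 61.2))  # above q90
```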
Examples
The following examples demonstrate how to use the AI_FORECAST function for various forecasting scenarios.
Basic forecasting on server CPU usage
The following example forecasts CPU usage for the next 5 time steps.
CREATE TABLE cpu_metrics (
host_id STRING,
cpu_percent DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
);
SELECT
event_time,
cpu_percent,
AI_FORECAST(
cpu_percent,
event_time,
JSON_OBJECT(
'model' VALUE 'timesfm-2.5',
'minContextSize' VALUE 10,
'maxContextSize' VALUE 200,
'horizon' VALUE 5
)
) OVER (
ORDER BY event_time
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS forecast_result
FROM cpu_metrics;
Extracting individual forecast steps
The following example shows how to extract specific fields from the first forecast step. Flink SQL arrays are 1-indexed, so forecast[1] holds the prediction for the next time step.
SELECT
event_time,
cpu_percent,
forecast_result.forecast[1].`timestamp` AS next_timestamp,
forecast_result.forecast[1].mean AS next_mean,
forecast_result.forecast[1].q10 AS next_q10,
forecast_result.forecast[1].q90 AS next_q90,
forecast_result.metadata AS model_metadata
FROM (
SELECT
event_time,
cpu_percent,
AI_FORECAST(
cpu_percent,
event_time,
JSON_OBJECT(
'model' VALUE 'timesfm-2.5',
'minContextSize' VALUE 10,
'maxContextSize' VALUE 200,
'horizon' VALUE 3
)
) OVER (
ORDER BY event_time
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS forecast_result
FROM cpu_metrics
);
ML_FORECAST
The ML_FORECAST function enables you to perform real-time analysis and gain actionable insights from your streaming data without needing in-depth data science expertise. The function uses ARIMA, optimized for real-time performance, to produce forecasts in a streaming context.
ARIMA model
ARIMA is a statistical technique for time-series analysis and forecasting. It combines three components to model and predict future values based on past data:
Autoregressive (AR): This component uses past values of the time series to predict future values. It assumes that current observations are influenced by previous observations.
Integrated (I): This refers to the differencing of raw observations to make the time series stationary. Stationary data has consistent statistical properties over time, which is crucial for accurate forecasting.
Moving Average (MA): This component incorporates the relationship between an observation and a residual error from a moving average model applied to previous observations.
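The Integrated component is the easiest of the three to see in code. The following Python sketch is written for illustration and is not taken from the ML_FORECAST implementation: it differences a series d times, and then converts a forecast made on the differenced scale back to the original scale, which is what "integrating" means here.

```python
def difference(series: list[float], d: int) -> list[float]:
    """Apply first differencing d times: y'[t] = y[t] - y[t-1]."""
    for _ in range(d):
        series = [b - a for a, b in zip(series, series[1:])]
    return series


def integrate_next(history: list[float], next_diff: float, d: int) -> float:
    """Convert a forecast of the d-times-differenced series back to the original scale."""
    if len(history) < d:
        raise ValueError("history must contain at least d points")
    # Remember the last value at each differencing level 0 .. d-1.
    last_values = []
    series = list(history)
    for _ in range(d):
        last_values.append(series[-1])
        series = [b - a for a, b in zip(series, series[1:])]
    # Undo one level of differencing at a time, from level d-1 back to level 0.
    value = next_diff
    for last in reversed(last_values):
        value = last + value
    return value


# A quadratic trend becomes constant after two rounds of differencing.
print(difference([1, 3, 6, 10, 15], 2))  # [1, 1, 1]
# Forecasting the next second difference as 1 implies the next value is 21.
print(integrate_next([1, 3, 6, 10, 15], 1, 2))  # 21
```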
ARIMA models are used widely in various fields, including finance, economics, and weather forecasting, to analyze time-dependent data and make predictions. They are particularly effective when dealing with non-seasonal time series data that exhibits trends or patterns over time.
The model is typically written as ARIMA(p, d, q), where:
p: The number of autoregressive terms
d: The degree of differencing
q: The order of the moving average
By adjusting these parameters, analysts can fine-tune the model to best fit their specific time series data and improve forecasting accuracy.
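With the order and coefficients fixed, an ARIMA(1, 1, 1) forecast takes only a few lines. The following Python sketch assumes the coefficients phi (AR), theta (MA), and the constant c are already known; ML_FORECAST fits its own coefficients from your data, and this code does not reproduce that fitting step. What it does show is how a multi-step horizon is produced: each step feeds the previous prediction back in, and future errors take their expected value of zero.

```python
def arima_111_forecast(
    y: list[float], phi: float, theta: float, horizon: int, c: float = 0.0
) -> list[float]:
    """Multi-step forecast from an ARIMA(1, 1, 1) model with known coefficients.

    On the differenced series w[t] = y[t] - y[t-1], the model is
        w[t] = c + phi * w[t-1] + theta * e[t-1] + e[t]
    """
    if len(y) < 2:
        raise ValueError("need at least two observations")
    if horizon < 1:
        raise ValueError("horizon must be >= 1")

    # I (d = 1): work on first differences.
    w = [b - a for a, b in zip(y, y[1:])]

    # Recover in-sample one-step errors, assuming the first error is zero.
    errors = [0.0]
    for t in range(1, len(w)):
        predicted = c + phi * w[t - 1] + theta * errors[t - 1]
        errors.append(w[t] - predicted)

    forecasts: list[float] = []
    last_w, last_e, level = w[-1], errors[-1], y[-1]
    for _ in range(horizon):
        # AR term uses the previous difference; MA term uses the previous error.
        next_w = c + phi * last_w + theta * last_e
        # Undo the differencing by adding the predicted change to the last level.
        level += next_w
        forecasts.append(level)
        # Future errors are unknown, so they take their expected value of zero.
        last_w, last_e = next_w, 0.0
    return forecasts


print(arima_111_forecast([100.0, 102.0, 105.0, 107.0], phi=0.5, theta=0.3, horizon=3))
# roughly [107.97, 108.455, 108.6975]
```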
Forecasting accuracy can vary greatly depending on your data and parameter choices, and predictions made with ARIMA or other machine learning methods are not guaranteed to be correct.
Forecasting parameters
The following parameters are supported for forecasting.
Model configuration
Use the following parameters to configure the ML_FORECAST function.
enableStl
Default: FALSE
Enable STL (Seasonal-Trend decomposition using Loess smoothing) decomposition. If set to TRUE, the time series is decomposed into trend, seasonal, and remainder components.
STL is a statistical method that separates a time series into three additive components: trend, seasonal, and remainder.
The trend component captures the long-term direction or movement in the data.
The seasonal component captures regular, predictable patterns that repeat over fixed periods.
The remainder component captures random fluctuations that aren’t explained by trend or seasonality.
This decomposition helps improve forecasting accuracy by enabling the ARIMA model to better analyze and predict the underlying patterns in your data.
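The idea of splitting a series into additive trend, seasonal, and remainder parts can be sketched in a few lines of Python. Note that this sketch uses classical moving-average decomposition as a simplified stand-in: real STL replaces the moving average with loess smoothing and iterates, and this is not how ML_FORECAST implements it. The function name classical_decompose is hypothetical, and the sketch supports only odd periods m.

```python
from typing import Optional


def classical_decompose(
    y: list[float], m: int
) -> tuple[list[Optional[float]], list[float], list[Optional[float]]]:
    """Split y into trend, seasonal, and remainder for an odd seasonal period m."""
    if m < 3 or m % 2 == 0:
        raise ValueError("this sketch supports odd periods m >= 3 only")
    if len(y) < 2 * m - 1:
        raise ValueError("need at least 2*m - 1 points")
    n, half = len(y), m // 2

    # Trend: centered moving average over one full season; undefined at the edges.
    trend: list[Optional[float]] = [None] * n
    for t in range(half, n - half):
        trend[t] = sum(y[t - half : t + half + 1]) / m

    # Seasonal: average detrended value at each position in the cycle,
    # shifted so the pattern sums to zero over one period.
    sums, counts = [0.0] * m, [0] * m
    for t in range(n):
        tr = trend[t]
        if tr is not None:
            sums[t % m] += y[t] - tr
            counts[t % m] += 1
    means = [s / c for s, c in zip(sums, counts)]
    offset = sum(means) / m
    pattern = [v - offset for v in means]
    seasonal = [pattern[t % m] for t in range(n)]

    # Remainder: whatever the trend and seasonal parts do not explain.
    remainder: list[Optional[float]] = []
    for t in range(n):
        tr = trend[t]
        remainder.append(None if tr is None else y[t] - tr - seasonal[t])
    return trend, seasonal, remainder


# A linear trend plus a repeating period-3 cycle.
pattern = [2.0, -1.0, -1.0]
y = [0.5 * t + pattern[t % 3] for t in range(12)]
trend, seasonal, remainder = classical_decompose(y, 3)
```

On this synthetic series the trend recovers 0.5 * t, the seasonal component recovers the repeating pattern, and the remainder is essentially zero, because the data contains no noise.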
evalWindowSize
Default: 5
Validation: Must be ≥ 1
Keeps the last evalWindowSize squared errors for both RMSE and AIC calculations. RMSE uses the rolling sum of these errors, while AIC uses the count (n) and sum of those same errors. A smaller window makes both metrics more responsive to recent errors; a larger window smooths out short-term noise.
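The windowing behavior described above can be sketched with a bounded queue. In the following Python sketch, the class RollingErrorMetrics is hypothetical. The AIC form n * ln(SSE / n) + 2k is the common least-squares version of AIC and is an assumption here, as is the parameter count k, because the exact formula ML_FORECAST uses is not documented in this section.

```python
import math
from collections import deque


class RollingErrorMetrics:
    """RMSE and an AIC-style score over the most recent squared errors."""

    def __init__(self, eval_window_size: int = 5, num_params: int = 2) -> None:
        if eval_window_size < 1:
            raise ValueError("evalWindowSize must be >= 1")
        # deque(maxlen=...) drops the oldest error once the window is full.
        self.squared_errors: deque[float] = deque(maxlen=eval_window_size)
        self.num_params = num_params  # k in the assumed AIC formula

    def update(self, actual: float, predicted: float) -> None:
        self.squared_errors.append((actual - predicted) ** 2)

    def rmse(self) -> float:
        if not self.squared_errors:
            raise ValueError("no errors recorded yet")
        return math.sqrt(sum(self.squared_errors) / len(self.squared_errors))

    def aic(self) -> float:
        n = len(self.squared_errors)
        if n == 0:
            raise ValueError("no errors recorded yet")
        sse = sum(self.squared_errors)
        if sse == 0:
            return -math.inf  # a perfect fit makes ln(SSE / n) diverge
        return n * math.log(sse / n) + 2 * self.num_params


metrics = RollingErrorMetrics(eval_window_size=3)
for actual, predicted in [(10, 12), (11, 11), (12, 13), (13, 12)]:
    metrics.update(actual, predicted)
# Only the last three squared errors (0, 1, 1) are kept; the first (4) was dropped.
print(round(metrics.rmse(), 4))  # 0.8165
```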
horizon
Default: 5
Number of future time periods to forecast. This value defines how many steps ahead the model predicts.
m
Default: If enableStl is TRUE, seasonality is estimated using ACF.
Validation: m >= 0 and m < (minTrainingSize / 2)
The seasonal period length. If enableStl is TRUE and you don't set m, the model uses an ACF (Autocorrelation Function) to estimate the seasonal period length. The ACF measures the correlation between a time series and lagged copies of itself at different lags. Peaks in the ACF reveal repeating patterns and seasonal cycles, because they show how strongly current values relate to past values at each lag.
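One simple way to turn the ACF into a period estimate is to pick the lag with the strongest positive autocorrelation. The following Python sketch does that under stated assumptions: the peak-picking rule and the choice to start at lag 2 (lag 1 is not a meaningful season and is usually high for smooth series) are illustrative, and the exact selection rule ML_FORECAST applies is not documented here. Candidate lags respect the documented constraint m < minTrainingSize / 2.

```python
def autocorrelation(y: list[float], lag: int) -> float:
    """Sample autocorrelation of y at the given lag."""
    n = len(y)
    mean = sum(y) / n
    variance = sum((v - mean) ** 2 for v in y)
    if variance == 0:
        return 0.0  # a constant series has no usable correlation structure
    covariance = sum((y[t] - mean) * (y[t - lag] - mean) for t in range(lag, n))
    return covariance / variance


def estimate_period(y: list[float], min_training_size: int) -> int:
    """Return the lag with the highest positive ACF, or 0 if there is none."""
    # Largest integer lag that still satisfies lag < min_training_size / 2.
    max_lag = min((min_training_size - 1) // 2, len(y) - 1)
    best_lag, best_acf = 0, 0.0
    for lag in range(2, max_lag + 1):
        acf = autocorrelation(y, lag)
        if acf > best_acf:
            best_lag, best_acf = lag, acf
    return best_lag


y = [1.0, 5.0, 3.0] * 8  # a cycle that repeats every 3 points
print(estimate_period(y, min_training_size=24))  # 3
```

Multiples of the true period (6, 9) also show positive autocorrelation, but fewer overlapping points contribute to them, so the shortest true period wins here.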
maxTrainingSize
Default: 512
Validation: maxTrainingSize must be greater than minTrainingSize and less than 10000
Maximum number of historical data points used for training. This value limits the training window to recent data.
minTrainingSize
Default: 128
Validation: minTrainingSize must be less than maxTrainingSize and greater than 1
Minimum number of data points required for training. The model won’t train if the quantity of available data is less than this value.
p
Default: NULL
Validation: If provided, must be >= 0
Order of the AR (Autoregressive) term in the ARIMA model, which defines the number of lagged values used in the model. This value represents the number of past values that directly influence the current prediction.
If p is not provided, the model uses auto-ARIMA to determine the best order.
q
Default: NULL
Validation: If provided, must be >= 0
Order of the MA (Moving Average) term in the ARIMA model, which defines the number of lagged error terms in the model. This value represents the number of past prediction errors that influence the current prediction.
If q is not provided, the model uses auto-ARIMA to determine the best order.
d
Default: NULL
Validation: If provided, must be >= 0
Order of differencing in the ARIMA model, which defines the number of times the data is differenced to achieve stationarity. A value of 0 means no differencing, 1 means first difference, and so on.
If d is not provided, the model uses auto-ARIMA to determine the best order.
updateInterval
Default: 5
Frequency of model updates. This value defines how often the model retrains with new data.
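The interplay of minTrainingSize, maxTrainingSize, and updateInterval can be pictured as a bounded buffer with a retraining schedule. The following Python sketch is hypothetical: it assumes updateInterval counts incoming rows, that the first training happens as soon as minTrainingSize points are buffered, and that each retraining uses only the most recent maxTrainingSize points. The actual ML_FORECAST scheduling may differ.

```python
from collections import deque


class TrainingWindow:
    """Hypothetical bounded training buffer with a periodic retraining trigger."""

    def __init__(
        self,
        min_training_size: int = 128,
        max_training_size: int = 512,
        update_interval: int = 5,
    ) -> None:
        # Documented rules: 1 < minTrainingSize < maxTrainingSize < 10000.
        if not 1 < min_training_size < max_training_size < 10000:
            raise ValueError("require 1 < minTrainingSize < maxTrainingSize < 10000")
        if update_interval < 1:
            raise ValueError("update_interval is assumed to be a positive row count")
        self.min_training_size = min_training_size
        self.update_interval = update_interval
        # Oldest points fall out once maxTrainingSize points are buffered.
        self.buffer: deque[float] = deque(maxlen=max_training_size)
        self.rows_since_training = 0
        self.trained = False

    def add(self, value: float) -> bool:
        """Buffer one value and return True if the model should (re)train now."""
        self.buffer.append(value)
        if len(self.buffer) < self.min_training_size:
            return False  # not enough data yet, so the model won't train
        self.rows_since_training += 1
        if not self.trained or self.rows_since_training >= self.update_interval:
            self.trained = True
            self.rows_since_training = 0
            return True
        return False


window = TrainingWindow(min_training_size=3, max_training_size=5, update_interval=2)
decisions = [window.add(float(i)) for i in range(8)]
print(decisions)  # [False, False, True, False, True, False, True, False]
print(list(window.buffer))  # [3.0, 4.0, 5.0, 6.0, 7.0]
```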
Forecast results
The following fields are returned by the ML_FORECAST function.
actual_value
Type: DOUBLE
The original metric value.
Scalar mode: the original metric value.
Vector mode: NULL, because the input is multidimensional.
aic
Type: DOUBLE
The AIC (Akaike Information Criterion) at the time the forecast was generated, calculated over the last evalWindowSize errors.
AIC is a statistical measure used to evaluate and compare the quality of different models. It balances model accuracy against complexity by penalizing models with more parameters. Lower AIC values indicate better model performance, which helps to identify the optimal model configuration that provides the best fit to the data without overfitting.
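For reference, the general definition of AIC is shown below, where k is the number of estimated parameters and \hat{L} is the maximized likelihood of the model. How ML_FORECAST counts k and derives the likelihood from its error window is not specified in this section, so treat this as the textbook formula rather than the exact implementation.

```latex
\mathrm{AIC} = 2k - 2\ln\hat{L}
```

For example, if two models reach the same \hat{L} but one has two more parameters, its AIC is higher by 2 \times 2 = 4, so the simpler model is preferred.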
forecast_value
Type: DOUBLE
The forecast value at the timestamp.
lower_bound
Type: DOUBLE
The lower bound of the confidence interval for the forecasted value.
rmse
Type: DOUBLE
The RMSE (Root Mean Square Error) at the time the forecast was generated, calculated over the last evalWindowSize errors.
RMSE measures the difference between the predicted and actual values in a time series by calculating the square root of the average of the squared differences between the predicted and actual values. A lower RMSE indicates better model performance, because it means the model’s predictions are closer to the actual values.
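Written as a formula, with y_i the actual values, \hat{y}_i the predicted values, and n the number of errors currently in the window (at most evalWindowSize):

```latex
\mathrm{RMSE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2}
```

For example, errors of 2, 0, and 1 give \sqrt{(4 + 0 + 1)/3} = \sqrt{5/3} \approx 1.291.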
timestamp
Type: TIMESTAMP
The LocalDateTime for this forecast.
upper_bound
Type: DOUBLE
The upper bound of the confidence interval for the forecasted value.
Example
Create a test table that has a timestamp field for your time series data.
CREATE TABLE orders ( total_orderunits double, summed_ts TIMESTAMP(3) NOT NULL, WATERMARK FOR summed_ts AS summed_ts );
Insert mock time series data into the table.
INSERT INTO orders (total_orderunits, summed_ts) VALUES (102.12, TIMESTAMP '2024-11-19 10:00:00.000'), (103.45, TIMESTAMP '2024-11-19 10:01:00.000'), (101.89, TIMESTAMP '2024-11-19 10:02:00.000'), (104.23, TIMESTAMP '2024-11-19 10:03:00.000'), (102.78, TIMESTAMP '2024-11-19 10:04:00.000'), (102.12, TIMESTAMP '2024-11-19 10:05:00.000'), (103.45, TIMESTAMP '2024-11-19 10:06:00.000'), (101.89, TIMESTAMP '2024-11-19 10:07:00.000'), (104.23, TIMESTAMP '2024-11-19 10:08:00.000'), (102.78, TIMESTAMP '2024-11-19 10:09:00.000'), (102.12, TIMESTAMP '2024-11-19 10:10:00.000'), (103.45, TIMESTAMP '2024-11-19 10:11:00.000'), (101.89, TIMESTAMP '2024-11-19 10:12:00.000'), (104.23, TIMESTAMP '2024-11-19 10:13:00.000'), (102.78, TIMESTAMP '2024-11-19 10:14:00.000'), (102.12, TIMESTAMP '2024-11-19 10:15:00.000'), (103.45, TIMESTAMP '2024-11-19 10:16:00.000'), (101.89, TIMESTAMP '2024-11-19 10:17:00.000'), (104.23, TIMESTAMP '2024-11-19 10:18:00.000'), (102.78, TIMESTAMP '2024-11-19 10:19:00.000'), (102.12, TIMESTAMP '2024-11-19 10:20:00.000'), (103.45, TIMESTAMP '2024-11-19 10:21:00.000'), (101.89, TIMESTAMP '2024-11-19 10:22:00.000'), (104.23, TIMESTAMP '2024-11-19 10:23:00.000'), (102.78, TIMESTAMP '2024-11-19 10:24:00.000'), (102.12, TIMESTAMP '2024-11-19 10:25:00.000'), (103.45, TIMESTAMP '2024-11-19 10:26:00.000'), (101.89, TIMESTAMP '2024-11-19 10:27:00.000'), (104.23, TIMESTAMP '2024-11-19 10:28:00.000'), (102.78, TIMESTAMP '2024-11-19 10:29:00.000');
Run the following statement to perform a forecast on the mock data in the orders table.
SELECT
ML_FORECAST(
total_orderunits,
summed_ts,
JSON_OBJECT(
'p' VALUE 1,
'q' VALUE 1,
'd' VALUE 1,
'minTrainingSize' VALUE 10,
'enableStl' VALUE false,
'horizon' VALUE 5
)
) OVER (
ORDER BY summed_ts
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS forecast
FROM orders;
Run the following statement to perform a forecast with auto-ARIMA on the mock data in the orders table. Because p, d, and q are omitted, the model determines the order automatically.
SELECT
ML_FORECAST(
total_orderunits,
summed_ts,
JSON_OBJECT(
'minTrainingSize' VALUE 10,
'enableStl' VALUE false
)
) OVER (
ORDER BY summed_ts
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS forecast
FROM orders;