Detect Anomalies in Data with Confluent Cloud for Apache Flink

Anomaly detection uses statistical and machine learning techniques to identify data points, events, or observations that deviate significantly from the expected pattern within a stream. These deviations, or anomalies, can indicate critical incidents, like system failures, fraudulent activities, or unusual behavioral patterns that require further investigation.

Anomaly detection identifies unexpected deviations in real time, helping improve data quality, detect operational issues instantly, and enable faster decision-making. Instead of waiting for batch processing, Confluent’s continuous anomaly detection monitors data as it arrives, detecting outliers the moment they occur.

Confluent Cloud for Apache Flink® integrates anomaly detection capabilities on your streams by using these functions:

  • AI_DETECT_ANOMALIES (Early Access Program): Detect anomalies in time-series data using model-based inference with foundation models like TimesFM.

  • ML_DETECT_ANOMALIES: Detect anomalies in your data using a forecasting model based on Autoregressive Integrated Moving Average (ARIMA).

  • ML_DETECT_ANOMALIES_ROBUST (Early Access Program): Detect univariate and multivariate anomalies in your data by using Median Absolute Deviation (MAD).

Pricing

Anomaly detection is billed in CFUs, as part of your compute pool usage.

AI_DETECT_ANOMALIES

The AI_DETECT_ANOMALIES function monitors a stream of time-series data and flags data points that fall outside expected bounds. The function uses a foundation model that is managed by Confluent and hosted in Confluent Cloud to learn patterns from historical data and produce prediction intervals. When an actual value falls outside the range between the predicted lower and upper bounds, the function flags it as an anomaly.

By default, AI_DETECT_ANOMALIES uses TimesFM 2.5, a foundation model from Google Research designed for time-series forecasting.

During the Early Access Program, you can request additional time-series models (such as Chronos from Hugging Face) to be made available. To request additional models, contact your Confluent representative.

Your data must include:

  • A timestamp column for ordering the time series.

  • A numeric value column representing the time-series data to monitor.

Note

The AI_DETECT_ANOMALIES function is an Early Access Program feature in Confluent Cloud.

An Early Access feature is a component of Confluent Cloud introduced to gain feedback. This feature should be used only for evaluation and non-production testing purposes or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions.

Early Access Program features are intended for evaluation use in development and testing environments only, and not for production use. Early Access Program features are provided: (a) without support; (b) “AS IS”; and (c) without indemnification, warranty, or condition of any kind. No service level commitment will apply to Early Access Program features. Early Access Program features are considered to be a Proof of Concept as defined in the Confluent Cloud Terms of Service. Confluent may discontinue providing preview releases of the Early Access Program features at any time in Confluent’s sole discretion.

Note

AI_DETECT_ANOMALIES uses models that are managed by Confluent and hosted in Confluent Cloud. You cannot use your own managed models or remote models from cloud providers with this function.

Parameters

The AI_DETECT_ANOMALIES function accepts the following parameters.

config

  • Type: STRING

  • Required: No

A JSON object with configuration parameters. The config parameter must be constant within a query. See the configuration parameters below for available options.

timestamp

  • Type: TIMESTAMP(3)

  • Required: Yes

The event time associated with the value. The timestamp parameter must not be null. Timestamps should be evenly spaced for best results.

value

  • Type: DOUBLE

  • Required: Yes

The numeric time-series value to monitor. The value parameter must be finite and not NaN or Infinity. Null values are silently skipped.

Configuration parameters

The following configuration parameters are passed as a JSON object in the config parameter.

confidencePercentage
  • Type: DOUBLE

  • Default: 99.0

  • Validation: 0 < value < 100

The symmetric confidence level for the prediction interval. For example, with an 80% confidence level, the function flags values outside the 10th–90th percentile range as anomalies.
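For intuition, the following Python sketch shows the percentile arithmetic implied by a symmetric confidence level, plus the out-of-bounds flagging rule. This is illustrative only; in the actual function, the bounds come from the model’s prediction interval.

```python
def interval_percentiles(confidence: float) -> tuple[float, float]:
    """Map a symmetric confidence percentage to its percentile bounds,
    e.g. 80.0 -> (10.0, 90.0)."""
    half = (100.0 - confidence) / 2.0
    return half, 100.0 - half

def is_outside(actual: float, lower: float, upper: float) -> bool:
    """A point outside [lower, upper] is flagged as an anomaly."""
    return actual < lower or actual > upper

print(interval_percentiles(99.0))  # the default confidencePercentage
```

With the default confidence of 99.0, the interval spans the 0.5th–99.5th percentiles, so only the most extreme 1% of predicted outcomes is flagged.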

lowerBoundConfidencePercentage
  • Type: DOUBLE

  • Default: null

  • Validation: 0 < value < 100

The asymmetric lower bound confidence. When set, this overrides the lower side of the confidencePercentage parameter.

maxContextSize
  • Type: INTEGER

  • Default: 512

  • Validation: Must be >= minContextSize

The maximum number of historical points used for prediction. When exceeded, the oldest points are dropped.

minContextSize
  • Type: INTEGER

  • Default: 20

  • Validation: Must be >= 2

The minimum number of data points before anomaly detection begins. Until this threshold is reached, output fields are null.

model
  • Type: STRING

  • Default: timesfm-2.5

  • Required: No

The model to use for anomaly detection.

rmseWindowSize
  • Type: INTEGER

  • Default: 5

  • Validation: Must be > 0

The rolling window size for tracking Root Mean Square Error (RMSE) of predictions.

upperBoundConfidencePercentage
  • Type: DOUBLE

  • Default: null

  • Validation: 0 < value < 100

The asymmetric upper bound confidence. When set, this overrides the upper side of the confidencePercentage parameter.

Output

The AI_DETECT_ANOMALIES function returns a ROW data type, which is a structured record containing the following fields.

aic

  • Type: DOUBLE

Reserved for future use. Always null.

actual_value

  • Type: DOUBLE

The observed value at this timestamp.

forecast_value

  • Type: DOUBLE

The model’s predicted value. Null until enough context is accumulated.

is_anomaly

  • Type: BOOLEAN

TRUE if the actual_value field is less than the lower_bound field or greater than the upper_bound field; otherwise FALSE.

lower_bound

  • Type: DOUBLE

The lower bound of the prediction interval.

rmse

  • Type: DOUBLE

The rolling RMSE of prediction errors.

timestamp

  • Type: TIMESTAMP(3)

The timestamp of the evaluated data point.

upper_bound

  • Type: DOUBLE

The upper bound of the prediction interval.

Examples

The following examples demonstrate how to use the AI_DETECT_ANOMALIES function for various anomaly detection scenarios.

Basic anomaly detection on server metrics

The following example demonstrates basic anomaly detection on server response time metrics with an 80% confidence interval.

  1. Create a table for server metrics.

    CREATE TABLE server_metrics (
        host_id STRING,
        response_ms DOUBLE,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    );
    
  2. Run anomaly detection with AI_DETECT_ANOMALIES.

    SELECT
        event_time,
        response_ms,
        AI_DETECT_ANOMALIES(
            response_ms,
            event_time,
            JSON_OBJECT(
                'model' VALUE 'timesfm-2.5',
                'minContextSize' VALUE 10,
                'maxContextSize' VALUE 200,
                'confidencePercentage' VALUE 80.0
            )
        ) OVER (
            ORDER BY event_time
            RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS anomaly_result
    FROM server_metrics;
    

Filtering to only anomalous rows

The following example filters the results to show only rows where anomalies are detected.

SELECT *
FROM (
    SELECT
        event_time,
        response_ms,
        AI_DETECT_ANOMALIES(
            response_ms,
            event_time,
            JSON_OBJECT(
                'model' VALUE 'timesfm-2.5',
                'minContextSize' VALUE 10,
                'maxContextSize' VALUE 200,
                'confidencePercentage' VALUE 90.0
            )
        ) OVER (
            ORDER BY event_time
            RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS anomaly
    FROM server_metrics
)
WHERE anomaly.is_anomaly = true;

Asymmetric bounds (flag only high spikes)

The following example uses the lowerBoundConfidencePercentage and upperBoundConfidencePercentage parameters to set different thresholds for high and low anomalies. This configuration makes the function more sensitive to high values while being more tolerant of low values.

SELECT
    event_time,
    response_ms,
    AI_DETECT_ANOMALIES(
        response_ms,
        event_time,
        JSON_OBJECT(
            'model' VALUE 'timesfm-2.5',
            'minContextSize' VALUE 10,
            'maxContextSize' VALUE 200,
            'lowerBoundConfidencePercentage' VALUE 99.9,
            'upperBoundConfidencePercentage' VALUE 80.0
        )
    ) OVER (
        ORDER BY event_time
        RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS anomaly
FROM server_metrics;

ML_DETECT_ANOMALIES

The ML_DETECT_ANOMALIES function is built on forecasting: it uses the Autoregressive Integrated Moving Average (ARIMA) algorithm, a widely used machine learning technique optimized here for real-time performance, to enable accurate and reliable anomaly detection.

Your data must include:

  • A timestamp column.

  • A target column representing some quantity of interest at each timestamp.

ARIMA model

ARIMA, which stands for Autoregressive Integrated Moving Average, is a powerful statistical technique used for time series analysis and forecasting. It combines three key components to model and predict future values based on past data:

  • Autoregressive (AR): This component uses past values of the time series to predict future values. It assumes that current observations are influenced by previous observations.

  • Integrated (I): This refers to the differencing of raw observations to make the time series stationary. Stationary data has consistent statistical properties over time, which is crucial for accurate forecasting.

  • Moving Average (MA): This component incorporates the relationship between an observation and a residual error from a moving average model applied to previous observations.

ARIMA models are used widely in various fields, including finance, economics, and weather forecasting, to analyze time-dependent data and make predictions. They are particularly effective when dealing with non-seasonal time series data that exhibits trends or patterns over time.

The model is typically represented as ARIMA(p, d, q), where:

  • p: The number of autoregressive terms

  • d: The degree of differencing

  • q: The order of the moving average

By adjusting these parameters, analysts can fine-tune the model to best fit their specific time series data and improve forecasting accuracy.
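As a concrete, much-simplified sketch of the three components working together, the following Python function computes a one-step ARIMA(1,1,1)-style forecast. The coefficients `phi` and `theta` are hand-picked for illustration; a real implementation estimates them from the data.

```python
def arima_one_step(series, phi=0.5, theta=0.3):
    """Illustrative one-step ARIMA(1,1,1) forecast with fixed, hand-picked
    coefficients (phi: AR weight, theta: MA weight)."""
    # Integrated (d=1): work on first differences to remove trend.
    diffs = [b - a for a, b in zip(series, series[1:])]
    # Moving Average: track the one-step errors of the AR(1) predictions.
    err = 0.0
    for t in range(1, len(diffs)):
        pred = phi * diffs[t - 1] + theta * err
        err = diffs[t] - pred
    # Autoregressive: forecast the next difference, then undo differencing.
    next_diff = phi * diffs[-1] + theta * err
    return series[-1] + next_diff
```

On a steadily increasing series, the forecast continues the upward movement because the AR term carries the most recent difference forward.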

Be aware that forecasting accuracy can vary greatly, based on many parameters, and there is no guarantee of correctness in predictions made by using ARIMA and machine learning.

Anomaly detection parameters

The following parameters are supported for anomaly detection.

Model configuration

Use the following parameters to configure the ML_DETECT_ANOMALIES function.

enableStl

  • Default: FALSE

Enable STL (Seasonal-Trend decomposition using Loess smoothing) decomposition. If set to TRUE, the time series is decomposed into trend, seasonal, and remainder components.

STL is a statistical method that separates a time series into three additive components: trend, seasonal, and remainder.

  • The trend component captures the long-term direction or movement in the data.

  • The seasonal component captures regular, predictable patterns that repeat over fixed periods.

  • The remainder component captures random fluctuations that aren’t explained by trend or seasonality.

This decomposition helps improve forecasting accuracy by enabling the ARIMA model to better analyze and predict the underlying patterns in your data.
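The additive split into three components can be sketched as follows. This is a classical moving-average decomposition, not the Loess smoothing that STL actually uses, but it shows the trend + seasonal + remainder structure.

```python
def decompose(series, period):
    """Much-simplified additive decomposition:
    series = trend + seasonal + remainder."""
    n = len(series)
    half = period // 2
    # Trend: centered moving average; edges use a truncated window.
    trend = [
        sum(series[max(0, t - half):min(n, t + half + 1)])
        / (min(n, t + half + 1) - max(0, t - half))
        for t in range(n)
    ]
    detrended = [series[t] - trend[t] for t in range(n)]
    # Seasonal: average detrended value at each phase of the period.
    phase_mean = [
        sum(detrended[p::period]) / len(detrended[p::period])
        for p in range(period)
    ]
    seasonal = [phase_mean[t % period] for t in range(n)]
    # Remainder: whatever trend and seasonality don't explain.
    remainder = [detrended[t] - seasonal[t] for t in range(n)]
    return trend, seasonal, remainder
```

By construction, summing the three components reconstructs the original series, and the seasonal component repeats with the given period.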

evalWindowSize

  • Default: 5

  • Validation: Must be >= 1

Keeps the last evalWindowSize squared errors for both RMSE and AIC calculations. RMSE uses the rolling sum of these errors, while AIC uses the count (n) and sum of those same errors. A smaller window makes both metrics more responsive to recent errors; a larger window smooths out short-term noise.
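The rolling bookkeeping described here can be sketched in Python as follows. The AIC formula used (n·ln(SSE/n) + 2k, a common least-squares variant) and the parameter count k are assumptions for illustration; the service’s exact formula is an implementation detail.

```python
from collections import deque
from math import sqrt, log

def make_metrics(eval_window_size=5):
    """Sketch of a rolling error window feeding both RMSE and AIC."""
    errs = deque(maxlen=eval_window_size)  # keeps only the last N squared errors

    def update(actual, forecast, k=2):
        errs.append((actual - forecast) ** 2)
        n, sse = len(errs), sum(errs)
        rmse = sqrt(sse / n)
        # Least-squares AIC variant; k is the number of model parameters.
        aic = n * log(sse / n) + 2 * k if sse > 0 else float("-inf")
        return rmse, aic

    return update
```

Because the deque discards the oldest squared error once it is full, a small window makes both metrics react quickly to fresh errors, while a large window averages them out.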

horizon

  • Default: 5

Number of future time periods to forecast. This value defines how many steps ahead the model predicts.

m

  • Default: If enableStl is TRUE, seasonality is estimated using ACF.

  • Validation: m >= 0 and m < (minTrainingSize / 2)

The seasonal period length. If enableStl is TRUE, the model uses an ACF (Autocorrelation Function) to estimate the seasonal period length. The ACF measures the correlation between a time series and its lagged versions at different time intervals. This helps to identify repeating patterns and seasonal cycles in the data by detecting how strongly current values relate to past values at various lags.
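For intuition, a bare-bones version of ACF-based period estimation looks like the following sketch: compute the autocorrelation at each candidate lag and pick the lag where it peaks. It assumes a non-constant series; the service’s estimator is more sophisticated.

```python
def estimate_period(series, max_lag=None):
    """Sketch: estimate the seasonal period as the lag (>= 2) with the
    highest autocorrelation."""
    n = len(series)
    max_lag = max_lag or n // 2
    mean = sum(series) / n
    var = sum((x - mean) ** 2 for x in series)  # assumes var > 0

    def acf(lag):
        # Correlation between the series and itself shifted by `lag`.
        return sum(
            (series[t] - mean) * (series[t - lag] - mean)
            for t in range(lag, n)
        ) / var

    return max(range(2, max_lag + 1), key=acf)
```

On a signal that repeats every 4 points, the autocorrelation peaks at lag 4 (and its multiples), so the estimator returns 4.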

maxTrainingSize

  • Default: 512

  • Validation: maxTrainingSize must be greater than minTrainingSize and less than 10000

Maximum number of historical data points used for training. This value limits the training window to recent data.

minTrainingSize

  • Default: 128

  • Validation: minTrainingSize must be less than maxTrainingSize and greater than 1

Minimum number of data points required for training. The model won’t train if the quantity of available data is less than this value.

p

  • Default: NULL

  • Validation: If provided, must be >= 0

Order of the AR (Autoregressive) term in the ARIMA model, which defines the number of lagged values used in the model. This value represents the number of past values that directly influence the current prediction.

If p is not provided, the model uses auto-ARIMA to determine the best order.

q

  • Default: NULL

  • Validation: If provided, must be >= 0

Order of the MA (Moving Average) term in the ARIMA model, which defines the number of lagged error terms in the model. This value represents the number of past prediction errors that influence the current prediction.

If q is not provided, the model uses auto-ARIMA to determine the best order.

d

  • Default: NULL

  • Validation: If provided, must be >= 0

Order of differencing in the ARIMA model, which defines the number of times the data is differenced to achieve stationarity. A value of 0 means no differencing, 1 means first difference, and so on.

If d is not provided, the model uses auto-ARIMA to determine the best order.

updateInterval

  • Default: 5

Frequency of model updates. This value defines how often the model retrains with new data.

confidencePercentage
  • Default: 99.0

  • Validation: 0 < value <= 100

The desired confidence level for computing anomaly bounds.

Anomaly detection results

The following fields are returned by the ML_DETECT_ANOMALIES function.

actual_value

  • Type: DOUBLE

  • Scalar mode: The original metric value.

  • Vector mode: NULL, because the input is multidimensional.

aic

  • Type: DOUBLE

The AIC (Akaike Information Criterion) at the time the forecast was generated, calculated over the last evalWindowSize errors.

AIC is a statistical measure used to evaluate and compare the quality of different models. It balances model accuracy against complexity by penalizing models with more parameters. Lower AIC values indicate better model performance, which helps to identify the optimal model configuration that provides the best fit to the data without overfitting.

forecast_value

  • Type: DOUBLE

The forecast value at the timestamp.

lower_bound

  • Type: DOUBLE

The lower confidence interval for the forecasted value.

rmse

  • Type: DOUBLE

The RMSE (Root Mean Square Error) at the time the forecast was generated, calculated over the last evalWindowSize errors.

RMSE measures the difference between the predicted and actual values in a time series by calculating the square root of the average of the squared differences between the predicted and actual values. A lower RMSE indicates better model performance, because it means the model’s predictions are closer to the actual values.

timestamp

  • Type: TIMESTAMP

The timestamp at which this forecast was generated.

upper_bound

  • Type: DOUBLE

The upper confidence interval for the forecasted value.

is_anomaly
  • Type: BOOLEAN

TRUE if actual_value falls below lower_bound or above upper_bound; otherwise FALSE.

Example

  1. Create a test table that has a timestamp field for your time-series data.

    CREATE TABLE orders (
        total_orderunits double,
        summed_ts TIMESTAMP(3) NOT NULL,
        WATERMARK FOR summed_ts AS summed_ts
    );
    
  2. Insert mock time series data into the table.

    INSERT INTO orders (total_orderunits, summed_ts) VALUES
        (102.12, TIMESTAMP '2024-11-19 10:00:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:01:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:02:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:03:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:04:00.000'),
        (102.12, TIMESTAMP '2024-11-19 10:05:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:06:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:07:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:08:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:09:00.000'),
        (102.12, TIMESTAMP '2024-11-19 10:10:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:11:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:12:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:13:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:14:00.000'),
        (102.12, TIMESTAMP '2024-11-19 10:15:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:16:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:17:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:18:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:19:00.000'),
        (102.12, TIMESTAMP '2024-11-19 10:20:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:21:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:22:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:23:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:24:00.000'),
        (102.12, TIMESTAMP '2024-11-19 10:25:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:26:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:27:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:28:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:29:00.000');
    
  3. Run the following statement to perform anomaly detection on the mock data in the orders table.

    SELECT
        ML_DETECT_ANOMALIES(total_orderunits, summed_ts, JSON_OBJECT('p' VALUE 1, 'q' VALUE 1, 'd' VALUE 1, 'minTrainingSize' VALUE 10, 'enableStl' VALUE false))
        OVER (
            ORDER BY summed_ts
            RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS anomaly
    FROM orders;
    
  4. Run the following statement to perform anomaly detection with AutoArima on the mock data in the orders table.

    SELECT
        ML_DETECT_ANOMALIES(total_orderunits, summed_ts, JSON_OBJECT('minTrainingSize' VALUE 10, 'enableStl' VALUE false))
        OVER (
            ORDER BY summed_ts
            RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS anomaly
    FROM orders;
    
  5. If there are multiple values for the same timestamps, or the timestamps are not uniform, use tumble windows with ML_DETECT_ANOMALIES.

    CREATE TABLE orderunits_result_anomaly DISTRIBUTED INTO 1 BUCKETS AS
    WITH windowed_avg AS (
      SELECT
        window_start,
        window_end,
        window_time,
        AVG(total_orderunits) AS avg_total_orderunits
      FROM
        TUMBLE(TABLE orders, DESCRIPTOR(summed_ts), INTERVAL '1' MINUTE)
      GROUP BY window_start, window_end, window_time
    )
    SELECT
      window_start,
      avg_total_orderunits,
      ML_DETECT_ANOMALIES(avg_total_orderunits, window_time, JSON_OBJECT('enableStl' VALUE false))
        OVER (
          ORDER BY window_time
          RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS anomaly
    FROM windowed_avg;
    

ML_DETECT_ANOMALIES_ROBUST

The ML_DETECT_ANOMALIES_ROBUST function detects univariate and multivariate anomalies in your data using Median Absolute Deviation (MAD).

Note

The ML_DETECT_ANOMALIES_ROBUST function is an Early Access Program feature in Confluent Cloud.

An Early Access feature is a component of Confluent Cloud introduced to gain feedback. This feature should be used only for evaluation and non-production testing purposes or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions.

Early Access Program features are intended for evaluation use in development and testing environments only, and not for production use. Early Access Program features are provided: (a) without support; (b) “AS IS”; and (c) without indemnification, warranty, or condition of any kind. No service level commitment will apply to Early Access Program features. Early Access Program features are considered to be a Proof of Concept as defined in the Confluent Cloud Terms of Service. Confluent may discontinue providing preview releases of the Early Access Program features at any time in Confluent’s sole discretion.

Median Absolute Deviation (MAD)

The ML_DETECT_ANOMALIES_ROBUST function is based on Median Absolute Deviation (MAD) and the geometric median. It has these advantages:

  • Robust to outliers and heavy‑tailed distributions

  • Simple to tune (no p, d, q orders or seasonality to pick)

  • Able to work with multivariate vectors (for example, CPU + memory + I/O) as well as single metrics

You use it the same way as other time-series ML functions in Flink SQL: as a windowed aggregate with the OVER (ORDER BY ts ...) clause.

The ML_DETECT_ANOMALIES_ROBUST function does the following over a sliding window of recent points:

  • Collects the most recent window observations (scalar values or vectors), where window is the configured window size.

  • Computes a robust center:

    • For a single metric: the median of the values.

    • For multiple metrics: the geometric L1 median (a robust multivariate center).

  • Computes the distance of each point in the window from the center.

  • Computes the Median Absolute Deviation (MAD) of these distances.

  • Converts MAD to a robust “standard deviation” estimate and applies a threshold multiplier: threshold * robust_sigma.

  • Flags the latest point as an anomaly if its distance from the center is greater than the threshold.

Because it uses medians and MAD instead of means and variance, the detector is much less sensitive to occasional spikes or corrupt data.
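The univariate version of this rule can be sketched in a few lines of Python. The 1.4826 consistency constant (which scales MAD to a normal-distribution standard deviation estimate) is a standard choice assumed here for illustration; the actual scaling is an implementation detail of the function.

```python
from statistics import median

def mad_is_anomaly(window_values, threshold=3.0):
    """Sketch of the univariate MAD rule: flag the latest point if its
    distance from the window median exceeds threshold * robust_sigma."""
    center = median(window_values)
    deviations = [abs(x - center) for x in window_values]
    mad = median(deviations)
    robust_sigma = 1.4826 * mad  # MAD -> normal std-dev estimate
    # Note: if every value is identical, robust_sigma is 0 and any
    # deviation at all would be flagged.
    latest = window_values[-1]
    return abs(latest - center) > threshold * robust_sigma
```

Note how a single large spike barely moves the median or the MAD, so it stands out clearly; a mean/variance detector would instead be dragged toward the spike.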

Your input data must include:

  • A timestamp column, for ordering the time series.

  • A single numeric column, for univariate detection, or

  • A vector of numeric columns, passed as a ROW(...) expression, for multivariate detection.

Examples

Univariate robust anomaly detection with default configuration

  1. Create a test table that has a timestamp field for your time-series data.

    CREATE TABLE orders (
        total_orderunits double,
        summed_ts TIMESTAMP(3) NOT NULL,
        WATERMARK FOR summed_ts AS summed_ts
    );
    
  2. Run MAD-based anomaly detection with default configuration. The default values are:

    • window = 20

    • threshold = 3.0

    • majorityRule = FALSE

    SELECT
      ML_DETECT_ANOMALIES_ROBUST(
        total_orderunits,
        summed_ts
      ) OVER (
        ORDER BY summed_ts
        RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
      ) AS anomaly
    FROM orders;
    

Multivariate anomaly detection with expert mode

  1. Create a table for service metrics:

    CREATE TABLE service_metrics (
        ts           TIMESTAMP(3) NOT NULL,
        cpu_percent  DOUBLE,
        heap_used_mb DOUBLE,
        gc_pause_ms  DOUBLE,
        WATERMARK FOR ts AS ts
    );
    
  2. Use multivariate robust detection with expert panel voting:

    SELECT
      ML_DETECT_ANOMALIES_ROBUST(
        ROW(cpu_percent, heap_used_mb, gc_pause_ms),
        ts,
        JSON_OBJECT(
          'window'            VALUE 60,
          'threshold'         VALUE 3.0,
          'majorityRule'      VALUE TRUE,
          'majorityThreshold' VALUE 0.6,
          -- Expert windows can be a string; they are parsed as an array.
          'expertWindows'     VALUE '[30,60,120]'
        )
      ) OVER (
        ORDER BY ts
        RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
      ) AS anomaly
    FROM service_metrics;
    
    • Each row’s metrics are treated as a 3-dimensional vector.

    • The detector uses a primary window of 60 points and a panel of windows [30,60,120].

    • A point is anomalous if more than 60% of ready experts vote it as an outlier.
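The panel-voting rule can be sketched as follows. Each expert corresponds to one window size; `None` here stands for an expert whose window isn’t full yet (the exact readiness semantics are an assumption for illustration).

```python
def majority_vote(expert_flags, majority_threshold=0.6):
    """Sketch of majority-rule voting: flag an anomaly when the fraction
    of ready experts voting True exceeds the threshold."""
    ready = [f for f in expert_flags if f is not None]  # drop not-ready experts
    if not ready:
        return False
    return sum(ready) / len(ready) > majority_threshold
```

For example, with windows [30, 60, 120], two of three ready experts voting anomaly gives a fraction of about 0.67, which exceeds the 0.6 threshold, so the point is flagged.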

Use tumble windows for irregular timestamps (multivariate)

If your raw stream is noisy or has many events per second, you can first aggregate into 1-minute windows and then run detection on the aggregated vectors. This approach gives you robust, drift-resistant multivariate anomaly detection on regularly spaced windows, with optional imputation to avoid repeated influence from outliers.

CREATE TABLE service_metrics_anomaly DISTRIBUTED INTO 1 BUCKETS AS
WITH windowed_metrics AS (
  SELECT
    window_start,
    window_time,
    AVG(cpu_percent)  AS avg_cpu,
    AVG(heap_used_mb) AS avg_heap,
    AVG(gc_pause_ms)  AS avg_gc
  FROM
    TUMBLE(TABLE service_metrics, DESCRIPTOR(ts), INTERVAL '1' MINUTE)
  GROUP BY window_start, window_time
)
SELECT
  window_start,
  ML_DETECT_ANOMALIES_ROBUST(
    ROW(avg_cpu, avg_heap, avg_gc),
    window_time,
    JSON_OBJECT(
      'window'        VALUE 30,
      'threshold'     VALUE 3.5,
      'imputeOutliers' VALUE TRUE
    )
  ) OVER (
    ORDER BY window_time
    RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  ) AS anomaly
FROM windowed_metrics;