Detect Anomalies in Data with Confluent Cloud for Apache Flink

Anomaly detection uses statistical and machine learning techniques to identify data points, events, or observations that deviate significantly from the expected pattern within a stream. These deviations, or anomalies, can indicate critical incidents, like system failures, fraudulent activities, or unusual behavioral patterns that require further investigation.

Built on forecasting, anomaly detection identifies unexpected deviations in real time, helping improve data quality, detect operational issues instantly, and enable faster decision-making. Instead of waiting for batch processing, Confluent’s continuous anomaly detection monitors data as it arrives, detecting outliers the moment they occur.

Confluent Cloud for Apache Flink® provides anomaly detection on your streams through the ML_DETECT_ANOMALIES function. The function uses ARIMA (Autoregressive Integrated Moving Average), a widely used statistical forecasting model, in an implementation optimized for real-time performance, enabling accurate and reliable anomaly detection.

Your data must include:

  • A timestamp column.
  • A target column representing some quantity of interest at each timestamp.
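
For example, a table like the following minimal sketch satisfies these requirements. The table and column names (metrics, metric_value, event_ts) are placeholders; substitute the names your stream already uses.

    CREATE TABLE metrics (
        metric_value DOUBLE,                -- target column: the quantity to monitor
        event_ts TIMESTAMP(3) NOT NULL,     -- timestamp column for the time series
        WATERMARK FOR event_ts AS event_ts  -- event-time attribute used to order the stream
    );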

ARIMA model

ARIMA, which stands for Autoregressive Integrated Moving Average, is a powerful statistical technique used for time series analysis and forecasting. It combines three key components to model and predict future values based on past data:

  • Autoregressive (AR): This component uses past values of the time series to predict future values. It assumes that current observations are influenced by previous observations.
  • Integrated (I): This refers to the differencing of raw observations to make the time series stationary. Stationary data has consistent statistical properties over time, which is crucial for accurate forecasting.
  • Moving Average (MA): This component incorporates the relationship between an observation and a residual error from a moving average model applied to previous observations.

ARIMA models are used widely in various fields, including finance, economics, and weather forecasting, to analyze time-dependent data and make predictions. They are particularly effective when dealing with non-seasonal time series data that exhibits trends or patterns over time.

The model is typically represented as ARIMA(p,d,q), where:

  • p: The number of autoregressive terms
  • d: The degree of differencing
  • q: The order of the moving average

By adjusting these parameters, analysts can fine-tune the model to best fit their specific time series data and improve forecasting accuracy.
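
These orders map directly to the p, d, and q model configuration parameters described later in this topic. As a minimal sketch, assuming the orders table defined in the Example section below, you can request an ARIMA(1,1,1) model explicitly instead of letting auto-ARIMA choose the orders:

    -- Explicit ARIMA(1,1,1): one AR term, first-order differencing, one MA term.
    SELECT
        ML_DETECT_ANOMALIES(
            total_orderunits,
            summed_ts,
            JSON_OBJECT('p' VALUE 1, 'd' VALUE 1, 'q' VALUE 1)
        ) OVER (
            ORDER BY summed_ts
            RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS anomaly
    FROM orders;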

Be aware that forecasting accuracy can vary greatly based on many parameters, and there is no guarantee of correctness in predictions made by ARIMA or other machine learning techniques.

Anomaly detection parameters

The following parameters are supported for anomaly detection.

Model configuration

Use the following parameters to configure the ML_DETECT_ANOMALIES function.

enableStl

  • Default: FALSE

Enables STL (Seasonal-Trend decomposition using Loess). If set to TRUE, the time series is decomposed into trend, seasonal, and remainder components.

STL is a statistical method that separates a time series into three additive components: trend, seasonal, and remainder.

  • The trend component captures the long-term direction or movement in the data.
  • The seasonal component captures regular, predictable patterns that repeat over fixed periods.
  • The remainder component captures random fluctuations that aren’t explained by trend or seasonality.

This decomposition helps improve forecasting accuracy by enabling the ARIMA model to better analyze and predict the underlying patterns in your data.

evalWindowSize

  • Default: 5
  • Validation: Must be ≥ 1

Keeps the last evalWindowSize squared errors for both RMSE and AIC calculations. RMSE uses the rolling sum of these errors, while AIC uses the count (n) and sum of those same errors. A smaller window makes both metrics more responsive to recent errors; a larger window smooths out short-term noise.

horizon

  • Default: 1

Number of future time periods to forecast. This value defines how many steps ahead the model predicts.

m

  • Default: If enableStl is TRUE, seasonality is estimated using ACF.
  • Validation: m >= 0 and m < (minTrainingSize / 2)

The seasonal period length. If enableStl is TRUE, the model uses an ACF (Autocorrelation Function) to estimate the seasonal period length. The ACF measures the correlation between a time series and its lagged versions at different time intervals. This helps to identify repeating patterns and seasonal cycles in the data by detecting how strongly current values relate to past values at various lags.
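
As a minimal sketch, again assuming the orders table from the Example section below, you can enable STL decomposition and supply the seasonal period explicitly through m. The value 60 is purely illustrative; pick the period that matches your data, or omit m and let the ACF estimate it.

    SELECT
        ML_DETECT_ANOMALIES(
            total_orderunits,
            summed_ts,
            -- 'm' VALUE 60 is an illustrative seasonal period; it must stay below minTrainingSize / 2.
            JSON_OBJECT('enableStl' VALUE true, 'm' VALUE 60)
        ) OVER (
            ORDER BY summed_ts
            RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS anomaly
    FROM orders;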

maxTrainingSize

  • Default: 512
  • Validation: maxTrainingSize must be greater than minTrainingSize and less than 10000

Maximum number of historical data points used for training. This value limits the training window to recent data.

minTrainingSize

  • Default: 128
  • Validation: minTrainingSize must be less than maxTrainingSize and greater than 1

Minimum number of data points required for training. The model won’t train if the quantity of available data is less than this value.

p

  • Default: NULL
  • Validation: If provided, must be >= 0

Order of the AR (Autoregressive) term in the ARIMA model, which defines the number of lagged values used in the model. This value represents the number of past values that directly influence the current prediction.

If p is not provided, the model uses auto-ARIMA to determine the best order.

q

  • Default: NULL
  • Validation: If provided, must be >= 0

Order of the MA (Moving Average) term in the ARIMA model, which defines the number of lagged error terms in the model. This value represents the number of past prediction errors that influence the current prediction.

If q is not provided, the model uses auto-ARIMA to determine the best order.

d

  • Default: NULL
  • Validation: If provided, must be >= 0

Order of differencing in the ARIMA model, which defines the number of times the data is differenced to achieve stationarity. A value of 0 means no differencing, 1 means first difference, and so on.

If d is not provided, the model uses auto-ARIMA to determine the best order.

updateInterval

  • Default: 5

Frequency of model updates. This value defines how often the model retrains with new data.

confidencePercentage

  • Default: 99.0
  • Validation: 0 < value ≤ 100

The desired confidence level for computing anomaly bounds.
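
These model configuration parameters can be combined in a single JSON_OBJECT. The following sketch, again against the orders table from the Example section below, uses illustrative values only; tune them for your own data volume and latency requirements.

    SELECT
        ML_DETECT_ANOMALIES(
            total_orderunits,
            summed_ts,
            JSON_OBJECT(
                'minTrainingSize' VALUE 32,         -- start training once 32 points are available
                'maxTrainingSize' VALUE 256,        -- train on at most the 256 most recent points
                'updateInterval' VALUE 10,          -- how often the model retrains (illustrative value)
                'evalWindowSize' VALUE 20,          -- compute RMSE and AIC over the last 20 errors
                'confidencePercentage' VALUE 95.0   -- 95% anomaly bounds instead of the default 99%
            )
        ) OVER (
            ORDER BY summed_ts
            RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS anomaly
    FROM orders;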

Anomaly detection results

The following fields are returned by the ML_DETECT_ANOMALIES function.

aic

  • Type: DOUBLE

The AIC (Akaike Information Criterion) at the time the forecast was generated, calculated over the last evalWindowSize errors.

AIC is a statistical measure used to evaluate and compare the quality of different models. It balances model accuracy against complexity by penalizing models with more parameters. Lower AIC values indicate better model performance, which helps to identify the optimal model configuration that provides the best fit to the data without overfitting.

forecast_value

  • Type: DOUBLE

The forecast value at the timestamp.

lower_bound

  • Type: DOUBLE

The lower confidence interval for the forecasted value.

rmse

  • Type: DOUBLE

The RMSE (Root Mean Square Error) at the time the forecast was generated, calculated over the last evalWindowSize errors.

RMSE measures the difference between the predicted and actual values in a time series by calculating the square root of the average of the squared differences between the predicted and actual values. A lower RMSE indicates better model performance, because it means the model’s predictions are closer to the actual values.

timestamp

  • Type: TIMESTAMP

The timestamp for this forecast.

upper_bound

  • Type: DOUBLE

The upper confidence interval for the forecasted value.

is_anomaly

  • Type: BOOLEAN

TRUE if actual_value falls below lower_bound or above upper_bound; otherwise FALSE.
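
Because ML_DETECT_ANOMALIES returns these fields together as one structured value, you typically compute the result in a subquery and read the individual fields from it. The following is a minimal sketch, assuming the orders table and the anomaly column alias used in the Example section below:

    SELECT
        summed_ts,
        t.anomaly.forecast_value,
        t.anomaly.lower_bound,
        t.anomaly.upper_bound,
        t.anomaly.is_anomaly
    FROM (
        SELECT
            summed_ts,
            ML_DETECT_ANOMALIES(total_orderunits, summed_ts, JSON_OBJECT('minTrainingSize' VALUE 10))
                OVER (
                    ORDER BY summed_ts
                    RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS anomaly
        FROM orders
    ) AS t;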

Pricing

Anomaly detection is billed in CFUs, as part of your compute pool usage.

Example

  1. Create a test table that has a timestamp field for your time series data.

    CREATE TABLE orders (
        total_orderunits DOUBLE,
        summed_ts TIMESTAMP(3) NOT NULL,
        WATERMARK FOR summed_ts AS summed_ts
    );
    
  2. Insert mock time series data into the table.

    INSERT INTO orders (total_orderunits, summed_ts) VALUES
        (102.12, TIMESTAMP '2024-11-19 10:00:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:01:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:02:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:03:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:04:00.000'),
        (102.12, TIMESTAMP '2024-11-19 10:05:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:06:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:07:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:08:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:09:00.000'),
        (102.12, TIMESTAMP '2024-11-19 10:10:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:11:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:12:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:13:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:14:00.000'),
        (102.12, TIMESTAMP '2024-11-19 10:15:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:16:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:17:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:18:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:19:00.000'),
        (102.12, TIMESTAMP '2024-11-19 10:20:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:21:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:22:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:23:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:24:00.000'),
        (102.12, TIMESTAMP '2024-11-19 10:25:00.000'),
        (103.45, TIMESTAMP '2024-11-19 10:26:00.000'),
        (101.89, TIMESTAMP '2024-11-19 10:27:00.000'),
        (104.23, TIMESTAMP '2024-11-19 10:28:00.000'),
        (102.78, TIMESTAMP '2024-11-19 10:29:00.000');
    
  3. Run the following statement to perform anomaly detection on the mock data in the orders table.

    SELECT
        ML_DETECT_ANOMALIES(total_orderunits, summed_ts, JSON_OBJECT('p' VALUE 1, 'q' VALUE 1, 'd' VALUE 1, 'minTrainingSize' VALUE 10, 'enableStl' VALUE false))
        OVER (
            ORDER BY summed_ts
            RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS anomaly
    FROM orders;
    
  4. Run the following statement to perform anomaly detection with AutoArima on the mock data in the orders table.

    SELECT
        ML_DETECT_ANOMALIES(total_orderunits, summed_ts, JSON_OBJECT('minTrainingSize' VALUE 10, 'enableStl' VALUE false))
        OVER (
            ORDER BY summed_ts
            RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS anomaly
    FROM orders;
    
  5. If there are multiple values for the same timestamp, or the timestamps are not uniformly spaced, aggregate the data with tumbling windows and apply ML_DETECT_ANOMALIES to the windowed averages, as shown in the following statement.

    CREATE TABLE orderunits_result_anomaly DISTRIBUTED INTO 1 BUCKETS AS
    WITH windowed_avg AS (
      SELECT
        window_start,
        window_end,
        window_time,
        AVG(total_orderunits) AS avg_total_orderunits
      FROM TABLE(
        TUMBLE(TABLE orders, DESCRIPTOR(summed_ts), INTERVAL '1' MINUTE)
      )
      GROUP BY window_start, window_end, window_time
    )
    SELECT
      window_start,
      avg_total_orderunits,
      ML_DETECT_ANOMALIES(avg_total_orderunits, window_time, JSON_OBJECT('enableStl' VALUE false))
        OVER (
          ORDER BY window_time
          RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS anomaly
    FROM windowed_avg;
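
  6. Optionally, query the result table for the rows that were flagged as anomalous. This is a minimal sketch that reads the is_anomaly field from the stored anomaly column; adjust the selected fields to your needs.

    SELECT
        window_start,
        avg_total_orderunits,
        anomaly
    FROM orderunits_result_anomaly
    WHERE anomaly.is_anomaly = TRUE;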