Agent Runtime Guide with Confluent Cloud for Apache Flink

The agent runtime is the execution engine that orchestrates AI agents in streaming environments. This guide explains how the runtime works, how to configure it, and how to integrate it with your streaming applications.

Runtime architecture

The agent runtime is built on Flink and provides:

  • Iterative loop orchestration: Manages the reasoning and acting cycle
  • Tool integration: Seamlessly connects with MCP servers and Flink functions
  • Error handling: Provides robust error handling and retry mechanisms
  • Observability: Comprehensive logging and metrics

The runtime processes input events through several key components:

  • Iterative Loop: Orchestrates reasoning and tool execution cycles
  • Model Provider: Handles large language model (LLM) interactions and responses
  • Tool Manager: Manages function-based and Model Context Protocol (MCP)-based tools

Core components

Tool Manager

Manages tool execution and integration:

  • Function-based tools (Flink UDFs)
  • MCP-based tools (external services)
  • Tool discovery and validation
  • Error handling and retries

Model Provider

Handles LLM interactions:

  • Model inference requests
  • Response processing
  • Token management
  • Cost tracking

Execution flow

The agent runtime follows this execution flow:

  1. Input processing

    • Receive streaming event
    • Validate input format
  2. State management

    • Create request state
    • Initialize connections
  3. Iterative loop execution

    • Analyze input with LLM
    • Decide on tool calls
    • Execute tools
    • Process results
    • Repeat until complete
  4. Context management

    • Check context size
    • Trim or summarize conversation history if needed
    • Update local state
  5. Output generation

    • Format final response
    • Emit output event
  6. Observability

    • Log execution details
    • Update metrics
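
The six steps above amount to a single reason/act loop. The following Python sketch is purely illustrative and is not the actual runtime API; `run_agent`, the model and tool call signatures, and the event shapes are all hypothetical:

```python
def run_agent(event, model, tools, max_iterations=10):
    """Illustrative sketch of the execution flow (hypothetical API)."""
    # 1-2. Input processing and state management: seed per-request state.
    history = [{"role": "user", "content": event["input"]}]
    for _ in range(max_iterations):
        # 3. Iterative loop: analyze the input and history with the LLM.
        decision = model(history)
        if decision["type"] == "final":
            # 5. Output generation: format and emit the final response.
            return {"output": decision["content"]}
        # 3 (cont.). Execute the chosen tool and feed the result back.
        result = tools[decision["tool"]](decision["args"])
        history.append({"role": "tool", "content": str(result)})
        # 4. Context management would check token counts here (omitted).
    # Guardrail: stop after max_iterations without a final answer.
    return {"output": None, "error": "max_iterations reached"}
```

The `max_iterations` guardrail at the end is what the configuration option of the same name controls.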

Configuration

You can configure the agent runtime by setting options in the WITH clause:

Basic Configuration
CREATE AGENT my_agent
USING MODEL my_model
USING TOOLS my_tool
USING PROMPT 'You are a helpful assistant'
WITH (
  'max_iterations' = '10',
  'request_timeout' = '60'
);
Advanced Configuration
CREATE AGENT advanced_agent
USING MODEL my_model
USING TOOLS my_tool1, my_tool2
USING PROMPT 'You are an advanced assistant'
WITH (
  'handle_exception' = 'continue',
  'max_consecutive_failures' = '3',
  'max_iterations' = '15',
  'max_tokens_threshold' = '100000',
  'request_timeout' = '300',
  'summarization_prompt' = 'concise',
  'tokens_management_strategy' = 'summarize'
);

Configuration Options

Option | Description | Default | Required
handle_exception | Exception handling strategy | FAIL | No
max_consecutive_failures | Maximum number of consecutive failures allowed | 1 | No
max_iterations | Maximum loop iterations | 10 | No
max_tokens_threshold | Token threshold that triggers compaction | 100000 | No
request_timeout | Maximum execution time for a request, in seconds | 600 | No
summarization_prompt | Prompt type used for the summarization call | CONCISE | No
tokens_management_strategy | Token management strategy for compaction | NONE | No

Valid values for the ENUM options are:

  • handle_exception: CONTINUE, FAIL; default is FAIL. For more information, see Error Handling.
  • summarization_prompt: CONCISE, STRUCTURED, VERBOSE; default is CONCISE. For more information, see Summarization prompt types.
  • tokens_management_strategy: NONE, SUMMARIZE, TRIM; default is NONE. For more information, see Token management strategies.

Tool integration

The runtime supports two types of tool integration:

Function-based tools

Function-based tools execute locally in the Flink runtime. To use a function-based tool, you must create a user-defined function (UDF) and upload it to Confluent Cloud. For more information, see Create a User-Defined Function.

-- Create a UDF
CREATE FUNCTION calculate_tax
USING JAR 'tax-calculator.jar'
COMMENT 'Calculate tax based on amount and location';

-- Create tool from UDF
CREATE TOOL tax_calculator
USING FUNCTION calculate_tax
WITH (
  'type' = 'function',
  'description' = 'Calculate tax for given amount and location'
);
Benefits
  • Low latency execution
  • No external dependencies
  • Full control over implementation
  • Automatic error handling

MCP-based tools

MCP tools connect to external services:

-- Create MCP connection
CREATE CONNECTION weather_api
WITH (
  'type' = 'mcp_server',
  'endpoint' = 'https://weather-api.example.com',
  'api-key' = '<your-api-key>'
);

-- Create MCP tool
CREATE TOOL weather_service
USING CONNECTION weather_api
WITH (
  'type' = 'mcp_server',
  'allowed_tools' = 'get_current_weather,get_forecast',
  'request_timeout' = '30'
);
Benefits
  • Access to external APIs
  • Standardized tool interface
  • Easy integration with existing services
  • Built-in retry and error handling

Configuration Options

Option | Description | Default | Required
allowed_tools | Tools the agent is allowed to use | (none) | Yes
max_retries | Maximum number of retries for a tool call | 2 | No
request_timeout | Maximum execution time for a request, in seconds | 20 | No

Error Handling

The runtime provides comprehensive error handling:

Retry Mechanisms
  • Automatic retry for transient failures: A retry mechanism is in place to automatically handle transient failures, such as temporary network interruptions or service unavailability, individually for each component.
  • Configurable retry counts and backoff: Retry attempts and backoff intervals can be independently configured for every tool, agent, and model to provide granular control over recovery behavior.
  • Exponential backoff for rate limits: The retry mechanism employs an exponential backoff strategy, progressively increasing wait times between attempts to effectively manage and respect API rate limits.
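
Conceptually, the retry-with-backoff behavior looks like the following sketch. This is an illustration only; `TransientError`, `call_with_retries`, and the parameter names are hypothetical, not runtime APIs:

```python
import time

class TransientError(Exception):
    """Stand-in for retriable failures such as timeouts or rate limits."""

def call_with_retries(fn, max_retries=2, base_delay=1.0, sleep=time.sleep):
    """Retry transient failures with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except TransientError:
            if attempt == max_retries:
                raise                         # retries exhausted; surface the error
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
```

Permanent errors (any other exception type here) propagate immediately instead of being retried.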
Error Classification
  • Transient errors: Includes temporary issues like network interruptions or request timeouts, which are generally resolved by retrying the operation.
  • Permanent errors: Encompasses non-retriable issues such as invalid input, malformed requests, or authentication failures that require intervention.
  • Tool-specific errors: Refers to errors generated directly by a tool, a model, or a remote UDF, which may have unique failure conditions specific to that component.
Fallback Strategies
  • Graceful degradation: Provides a configurable option to either degrade gracefully, allowing the job to proceed with a FAILURE status for that particular event, or to terminate the job with a user exception.
  • Alternative tool selection: A feedback loop incorporates tool-failure information into the context, enabling the model to dynamically select an alternative tool or correct the workflow path.
  • Default responses: Implements configurable guardrails that can trigger a default response or terminate the job with a user exception after a specified threshold of repeated tool failures is met.
Error Recovery
  • Checkpointing to recover from failures: Periodically persists the application state, enabling the job to resume from the last successful checkpoint after an interruption.
  • Error logging and alerting: A comprehensive system captures detailed diagnostic information for all errors and triggers notifications.

Example error handling configuration:

CREATE AGENT robust_agent
USING MODEL my_model
USING PROMPT 'You are a robust assistant that handles errors gracefully'
WITH (
  'max_consecutive_failures' = '3',
  'handle_exception' = 'continue'
);

Context Management

The runtime provides flexible context management to operate within model token limits.

Token management strategies

The tokens_management_strategy option supports these values.

NONE

The runtime performs no token management. If the conversation history exceeds the model’s maximum token limit, the request fails.

TRIM

The runtime estimates the total tokens before sending a request. If the count exceeds the max_tokens_threshold, it sequentially removes the oldest messages from the conversation history, starting with the first message after the original user prompt, which is always preserved.

This process continues until the total token count falls below the max_tokens_threshold, preserving as much of the conversation's semantics as possible. If the threshold can't be met, for example, if only the latest context remains, the history is sent as-is, which may lead to a model failure.
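
A minimal model of the TRIM behavior, assuming a naive length-based token estimator (the runtime's actual estimation is internal and more sophisticated):

```python
def trim_history(history, max_tokens_threshold,
                 estimate=lambda m: len(m["content"]) // 4):
    """Drop the oldest messages after the original user prompt (index 0)."""
    trimmed = list(history)
    total = sum(estimate(m) for m in trimmed)
    # Keep the original user prompt and at least the latest context message.
    while total > max_tokens_threshold and len(trimmed) > 2:
        total -= estimate(trimmed.pop(1))  # remove the oldest non-prompt message
    return trimmed  # may still exceed the threshold; sent as-is in that case
```

Trimming proceeds oldest-first until the estimate drops below the threshold or only the prompt and the latest context remain.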

SUMMARIZE

The runtime estimates the total tokens. If the count exceeds the max_tokens_threshold, it generates a summary of the conversation history, excluding the original user message and the latest context. This summary replaces the summarized messages to reduce the token count below the threshold. If there are not enough messages to summarize, the history is sent as-is.
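
In the same spirit, SUMMARIZE can be modeled as replacing the middle of the history with a single summary message. This is illustrative only; the `summarize` callable stands in for the runtime's internal summarization call to the model:

```python
def summarize_history(history, max_tokens_threshold, summarize,
                      estimate=lambda m: len(m["content"]) // 4):
    """Replace the middle of the history with a model-generated summary."""
    total = sum(estimate(m) for m in history)
    if total <= max_tokens_threshold or len(history) < 4:
        return history  # under threshold, or not enough messages to summarize
    # Exclude the original user message (first) and the latest context (last).
    summary = {"role": "system", "content": summarize(history[1:-1])}
    return [history[0], summary, history[-1]]
```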

Configuration and recommendations

The max_tokens_threshold property is a user-defined limit that triggers the TRIM or SUMMARIZE strategy.

Recommendation: Set max_tokens_threshold to less than 80% of the model’s total token limit. This provides a buffer and prevents failures if the context can’t be reduced sufficiently.
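
For example, assuming a model with a 128,000-token context window, the 80% guideline works out as follows:

```python
model_token_limit = 128_000                          # example context window
max_tokens_threshold = int(model_token_limit * 0.8)  # 80% guideline
print(max_tokens_threshold)                          # 102400
```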

Summarization prompt types

The summarization_prompt option supports these values.

CONCISE

Instructs the model to return a short, concise summary of the history, keeping only necessary details. This method saves the most tokens and is suitable for less complex agents.

STRUCTURED

Asks the model to summarize the conversation in a structured format, detailing tools called, remaining tasks, and other relevant items. This is effective for complex agents that don’t need minor details.

VERBOSE

Directs the model to create a structured and verbose summary, including all small details. This is useful for agents that must not lose context.

Observability

The runtime provides comprehensive observability:

Metrics
  • Agent execution times
  • Tool call latencies
  • Model inference costs
  • Error rates and types
Logging
  • Detailed execution logs
  • Tool call traces
  • Error stack traces
  • Performance metrics
Monitoring
  • Real-time dashboards
  • Alerting on failures
  • Performance tracking
  • Cost monitoring
Debugging
  • Request replay capabilities
  • Step-by-step execution traces
  • Tool call debugging
  • State inspection

Performance optimization

Tool Optimization
  • Use function-based tools for high-frequency operations
  • Implement caching for expensive operations
  • Optimize tool implementations
  • Use appropriate timeouts
Model Optimization
  • Cache model responses when appropriate
  • Use smaller models for simple tasks
  • Implement response streaming
  • Monitor token usage
State Optimization
  • Limit session size
  • Implement state compression
  • Use appropriate TTLs
  • Clean up old sessions
Resource Management
  • Configure appropriate parallelism
  • Monitor memory usage
  • Optimize checkpointing
  • Use appropriate backpressure

Example optimization configuration:

CREATE AGENT optimized_agent
USING MODEL my_model
USING PROMPT 'You are an optimized assistant'
WITH (
  'max_iterations' = '5',
  'tokens_management_strategy' = 'summarize',
  'max_tokens_threshold' = '80000',
  'summarization_prompt' = 'concise'
);

Best practices

Agent design

  • Keep agents focused on specific tasks
  • Use clear and specific prompts
  • Limit the number of tools per agent
  • Optimize configuration to add proper guardrails

Chatbot example

A chatbot for simple, quick interactions. It must answer quickly, doesn't need to choose among multiple tools, should avoid long-running tasks, and shouldn't get stuck in complex loops. If it fails, it should fail quickly and not process further queries.

CREATE AGENT simple_faq_bot
USING MODEL fast_model
USING PROMPT 'You are a helpful FAQ assistant. Answer questions directly.'
USING TOOLS search
WITH (
  'handle_exception' = 'fail',            -- Stop immediately on request failure
  'max_consecutive_failures' = '1',       -- Don't retry if a tool fails.
  'max_iterations' = '3',                 -- Max 3 steps, for example, think, search, answer.
  'request_timeout' = '60',               -- Fail fast after 60 seconds.
  'tokens_management_strategy' = 'none'   -- No summarization for short chats.
);

Research analyst agent example

An agent that performs multi-step research, calls multiple tools, and builds a large context. It needs time to “think” and should be resilient to occasional tool failures.

CREATE AGENT research_analyst_agent
USING MODEL research_model
USING PROMPT 'You are a research analyst. Gather and synthesize information to answer complex questions.'
USING TOOLS research_tool, analyze_tool
WITH (
  'handle_exception' = 'continue',        -- If a search fails, continue with another query.
  'max_consecutive_failures' = '3',       -- Allow up to 3 tool failures in a row.
  'max_iterations' = '20',                -- Allow many steps for deep research.
  'max_tokens_threshold' = '128000',      -- Allow a very large context.
  'request_timeout' = '600',              -- Allow up to 10 minutes for complex reasoning.
  'summarization_prompt' = 'STRUCTURED',
  'tokens_management_strategy' = 'summarize'
);

Automated workflow example

An agent that runs a specific, automated workflow, for example, “Book a flight, then a hotel, then a car”. It needs to be resilient and handle expected failures without stopping the job.

CREATE AGENT travel_booker_agent
USING MODEL my_workflow_model
USING PROMPT 'You book travel. Follow the steps: 1. Find flight. 2. Find hotel. 3. Book car.'
USING TOOLS flight_tools, hotel_tools, gmail_tools
WITH (
  'handle_exception' = 'continue',          -- Critical for workflow to continue.
  'max_consecutive_failures' = '2',         -- Allow agent retries.
  'max_iterations' = '15',                  -- Enough steps to complete a complex booking.
  'max_tokens_threshold' = '32000',         -- A standard context size is fine.
  'request_timeout' = '300',                -- 5 minutes is a reasonable default.
  'tokens_management_strategy' = 'trim'     -- Just drop the oldest messages if context gets full.
);

Tool design

  • Make tools idempotent when possible
  • Provide clear error messages
  • Use appropriate timeouts
  • Handle edge cases gracefully
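
One common way to make a tool idempotent is to key its side effects on a stable request ID, so a retried call returns the earlier result instead of repeating the action. A sketch of that pattern (`book` and the in-memory cache are hypothetical; a real tool would use a durable store):

```python
_completed = {}  # request_id -> result; stands in for a durable store

def idempotent_book_flight(request_id, flight, book=lambda f: f"booked:{f}"):
    """Return the cached result on retries instead of booking twice."""
    if request_id in _completed:
        return _completed[request_id]  # retry: replay the earlier outcome
    result = book(flight)              # first attempt: perform the side effect
    _completed[request_id] = result
    return result
```

With this pattern, the runtime's retry mechanism can safely re-invoke the tool after a transient failure.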

Internal auditor service example

Use this configuration when the server's tools are usually very fast, for example, response time is less than 2 seconds, but can occasionally fail due to a brief network blip or a pod restart.

CREATE TOOL internal_auditor_service
USING CONNECTION internal_api
WITH (
  'type' = 'mcp_server',
  'allowed_tools' = 'check_user_permissions,log_event',
  'request_timeout' = '5', -- Fail fast after 5 seconds
  'max_retries' = '3'      -- Retry up to 3 times
);

Complex report generator example

Use this configuration when the MCP server's tools perform complex calculations, for example, generating a detailed report or running a machine learning inference. The API is known to be slow, for example, it can take up to 90 seconds, but is generally reliable. It might also have strict rate limits, so you don't want to hammer it with retries.

Note that slow MCP tools affect the overall latency of the agent.

CREATE TOOL complex_report_generator
USING CONNECTION third_party_analytics_api
WITH (
  'type' = 'mcp_server',
  'allowed_tools' = 'generate_quarterly_report',
  'request_timeout' = '120', -- Wait up to 2 minutes
  'max_retries' = '1'        -- Only retry once if it fails
);
Performance
  • Monitor execution times
  • Use appropriate caching
  • Optimize tool implementations
  • Monitor resource usage
Monitoring
  • Set up comprehensive monitoring
  • Use appropriate alerting
  • Track key metrics
  • Regular performance reviews

Troubleshooting

Agent not responding
  • Check model configuration
  • Verify tool connections
  • Check for infinite loops
  • Review error logs
Tool calls failing
  • Verify tool definitions
  • Check connection configurations
  • Ensure proper error handling
  • Monitor tool execution
Performance issues
  • Optimize tool implementations
  • Use appropriate caching
  • Monitor resource usage
  • Consider parallelism

Debugging Steps

  1. Check agent execution logs
  2. Verify tool configurations
  3. Test tools independently
  4. Monitor resource usage
  5. Check for error patterns