Agent Runtime Guide with Confluent Cloud for Apache Flink¶
The agent runtime is the execution engine that orchestrates AI agents in streaming environments. This guide explains how the runtime works, how to configure it, and how to integrate it with your streaming applications.
Runtime architecture¶
The agent runtime is built on Flink and provides:
- Iterative loop orchestration: Manages the reasoning and acting cycle
- Tool integration: Seamlessly connects with MCP servers and Flink functions
- Error handling: Provides robust error handling and retry mechanisms
- Observability: Comprehensive logging and metrics
The runtime processes input events through several key components:
- Iterative Loop: Orchestrates reasoning and tool execution cycles
- Model Provider: Handles large language model (LLM) interactions and responses
- Tool Manager: Manages function-based and model context protocol (MCP)-based tools
Core components¶
- Tool Manager
Manages tool execution and integration:
- Function-based tools (Flink UDFs)
- MCP-based tools (external services)
- Tool discovery and validation
- Error handling and retries
- Model Provider
Handles LLM interactions:
- Model inference requests
- Response processing
- Token management
- Cost tracking
Execution flow¶
The agent runtime follows this execution flow:
1. Input processing
   - Receive the streaming event
   - Validate the input format
2. State management
   - Create the request state
   - Initialize connections
3. Iterative loop execution
   - Analyze the input with the LLM
   - Decide on tool calls
   - Execute tools
   - Process results
   - Repeat until complete
4. Context management
   - Check the context size
   - Trim or summarize conversation history if needed
   - Update local state
5. Output generation
   - Format the final response
   - Emit the output event
6. Observability
   - Log execution details
   - Update metrics
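The following sketch maps these stages to the objects you create. All names are placeholders, and the CREATE MODEL options are representative for a single provider rather than exhaustive, so adjust them for your environment.

-- Model Provider: the LLM that the iterative loop calls for reasoning.
-- Placeholder provider options; adjust for your model provider.
CREATE MODEL my_model
INPUT (prompt STRING)
OUTPUT (response STRING)
WITH (
  'provider' = 'openai',
  'task' = 'text_generation',
  'openai.connection' = 'my-openai-connection'
);

-- Tool Manager: a tool the loop can execute between reasoning steps.
-- Assumes a UDF named my_function has already been uploaded.
CREATE TOOL my_tool
USING FUNCTION my_function
WITH (
  'type' = 'function',
  'description' = 'Example tool invoked during the iterative loop'
);

-- Iterative loop: the agent ties the model and tools together and bounds the loop.
CREATE AGENT my_agent
USING MODEL my_model
USING TOOLS my_tool
USING PROMPT 'You are a helpful assistant'
WITH ('max_iterations' = '10');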
Configuration¶
You can configure the agent runtime by using various options:
- Basic Configuration
CREATE AGENT my_agent
USING MODEL my_model
USING TOOLS my_tool
USING PROMPT 'You are a helpful assistant'
WITH (
  'max_iterations' = '10',
  'request_timeout' = '60'
);
- Advanced Configuration
CREATE AGENT advanced_agent
USING MODEL my_model
USING TOOLS my_tool1, my_tool2
USING PROMPT 'You are an advanced assistant'
WITH (
  'handle_exception' = 'continue',
  'max_consecutive_failures' = '3',
  'max_iterations' = '15',
  'max_tokens_threshold' = '100000',
  'request_timeout' = '300',
  'summarization_prompt' = 'concise',
  'tokens_management_strategy' = 'summarize'
);
Configuration options¶
| Option | Description | Default | Required |
|---|---|---|---|
| handle_exception | Exception handling strategy | FAIL | No |
| max_consecutive_failures | Maximum number of consecutive failures allowed | 1 | No |
| max_iterations | Maximum loop iterations | 10 | No |
| max_tokens_threshold | Token threshold at which compaction is applied | 100000 | No |
| request_timeout | Maximum execution time for a request, in seconds | 600 | No |
| summarization_prompt | Prompt type used for the summarization call | CONCISE | No |
| tokens_management_strategy | Token management strategy for compaction | NONE | No |
Valid values for the ENUM options are:
- handle_exception: CONTINUE, FAIL; default is FAIL. For more information, see Error handling.
- summarization_prompt: CONCISE, STRUCTURED, VERBOSE; default is CONCISE. For more information, see Summarization prompt types.
- tokens_management_strategy: NONE, SUMMARIZE, TRIM; default is NONE. For more information, see Token management strategies.
Tool integration¶
The runtime supports two types of tool integration:
Function-based tools¶
Function-based tools execute locally in the Flink runtime. To use a function-based tool, you must create a user-defined function (UDF) and upload it to Confluent Cloud. For more information, see Create a User-Defined Function.
-- Create a UDF
CREATE FUNCTION calculate_tax
USING JAR 'tax-calculator.jar'
COMMENT 'Calculate tax based on amount and location';
-- Create tool from UDF
CREATE TOOL tax_calculator
USING FUNCTION calculate_tax
WITH (
'type' = 'function',
'description' = 'Calculate tax for given amount and location'
);
- Benefits
- Low latency execution
- No external dependencies
- Full control over implementation
- Automatic error handling
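After the tool is created, you attach it to an agent like any other tool. A minimal sketch, assuming a model named my_model already exists:

CREATE AGENT tax_assistant
USING MODEL my_model
USING TOOLS tax_calculator
USING PROMPT 'You answer tax questions and use the tax_calculator tool for calculations'
WITH ('max_iterations' = '5');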
MCP-based tools¶
MCP tools connect to external services:
-- Create MCP connection
CREATE CONNECTION weather_api
WITH (
'type' = 'mcp_server',
'endpoint' = 'https://weather-api.example.com',
'api-key' = '<your-api-key>'
);
-- Create MCP tool
CREATE TOOL weather_service
USING CONNECTION weather_api
WITH (
'type' = 'mcp_server',
'allowed_tools' = 'get_current_weather,get_forecast',
'request_timeout' = '30'
);
- Benefits
- Access to external APIs
- Standardized tool interface
- Easy integration with existing services
- Built-in retry and error handling
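An agent can mix both tool types. A minimal sketch that combines the function-based and MCP-based tools defined above, again assuming a model named my_model:

CREATE AGENT travel_cost_assistant
USING MODEL my_model
USING TOOLS tax_calculator, weather_service
USING PROMPT 'You help travelers estimate costs and check the weather'
WITH ('max_iterations' = '10');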
Configuration options¶
| Option | Description | Default | Required |
|---|---|---|---|
| allowed_tools | Tools the agent is allowed to use | – | Yes |
| max_retries | Maximum number of retries for a tool call | 2 | No |
| request_timeout | Maximum execution time for a request, in seconds | 20 | No |
Error handling¶
The runtime provides comprehensive error handling:
- Retry Mechanisms
- Automatic retry for transient failures: A retry mechanism is in place to automatically handle transient failures, such as temporary network interruptions or service unavailability, individually for each component.
- Configurable retry counts and backoff: Retry attempts and backoff intervals can be independently configured for every tool, agent, and model to provide granular control over recovery behavior.
- Exponential backoff for rate limits: The retry mechanism employs an exponential backoff strategy, progressively increasing wait times between attempts to effectively manage and respect API rate limits.
- Error Classification
- Transient errors: Includes temporary issues like network interruptions or request timeouts, which are generally resolved by retrying the operation.
- Permanent errors: Encompasses non-retriable issues such as invalid input, malformed requests, or authentication failures that require intervention.
- Tool-specific errors: Refers to errors generated directly by a tool, model, or remote UDF, which may have unique failure conditions specific to that component.
- Fallback Strategies
- Graceful degradation: Provides a configurable option to either gracefully degrade the job, allowing it to proceed with a FAILURE status for that particular event, or to terminate the job with a user exception.
- Alternative tool selection: A feedback loop incorporates tool failure information into the context, enabling the model to dynamically select an alternative tool or correct the workflow path.
- Default responses: Implements configurable guardrails that can trigger a default response or terminate the job with a user exception after a specified threshold of repeated tool failures is met.
- Error Recovery
- Checkpointing to recover from failures: Periodically persists the application state, enabling the job to resume from the last successful checkpoint after an interruption.
- Error logging and alerting: A comprehensive system captures detailed diagnostic information for all errors and triggers notifications.
Example error handling configuration:
CREATE AGENT robust_agent
USING MODEL my_model
USING PROMPT 'You are a robust assistant that handles errors gracefully'
WITH (
'max_consecutive_failures' = '3',
'handle_exception' = 'continue'
);
Context management¶
The runtime provides flexible context management to operate within model token limits.
Token management strategies¶
The tokens_management_strategy option supports these values.
NONE¶
The runtime performs no token management. If the conversation history exceeds the model’s maximum token limit, the request fails.
TRIM¶
The runtime estimates the total tokens before sending a request. If the count exceeds the max_tokens_threshold, it sequentially removes the oldest messages that follow the original user prompt from the conversation history, keeping the prompt itself. This process continues until the total token count is below the max_tokens_threshold, while attempting to maintain conversation semantics. If the threshold cannot be met, for example, if only the latest context remains, the history is sent as-is, which may lead to a model failure.
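A minimal sketch that enables trimming, with placeholder names:

CREATE AGENT trimming_agent
USING MODEL my_model
USING PROMPT 'You are a long-running assistant'
WITH (
  'tokens_management_strategy' = 'trim',
  'max_tokens_threshold' = '64000'  -- Oldest messages are dropped above this estimate.
);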
SUMMARIZE¶
The runtime estimates the total tokens. If the count exceeds the
max_tokens_threshold, it generates a summary of the conversation history,
excluding the original user message and the latest context. This summary
replaces the summarized messages to reduce the token count below the threshold.
If there are not enough messages to summarize, the history is sent as-is.
Configuration and recommendations¶
The max_tokens_threshold property is a user-defined limit that triggers
the TRIM or SUMMARIZE strategy.
Recommendation: Set max_tokens_threshold to less than 80% of the model’s
total token limit. This provides a buffer and prevents failures if the context
can’t be reduced sufficiently.
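For example, for a hypothetical model with a 128,000-token limit, 80% works out to 102,400 tokens, so a round value like 100,000 stays safely under it:

-- 128000 * 0.8 = 102400; setting 100000 leaves additional headroom.
CREATE AGENT buffered_agent
USING MODEL my_model
USING PROMPT 'You are a helpful assistant'
WITH (
  'tokens_management_strategy' = 'summarize',
  'max_tokens_threshold' = '100000'
);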
Summarization prompt types¶
The summarization_prompt option supports these values.
CONCISE¶
Instructs the model to return a short, concise summary of the history, keeping only necessary details. This method saves the most tokens and is suitable for less complex agents.
STRUCTURED¶
Asks the model to summarize the conversation in a structured format, detailing tools called, remaining tasks, and other relevant items. This is effective for complex agents that don’t need minor details.
VERBOSE¶
Directs the model to create a structured and verbose summary, including all small details. This is useful for agents that must not lose context.
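For example, an agent that must not lose fine-grained details can pair the SUMMARIZE strategy with the VERBOSE prompt. A minimal sketch with placeholder names:

CREATE AGENT detail_sensitive_agent
USING MODEL my_model
USING PROMPT 'You are an assistant that must retain all details across a long conversation'
WITH (
  'tokens_management_strategy' = 'summarize',
  'summarization_prompt' = 'verbose',
  'max_tokens_threshold' = '100000'
);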
Observability¶
The runtime provides comprehensive observability:
- Metrics
- Agent execution times
- Tool call latencies
- Model inference costs
- Error rates and types
- Logging
- Detailed execution logs
- Tool call traces
- Error stack traces
- Performance metrics
- Monitoring
- Real-time dashboards
- Alerting on failures
- Performance tracking
- Cost monitoring
- Debugging
- Request replay capabilities
- Step-by-step execution traces
- Tool call debugging
- State inspection
Performance optimization¶
- Tool Optimization
- Use function-based tools for high-frequency operations
- Implement caching for expensive operations
- Optimize tool implementations
- Use appropriate timeouts
- Model Optimization
- Cache model responses when appropriate
- Use smaller models for simple tasks
- Implement response streaming
- Monitor token usage
- State Optimization
- Limit session size
- Implement state compression
- Use appropriate TTLs
- Clean up old sessions
- Resource Management
- Configure appropriate parallelism
- Monitor memory usage
- Optimize checkpointing
- Use appropriate backpressure
Example optimization configuration:
CREATE AGENT optimized_agent
USING MODEL my_model
USING PROMPT 'You are an optimized assistant'
WITH (
'max_iterations' = '5',
'tokens_management_strategy' = 'summarize',
'max_tokens_threshold' = '80000',
'summarization_prompt' = 'concise'
);
Best practices¶
Agent design¶
- Keep agents focused on specific tasks
- Use clear and specific prompts
- Limit the number of tools per agent
- Optimize configuration to add proper guardrails
Chatbot example¶
A chatbot for simple, quick interactions. It needs to answer fast, doesn't need to choose from multiple tool options, should avoid long-running tasks, and shouldn't get stuck in complex loops. If it fails, it should fail quickly and not process further queries.
CREATE AGENT simple_faq_bot
USING MODEL fast_model
USING PROMPT 'You are a helpful FAQ assistant. Answer questions directly.'
USING TOOLS search
WITH (
'handle_exception' = 'fail', -- Stop immediately on request failure
'max_consecutive_failures' = '1', -- Don't retry if a tool fails.
'max_iterations' = '3', -- Max 3 steps, for example, think, search, answer.
'request_timeout' = '60', -- Fail fast after 60 seconds.
'tokens_management_strategy' = 'none' -- No summarization for short chats.
);
Research analyst agent example¶
An agent that performs multi-step research, calls multiple tools, and builds a large context. It needs time to “think” and should be resilient to occasional tool failures.
CREATE AGENT research_analyst_agent
USING MODEL research_model
USING PROMPT 'You are a research analyst. Gather and synthesize information to answer complex questions.'
USING TOOLS research_tool, analyze_tool
WITH (
'handle_exception' = 'continue', -- If a search fails, continue with another query.
'max_consecutive_failures' = '3', -- Allow up to 3 tool failures in a row.
'max_iterations' = '20', -- Allow many steps for deep research.
'max_tokens_threshold' = '128000', -- Allow a very large context.
'request_timeout' = '600', -- Allow up to 10 minutes for complex reasoning.
'summarization_prompt' = 'structured',
'tokens_management_strategy' = 'summarize'
);
Automated workflow example¶
An agent that runs a specific, automated workflow, for example, “Book a flight, then a hotel, then a car”. It needs to be resilient and handle expected failures without stopping the job.
CREATE AGENT travel_booker_agent
USING MODEL my_workflow_model
USING PROMPT 'You book travel. Follow the steps: 1. Find flight. 2. Find hotel. 3. Book car.'
USING TOOLS flight_tools, hotel_tools, gmail_tools
WITH (
'handle_exception' = 'continue', -- Critical for workflow to continue.
'max_consecutive_failures' = '2', -- Allow agent retries.
'max_iterations' = '15', -- Enough steps to complete a complex booking.
'max_tokens_threshold' = '32000', -- A standard context size is fine.
'request_timeout' = '300', -- 5 minutes is a reasonable default.
'tokens_management_strategy' = 'trim' -- Just drop the oldest messages if context gets full.
);
Tool design¶
- Make tools idempotent when possible
- Provide clear error messages
- Use appropriate timeouts
- Handle edge cases gracefully
Internal auditor service example¶
Use this configuration when the server's tools are usually very fast, for example, with response times under 2 seconds, but can occasionally fail due to a brief network blip or a pod restart.
CREATE TOOL internal_auditor_service
USING CONNECTION internal_api
WITH (
'type' = 'mcp_server',
'allowed_tools' = 'check_user_permissions,log_event',
'request_timeout' = '5', -- Fail fast after 5 seconds
'max_retries' = '3' -- Retry up to 3 times
);
Complex report generator example¶
Use this configuration when MCP server tools perform complex calculations, for example, generating a detailed report or running a machine learning inference. The API is known to be slow, for example, taking up to 90 seconds, but is generally reliable. It might also have strict rate limits, so you don't want to hammer it with retries.
Note that slow MCP tools affect the overall latency of the agent.
CREATE TOOL complex_report_generator
USING CONNECTION third_party_analytics_api
WITH (
'type' = 'mcp_server',
'allowed_tools' = 'generate_quarterly_report',
'request_timeout' = '120', -- Wait up to 2 minutes
'max_retries' = '1' -- Only retry once if it fails
);
- Performance
- Monitor execution times
- Use appropriate caching
- Optimize tool implementations
- Monitor resource usage
- Monitoring
- Set up comprehensive monitoring
- Use appropriate alerting
- Track key metrics
- Regular performance reviews
Troubleshooting¶
- Agent not responding
- Check model configuration
- Verify tool connections
- Check for infinite loops
- Review error logs
- Tool calls failing
- Verify tool definitions
- Check connection configurations
- Ensure proper error handling
- Monitor tool execution
- Performance issues
- Optimize tool implementations
- Use appropriate caching
- Monitor resource usage
- Consider parallelism
- Debugging steps
- Check agent execution logs
- Verify tool configurations
- Test tools independently (see the sketch after this list)
- Monitor resource usage
- Check for error patterns
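For function-based tools, you can exercise the underlying UDF directly in SQL before involving the agent. A sketch using the earlier tax example; the literal arguments are placeholders:

-- Call the UDF with literal inputs to verify its behavior in isolation.
SELECT calculate_tax(100.00, 'CA') AS tax_amount;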