Create and Run Streaming Agents with Confluent Cloud

Confluent Intelligence provides a high-level way to build AI agents that can reason over streaming data and take actions through tool invocation. This guide explains how to use the declarative agent syntax to create powerful, intelligent streaming workflows.

Confluent Intelligence enables agents to handle complex, multi-step tasks that require both reasoning and external tool usage.

Agent definition

A declarative specification of an AI agent that includes:

  • Model reference for reasoning
  • System prompt defining the agent’s role
  • Tools the agent can use
  • Configuration options
Tool resources

Encapsulated capabilities that agents can invoke:

  • Function-based tools (Flink UDFs)
  • MCP-based tools (external services)
  • Custom tool implementations

Create an agent

The following example shows general steps for creating a simple customer support agent that can answer questions and look up account information. All steps are shown in Flink SQL syntax.

Step 1: Define tools

Create the tools your agent will use. A tool is a first-class resource in Confluent Cloud for Apache Flink®, and you create it by using the CREATE TOOL statement. This example creates a function-based tool for account lookup and an MCP-based tool for external services.

-- Create a function for account lookup.
CREATE FUNCTION lookup_account
USING JAR 'account-lookup.jar'
COMMENT 'Function to lookup customer account information';

-- Create a tool based on the function.
CREATE TOOL account_lookup_tool
USING FUNCTION lookup_account
WITH (
  'type' = 'function',
  'description' = 'Lookup customer account information by ID'
);

-- Create a connection to an external MCP server.
CREATE CONNECTION support_mcp
WITH (
  'type' = 'mcp_server',
  'endpoint' = 'https://support-api.example.com',
  'api-key' = '<your-api-key>'
);

-- Create an MCP tool based on the connection.
CREATE TOOL support_api_tool
USING CONNECTION support_mcp
WITH (
  'type' = 'mcp_server',
  'allowed_tools' = 'create_ticket,update_ticket,search_kb',
  'request_timeout' = '30'
);

Step 2: Create the agent

Define the agent with its model, prompt, and tools. This example creates a model that uses the OpenAI API and an agent that uses the model and the tools you created in the previous step.

-- Create a model that uses the OpenAI API.
CREATE MODEL support_llm
INPUT (message STRING)
OUTPUT (response STRING)
WITH (
  'provider' = 'openai',
  'model' = 'gpt-4o',
  'openai.connection' = 'your-openai-connection'
);

-- Create an agent that uses the model and tools.
CREATE AGENT customer_support_agent
USING MODEL support_llm
USING PROMPT 'You are a helpful customer support agent. Use the available tools to look up account information and create support tickets when needed.'
USING TOOLS account_lookup_tool, support_api_tool
WITH (
  'max_iterations' = '5'
);

Step 3: Execute the agent

Run the agent on streaming data. This example creates an input table for customer queries and an output table for agent responses.

 -- Create an input table for customer queries.
 CREATE TABLE customer_queries (
   query_id STRING,
   customer_id STRING,
   message STRING,
   timestamp TIMESTAMP(3)
 );

 -- Execute the agent on streaming data.
SELECT query_id, customer_id, message, agent_output FROM customer_queries,
 LATERAL TABLE(AI_RUN_AGENT(`customer_support_agent`, `message`, `query_id`));

Configure a Streaming Agent

Streaming Agents support various configuration options to customize their behavior.

Options
  • max_iterations: Maximum loop iterations (default: 10)
  • max_consecutive_failures: Maximum consecutive failures (optional, default: 3)

This example shows how to use advanced options.

CREATE AGENT advanced_agent
USING MODEL my_model
USING PROMPT 'You are an advanced AI agent...'
USING TOOLS tool1, tool2
WITH (
  'max_iterations' = '8',
  'max_consecutive_failures' = '5'
);

Tool types

Streaming Agents support function-based and MCP-based tools.

Function-based tools

A function-based tool wraps a Flink SQL user-defined function (UDF) and executes locally in the Flink runtime.

Function-based tools have these benefits:

  • Low-latency execution
  • No external dependencies
  • Full control over implementation

The following example shows how to create a function-based tool based on a UDF.

-- Create a UDF.
CREATE FUNCTION calculate_discount
USING JAR 'pricing.jar'
COMMENT 'Calculate discount based on customer tier';

-- Create tool based on the UDF.
CREATE TOOL discount_calculator
USING FUNCTION calculate_discount
WITH (
  'type' = 'function',
  'description' = 'Calculate customer discount percentage'
);

MCP-based tools

An MCP-based tool connects to external services.

MCP-based tools have these benefits:

  • Access to external APIs
  • Standardized tool interface
  • Easy integration with existing services

The following example shows how to create an MCP tool based on a connection resource.

-- Create a connection to an external MCP server.
CREATE CONNECTION external_api
WITH (
  'type' = 'mcp_server',
  'endpoint' = 'https://api.example.com',
  'api-key' = '<your-api-key>'
);

-- Create an MCP tool based on the connection.
CREATE TOOL external_service
USING CONNECTION external_api
WITH (
  'type' = 'mcp_server',
  'allowed_tools' = 'get_weather,get_stock_price',
  'request_timeout' = '30'
);

Best practices

Follow these best practices for designing and implementing agents and tools.

Agent design
  • Keep prompts clear and specific.
  • Limit the number of tools per agent.
  • Use descriptive tool names and descriptions.
  • Set appropriate max_iterations based on task complexity.
Tool design
  • Make tools idempotent when possible.
  • Provide clear error messages.
  • Use appropriate timeouts.
  • Handle edge cases gracefully.
Performance
  • Use function-based tools for high-frequency operations.
  • Cache frequently accessed data.
  • Monitor agent execution times.
  • Use appropriate partitioning for large datasets.
Security
  • Store API keys in secure connections.
  • Validate tool inputs.
  • Implement proper access controls.
  • Monitor tool usage and costs.

Common patterns

The following examples show common patterns for building agents.

Multi-agent workflows

For complex tasks, chain multiple agents together. The following example chains two agents together. The first agent classifies the request, and the second agent routes the request to the appropriate specialist.

-- First agent: Classify the request
CREATE AGENT classifier
USING MODEL my_model
USING PROMPT 'Classify customer requests into categories'
WITH ('max_iterations' = '3');

-- Second agent: Route the request to the appropriate specialist.
CREATE AGENT router
USING MODEL my_model
USING PROMPT 'Route classified requests to appropriate specialists'
USING TOOLS routing_tool
WITH ('max_iterations' = '5');

Error Handling

Implement robust error handling in your agents. The following example creates an agent that handles errors gracefully.

CREATE AGENT robust_agent
USING MODEL my_model
USING PROMPT 'You are a robust agent that handles errors gracefully. If a tool fails, try alternative approaches or ask for clarification.'
USING TOOLS primary_tool, fallback_tool
WITH (
  'max_iterations' = '10',
  'max_consecutive_failures' = '5'
);

Troubleshooting

The following are common issues and possible solutions.

Agent not responding
  • Check model configuration and API keys.
  • Verify tool connections are working.
  • Increase max_iterations if needed.
  • Check for infinite loops in logic.
Tool calls failing
  • Verify tool definitions are correct
  • Check connection configurations.
  • Ensure proper error handling in tools.
  • Monitor tool execution logs.
Performance issues
  • Use function-based tools for frequent operations
  • Optimize tool implementations.
  • Consider caching strategies.
  • Monitor resource usage.

Debugging tips

  • Use replay functionality to debug agent behavior.
  • Check agent execution logs for detailed information.
  • Test tools independently before using in agents.
  • Use smaller max_iterations during development.