Streaming Agents Examples with Confluent Cloud

This guide provides examples for building Streaming Agents applications by using Flink SQL statements. Each example demonstrates different aspects of the Streaming Agents platform, from basic agent creation to complex multi-agent workflows.

Quick start repo

Streaming Agents quickstart architecture

Use the Streaming Agents Quickstart repo to build your first Streaming Agent in minutes.

Customer support agent

Build a customer support agent that can answer questions and create support tickets.

Prerequisites

  • Confluent Cloud account
  • Flink compute pool
  • OpenAI API key

Step 1: Set up the environment

In this step, you create connections to the OpenAI API and the MCP endpoint.

-- Create connection to OpenAI
CREATE CONNECTION openai_connection
WITH (
  'type' = 'openai',
  'endpoint' = 'https://api.openai.com/v1/chat/completions',
  'api-key' = '<your-openai-api-key>'
);

-- Create connection to support API
CREATE CONNECTION support_api
WITH (
  'type' = 'mcp_server',
  'endpoint' = 'https://support-api.example.com',
  'api-key' = '<your-support-api-key>'
);

Step 2: Create tools

In this step, you create the tools the agent can call. This example creates a function-based tool for account lookup and an MCP-based tool for the support API.

-- Create function for account lookup
CREATE FUNCTION lookup_customer
USING JAR 'customer-lookup.jar'
COMMENT 'Lookup customer information by ID';

-- Create function tool
CREATE TOOL customer_lookup
USING FUNCTION lookup_customer
WITH (
  'type' = 'function',
  'description' = 'Lookup customer account information'
);

-- Create MCP tool for support API
CREATE TOOL support_tools
USING CONNECTION support_api
WITH (
  'type' = 'mcp_server',
  'allowed_tools' = 'create_ticket,update_ticket,search_kb',
  'request_timeout' = '30'
);

Step 3: Create the model

In this step, you create a model that uses the OpenAI connection you created in Step 1.

CREATE MODEL support_llm
INPUT (message STRING)
OUTPUT (response STRING)
WITH (
  'provider' = 'openai',
  'connection' = 'openai_connection',
  'openai.model' = 'gpt-4o',
  'openai.temperature' = '0.7'
);

Step 4: Create the agent

In this step, you create an agent that uses the model and the tools you created in the previous steps.

CREATE AGENT customer_support_agent
USING MODEL support_llm
USING PROMPT 'You are a helpful customer support agent. Use the available tools to look up customer information and create support tickets when needed. Always be polite and professional.'
USING TOOLS customer_lookup, support_tools
WITH (
  'max_iterations' = '5'
);

Step 5: Create input and output tables

In this step, you create input and output tables for the agent.

-- Input table for customer messages
CREATE TABLE customer_messages (
  message_id STRING,
  customer_id STRING,
  message STRING,
  `timestamp` TIMESTAMP(3),
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
) WITH (
  'topic' = 'customer-messages',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);

-- Output table for agent responses
CREATE TABLE support_responses (
  message_id STRING,
  customer_id STRING,
  original_message STRING,
  agent_response STRING,
  ticket_id STRING,
  `timestamp` TIMESTAMP(3)
) WITH (
  'topic' = 'support-responses',
  'format' = 'json'
);

Step 6: Run the agent

In this step, you run the agent on the input table and insert the results into the output table.

INSERT INTO support_responses
SELECT
  t.message_id,
  t.customer_id,
  t.message AS original_message,
  a.response AS agent_response,
  JSON_VALUE(a.response, '$.ticket_id') AS ticket_id,
  t.`timestamp`
FROM
  customer_messages AS t,
  LATERAL TABLE(
    AI_RUN_AGENT('customer_support_agent', t.message, t.message_id)
  ) AS a(status, response);

Financial fraud detection

Build a multi-agent system for real-time fraud detection.

Step 1: Create specialized agents

In this step, you create specialized agents for the fraud detection system.

-- Risk assessment agent
CREATE AGENT risk_assessor
USING MODEL fraud_llm
USING PROMPT 'You are a financial risk assessment expert. Analyze transactions for potential fraud indicators.'
USING TOOLS transaction_analyzer, risk_calculator
WITH (
  'max_iterations' = '3'
);

-- Investigation agent
CREATE AGENT fraud_investigator
USING MODEL fraud_llm
USING PROMPT 'You are a fraud investigation specialist. Investigate suspicious transactions and gather evidence.'
USING TOOLS customer_history, external_verification, alert_system
WITH (
  'max_iterations' = '8'
);

-- Decision agent
CREATE AGENT fraud_decision
USING MODEL fraud_llm
USING PROMPT 'You are a fraud decision maker. Based on investigation results, decide whether to block, flag, or approve transactions.'
USING TOOLS transaction_blocker, notification_system
WITH (
  'max_iterations' = '5'
);

Step 2: Create the workflow

In this step, you create a workflow that uses the agents you created in the previous step.

-- First stage: Risk assessment
CREATE VIEW risk_assessed_transactions AS
SELECT
  t.transaction_id,
  t.customer_id,
  t.amount,
  t.risk_score,
  t.transaction_data,
  a.response AS risk_analysis
FROM
  transactions AS t,
  LATERAL TABLE(
    AI_RUN_AGENT('risk_assessor', t.transaction_data, t.transaction_id)
  ) AS a(status, response);

-- Second stage: Investigation for high-risk transactions
CREATE VIEW investigated_transactions AS
SELECT
  t.transaction_id,
  t.customer_id,
  t.amount,
  t.risk_score,
  t.transaction_data,
  t.risk_analysis,
  a.response AS investigation_results
FROM
  risk_assessed_transactions AS t,
  LATERAL TABLE(
    AI_RUN_AGENT('fraud_investigator', t.transaction_data, t.transaction_id)
  ) AS a(status, response)
WHERE
  t.risk_score > 0.7;

-- Third stage: Final decision
INSERT INTO fraud_decisions
SELECT
  t.transaction_id,
  t.customer_id,
  t.amount,
  t.risk_score,
  a.response AS final_decision,
  JSON_VALUE(a.response, '$.action') AS action,
  JSON_VALUE(a.response, '$.confidence') AS confidence
FROM
  investigated_transactions AS t,
  LATERAL TABLE(
    AI_RUN_AGENT('fraud_decision', t.transaction_data, t.transaction_id)
  ) AS a(status, response);

IoT predictive maintenance

Build an agent system for predictive maintenance of IoT devices.

Step 1: Create the data processing pipeline

In this step, you enrich raw sensor readings and aggregate them into a device health summary.

-- Process sensor data
CREATE VIEW processed_sensor_data AS
SELECT
  device_id,
  sensor_type,
  `value`,
  `timestamp`,
  LAG(`value`) OVER (PARTITION BY device_id, sensor_type ORDER BY `timestamp`) AS prev_value,
  `value` - LAG(`value`) OVER (PARTITION BY device_id, sensor_type ORDER BY `timestamp`) AS value_change
FROM sensor_readings;

-- Aggregate data for analysis
CREATE VIEW device_health_summary AS
SELECT
  device_id,
  COUNT(*) AS reading_count,
  AVG(`value`) AS avg_value,
  STDDEV(`value`) AS value_volatility,
  MAX(`value`) AS max_value,
  MIN(`value`) AS min_value,
  MAX(`timestamp`) AS last_reading
FROM processed_sensor_data
WHERE `timestamp` >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR
GROUP BY device_id;

Step 2: Create maintenance agent

In this step, you create a maintenance agent. This example assumes that the iot_llm model and the device_specs, maintenance_history, and parts_inventory tools already exist.

CREATE AGENT maintenance_agent
USING MODEL iot_llm
USING PROMPT 'You are an IoT maintenance expert. Analyze device health data and predict maintenance needs. Use the available tools to check device specifications and maintenance history.'
USING TOOLS device_specs, maintenance_history, parts_inventory
WITH (
  'max_iterations' = '6'
);

Step 3: Execute maintenance analysis

In this step, you run the maintenance agent on the device health summary table and insert the results into the maintenance_recommendations table.

INSERT INTO maintenance_recommendations
SELECT
  t.device_id,
  t.reading_count,
  t.avg_value,
  t.value_volatility,
  a.response AS maintenance_analysis,
  JSON_VALUE(a.response, '$.recommendation') AS recommendation,
  JSON_VALUE(a.response, '$.urgency') AS urgency,
  JSON_VALUE(a.response, '$.parts_needed') AS parts_needed
FROM
  device_health_summary AS t,
  LATERAL TABLE(
    AI_RUN_AGENT(
      'maintenance_agent',
      -- Build the agent input from the summary columns
      CONCAT('readings=', CAST(t.reading_count AS STRING),
             ', avg=', CAST(t.avg_value AS STRING),
             ', stddev=', CAST(t.value_volatility AS STRING)),
      t.device_id
    )
  ) AS a(status, response);

Content moderation system

Build a content moderation system with multiple specialized agents.

Step 1: Create content analysis agents

In this step, you create agents that analyze text and image content, and an agent that makes the final moderation decision.

-- Text content analyzer
CREATE AGENT text_moderator
USING MODEL content_llm
USING PROMPT 'You are a content moderation expert specializing in text analysis. Identify inappropriate content, hate speech, and policy violations.'
USING TOOLS text_analyzer, policy_checker
WITH (
  'max_iterations' = '4'
);

-- Image content analyzer
CREATE AGENT image_moderator
USING MODEL vision_llm
USING PROMPT 'You are a content moderation expert specializing in image analysis. Identify inappropriate images, violence, and policy violations.'
USING TOOLS image_analyzer, content_classifier
WITH (
  'max_iterations' = '4'
);

-- Decision maker
CREATE AGENT moderation_decision
USING MODEL content_llm
USING PROMPT 'You are a content moderation decision maker. Based on analysis results, decide whether to approve, flag, or reject content.'
USING TOOLS content_action, user_notification
WITH (
  'max_iterations' = '3'
);

Step 2: Create the content processing pipeline

In this step, you create a content processing pipeline that uses the agents you created in the previous step.

-- Process text content
CREATE VIEW text_analysis AS
SELECT
  t.content_id,
  t.user_id,
  t.content_type,
  t.text_content,
  a.response AS text_analysis_result
FROM
  user_content AS t,
  LATERAL TABLE(
    AI_RUN_AGENT('text_moderator', t.text_content, t.content_id)
  ) AS a(status, response)
WHERE
  t.content_type = 'text';

-- Process image content
CREATE VIEW image_analysis AS
SELECT
  t.content_id,
  t.user_id,
  t.content_type,
  t.image_url,
  a.response AS image_analysis_result
FROM
  user_content AS t,
  LATERAL TABLE(
    AI_RUN_AGENT('image_moderator', t.image_url, t.content_id)
  ) AS a(status, response)
WHERE
  t.content_type = 'image';

-- Combine analysis results
CREATE VIEW combined_analysis AS
SELECT
  COALESCE(t.content_id, i.content_id) AS content_id,
  COALESCE(t.user_id, i.user_id) AS user_id,
  COALESCE(t.content_type, i.content_type) AS content_type,
  COALESCE(t.text_analysis_result, i.image_analysis_result) AS analysis_result
FROM text_analysis AS t
FULL OUTER JOIN image_analysis AS i
  ON t.content_id = i.content_id;

-- Make final decisions
INSERT INTO moderation_decisions
SELECT
  t.content_id,
  t.user_id,
  t.content_type,
  a.response AS final_decision,
  JSON_VALUE(a.response, '$.action') AS action,
  JSON_VALUE(a.response, '$.reason') AS reason,
  JSON_VALUE(a.response, '$.confidence') AS confidence
FROM
  combined_analysis AS t,
  LATERAL TABLE(
    AI_RUN_AGENT('moderation_decision', t.analysis_result, t.content_id)
  ) AS a(status, response);

Multi-agent chat system

Build a conversational AI system with multiple specialized agents.

Step 1: Create specialized agents

In this step, you create specialized agents for general conversation, technical support, and sales.

-- General conversation agent
CREATE AGENT conversation_agent
USING MODEL chat_llm
USING PROMPT 'You are a helpful conversational AI. Engage in natural conversation and route complex queries to appropriate specialists.'
USING TOOLS query_classifier, agent_router
WITH (
  'max_iterations' = '3'
);

-- Technical support agent
CREATE AGENT tech_support
USING MODEL tech_llm
USING PROMPT 'You are a technical support specialist. Help users with technical issues and provide detailed solutions.'
USING TOOLS knowledge_base, system_diagnostics, ticket_creator
WITH (
  'max_iterations' = '6'
);

-- Sales agent
CREATE AGENT sales_agent
USING MODEL sales_llm
USING PROMPT 'You are a sales specialist. Help customers find the right products and services for their needs.'
USING TOOLS product_catalog, pricing_calculator, quote_generator
WITH (
  'max_iterations' = '5'
);

Step 2: Create conversation management

In this step, you create a conversation state table and populate it with the routing decisions that the conversation agent makes.

-- Conversation state table
CREATE TABLE conversation_state (
  session_id STRING,
  user_id STRING,
  current_agent STRING,
  conversation_history STRING,
  last_activity TIMESTAMP(3),
  PRIMARY KEY (session_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'conversation-state',
  'properties.bootstrap.servers' = '<your-kafka-bootstrap-servers>',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Process incoming messages
INSERT INTO conversation_state
SELECT
  t.session_id,
  t.user_id,
  CASE
    WHEN JSON_VALUE(a.response, '$.specialist') = 'tech_support' THEN 'tech_support'
    WHEN JSON_VALUE(a.response, '$.specialist') = 'sales' THEN 'sales_agent'
    ELSE 'conversation_agent'
  END AS current_agent,
  CONCAT(t.conversation_history, '\nUser: ', t.message, '\nAgent: ', a.response) AS conversation_history,
  CURRENT_TIMESTAMP AS last_activity
FROM
  user_messages AS t,
  LATERAL TABLE(
    AI_RUN_AGENT('conversation_agent', t.message, t.session_id)
  ) AS a(status, response);

Step 3: Route to specialized agents

In this step, you route each message to the appropriate specialized agent based on the conversation state.

-- Route to technical support
INSERT INTO tech_support_responses
SELECT
  t.session_id,
  t.user_id,
  t.message,
  a.response AS response
FROM
  user_messages AS t,
  LATERAL TABLE(
    AI_RUN_AGENT('tech_support', t.message, t.session_id)
  ) AS a(status, response)
WHERE t.session_id IN (
  SELECT session_id
  FROM conversation_state
  WHERE current_agent = 'tech_support'
) AND a.status = 'SUCCESS';

-- Route to sales agent
INSERT INTO sales_responses
SELECT
  t.session_id,
  t.user_id,
  t.message,
  a.response AS response
FROM
  user_messages AS t,
  LATERAL TABLE(
    AI_RUN_AGENT('sales_agent', t.message, t.session_id)
  ) AS a(status, response)
WHERE
  t.session_id IN (
    SELECT session_id
    FROM conversation_state
    WHERE current_agent = 'sales_agent'
  );

Agent chaining

Chain multiple agents together for complex workflows:

-- First agent: Classify request
CREATE VIEW classified_requests AS
SELECT
  t.request_id,
  t.request_text,
  a.response AS classification
FROM
  user_requests AS t,
  LATERAL TABLE(
    AI_RUN_AGENT('request_classifier', t.request_text, t.request_id)
  ) AS a(status, response);

-- Second agent: Process based on classification
CREATE VIEW processed_requests AS
SELECT
  t.request_id,
  t.request_text,
  t.classification,
  a.response AS response
FROM
  classified_requests AS t,
  LATERAL TABLE(
    -- Pass both the classification and the original text to the second agent
    AI_RUN_AGENT('request_processor', CONCAT(t.classification, ': ', t.request_text), t.request_id)
  ) AS a(status, response);

Conditional agent selection

The following example code selects different agents based on conditions.

CREATE VIEW dynamic_agent_responses AS
SELECT
  t.request_id,
  t.request_type,
  a.response AS response
FROM
  user_requests AS t,
  LATERAL TABLE(
    AI_RUN_AGENT('tech_agent', t.request_text, t.request_id)
  ) AS a(status, response)
WHERE t.request_type = 'technical'
UNION ALL
SELECT
  t.request_id,
  t.request_type,
  a.response AS response
FROM
  user_requests AS t,
  LATERAL TABLE(
    AI_RUN_AGENT('sales_agent', t.request_text, t.request_id)
  ) AS a(status, response)
WHERE t.request_type = 'sales'
UNION ALL
SELECT
  t.request_id,
  t.request_type,
  a.response AS response
FROM
  user_requests AS t,
  LATERAL TABLE(
    AI_RUN_AGENT('general_agent', t.request_text, t.request_id)
  ) AS a(status, response)
WHERE t.request_type NOT IN ('technical', 'sales');

Error handling and fallbacks

The following example code implements robust error handling.

CREATE AGENT robust_agent
USING MODEL main_llm
USING PROMPT 'You are a robust agent that handles errors gracefully. If a tool fails, try alternative approaches or ask for clarification.'
USING TOOLS primary_tool, fallback_tool, error_handler
WITH (
  'max_iterations' = '10',
  'request_timeout' = '600',
  'max_consecutive_failures' = '3',
  'handle_exception' = 'continue'
);
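In addition to agent-level settings, you can handle failures at the query level by checking the status column that AI_RUN_AGENT returns. The following sketch runs the agent once, sends successful responses to the main output, and dead-letters failures for inspection. The user_requests, agent_results, and failed_agent_runs tables are illustrative; adapt the names to your setup.

-- Run the agent once and capture both status and response
CREATE VIEW robust_agent_runs AS
SELECT
  t.request_id,
  t.request_text,
  a.status,
  a.response
FROM
  user_requests AS t,
  LATERAL TABLE(
    AI_RUN_AGENT('robust_agent', t.request_text, t.request_id)
  ) AS a(status, response);

-- Successful responses go to the main output
INSERT INTO agent_results
SELECT request_id, response
FROM robust_agent_runs
WHERE status = 'SUCCESS';

-- Failed runs are dead-lettered for inspection and replay
INSERT INTO failed_agent_runs
SELECT request_id, status, request_text
FROM robust_agent_runs
WHERE status <> 'SUCCESS';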

Best practices

The following are best practices for designing and implementing agents.

Agent design
  • Keep agents focused on specific tasks
  • Use clear and descriptive prompts
  • Limit the number of tools per agent
  • Implement proper error handling
Performance optimization
  • Use function-based tools for high-frequency operations
  • Implement caching where appropriate
  • Monitor execution times and costs
  • Use appropriate parallelism
Security
  • Store API keys securely
  • Validate all inputs
  • Implement proper access controls
  • Monitor for security issues
Monitoring
  • Set up comprehensive logging
  • Monitor agent performance
  • Track tool usage and costs
  • Implement alerting for failures
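The monitoring practices above can be sketched in Flink SQL by aggregating the status column that AI_RUN_AGENT returns. This example assumes an agent_runs table or view that records the agent name, status, and a watermarked event_time for each invocation; the names are illustrative.

-- Count failures per agent over one-minute windows to drive alerting
CREATE VIEW agent_failure_counts AS
SELECT
  agent_name,
  window_start,
  window_end,
  COUNT(*) AS failure_count
FROM TABLE(
  TUMBLE(TABLE agent_runs, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
WHERE status <> 'SUCCESS'
GROUP BY agent_name, window_start, window_end;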