Monitor Streaming Agents with Confluent Cloud
When you run a streaming agent, you can capture detailed execution logs in a system log table. A system log table is a table that you create to store every step of an agent’s workflow, including chat messages sent to the model, model responses, tool call requests, tool responses, and errors. Pass a system log table name to AI_RUN_AGENT, and Confluent Cloud automatically streams the logs to that table.
Note
Monitoring streaming agents with system log tables is an Open Preview feature in Confluent Cloud.
A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent's sole discretion.
Prerequisites
AI tools and agents already created in your environment.
A running Flink environment in Confluent Cloud.
Create a system log table
Create a table to capture agent workflow logs. The agent runtime writes logs using a fixed schema that maps workflow metadata, event types, and payloads to specific columns.
CREATE TABLE `agent_system_table` (
  `agent_name` STRING NOT NULL,
  `job_id` STRING NOT NULL,
  `request_id` STRING NOT NULL,
  `statement_name` STRING NOT NULL,
  `iteration` INT NOT NULL,
  `type` STRING NOT NULL,
  `data` STRING NOT NULL,
  `metrics` STRING
)
WITH (
  'value.format' = 'avro-registry'
);
The default `value.format` is `avro-registry`. You can also use `json-registry` or `proto-registry`.
The table must use the required schema, and custom columns are not supported. You can configure topic settings, retention policies, and other table properties as needed.
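For example, if you prefer JSON Schema serialization, declare the same required columns and change only the format option. This is a sketch; the table name `agent_system_table_json` is illustrative:

```sql
-- Same required schema as above, serialized with JSON Schema
-- instead of the default Avro format.
CREATE TABLE `agent_system_table_json` (
  `agent_name` STRING NOT NULL,
  `job_id` STRING NOT NULL,
  `request_id` STRING NOT NULL,
  `statement_name` STRING NOT NULL,
  `iteration` INT NOT NULL,
  `type` STRING NOT NULL,
  `data` STRING NOT NULL,
  `metrics` STRING
)
WITH (
  'value.format' = 'json-registry'
);
```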
Schema reference
Each column in the system log table schema serves a specific purpose:
| Column | Type | Description |
|---|---|---|
| `agent_name` | STRING | Name of the agent. |
| `job_id` | STRING | Unique identifier for the Flink job running the agent. |
| `request_id` | STRING | Unique identifier for the prompt or request being processed. |
| `statement_name` | STRING | Name of the Flink SQL statement. |
| `iteration` | INT | Current step or loop iteration of the agent's reasoning process. |
| `type` | STRING | Log event type, for example, `TOOL_CALL_REQUEST`. |
| `data` | STRING | Event payload as a JSON string. |
| `metrics` | STRING | Optional latency metrics. This column is nullable. |
Log event types
The `type` column contains one of these event type values:

- `CHAT_MESSAGES`: The chat messages sent to the model.
- `MODEL_RESPONSE`: The model's response.
- `TOOL_CALL_REQUEST`: The request sent to a tool.
- `TOOL_RESPONSE`: The response returned by a tool.
- `ERROR`: An error that occurred during agent execution.
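Because the `data` column stores each payload as a JSON string, you can pull individual fields out of it with Flink SQL's JSON functions. A minimal sketch using `JSON_VALUE`; the path `$.content` is hypothetical, since the actual payload structure varies by event type, so inspect your own `data` values first:

```sql
-- Extract a field from the JSON payload of model responses.
-- '$.content' is an assumed path for illustration only; check
-- the real payload shape in your table for each event type.
SELECT
  `request_id`,
  `iteration`,
  JSON_VALUE(`data`, '$.content') AS model_content
FROM `agent_system_table`
WHERE `type` = 'MODEL_RESPONSE';
```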
Run an agent with logging enabled
To enable system table logging, pass the name of a system log table as the fourth argument to AI_RUN_AGENT. As the agent processes records, it automatically emits logs to the system log table.
SELECT `prompt`, `status`, `response`
FROM `customer_queries`,
LATERAL TABLE(
AI_RUN_AGENT(
'agent_1', `prompt`, `id`,
'agent_system_table'));
Query agent logs
Query agent logs in real time using standard Flink SQL. Confluent Cloud writes logs to the system log table, so you can use any SQL query pattern.
View all logs:
SELECT * FROM `agent_system_table`;
Filter by request ID and order by iteration:
SELECT `request_id`, `iteration`, `type`, `data`
FROM `agent_system_table`
WHERE `request_id` = '1'
ORDER BY `iteration` ASC;
Filter by type to view only tool responses:
SELECT * FROM `agent_system_table`
WHERE `type` = 'TOOL_RESPONSE';
Filter for errors:
SELECT `request_id`, `iteration`, `data`
FROM `agent_system_table`
WHERE `type` = 'ERROR';
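Standard aggregations work as well. For example, a sketch that counts tool call requests per prompt, which can help spot agents stuck in unusually long reasoning loops:

```sql
-- Count tool call requests per prompt; a high count can
-- indicate an agent looping through many tool invocations.
SELECT `request_id`, COUNT(*) AS tool_call_count
FROM `agent_system_table`
WHERE `type` = 'TOOL_CALL_REQUEST'
GROUP BY `request_id`;
```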
For comprehensive debugging workflows, refer to Debug Streaming Agents.
You can also consume logs directly from the underlying Kafka topic using any Kafka consumer for downstream monitoring, alerting, or storage.