Monitor Streaming Agents with Confluent Cloud

When you run a streaming agent, you can capture detailed execution logs in a system log table. A system log table is a table that you create to store every step of an agent’s workflow, including chat messages sent to the model, model responses, tool call requests, tool responses, and errors. Pass a system log table name to AI_RUN_AGENT, and Confluent Cloud automatically streams the logs to that table.

Note

Monitoring streaming agents with system log tables is an Open Preview feature in Confluent Cloud.

A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent’s sole discretion.

Prerequisites

  • AI tools and agents already created in your environment.

  • A running Flink environment in Confluent Cloud.

Create a system log table

Create a table to capture agent workflow logs. The agent runtime writes logs using a fixed schema that maps workflow metadata, event types, and payloads to specific columns.

CREATE TABLE `agent_system_table` (
  `agent_name` STRING NOT NULL,
  `job_id` STRING NOT NULL,
  `request_id` STRING NOT NULL,
  `statement_name` STRING NOT NULL,
  `iteration` INT NOT NULL,
  `type` STRING NOT NULL,
  `data` STRING NOT NULL,
  `metrics` STRING
)
WITH (
  'value.format' = 'avro-registry'
);

The default value.format is avro-registry. You can also use json-registry or proto-registry.

The table must use the required schema, and custom columns are not supported. You can configure topic settings, retention policies, and other table properties as needed.
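As a sketch of how the format and topic settings can be varied while the required columns stay unchanged, the statement below creates a second log table that uses json-registry and sets a retention period. The table name `agent_system_table_json` is hypothetical, and the `kafka.retention.time` option and its value are assumed to match the Confluent Cloud table options; check the CREATE TABLE reference before relying on them.

```sql
-- Hypothetical table name; the column list is the required schema, unchanged.
CREATE TABLE `agent_system_table_json` (
  `agent_name` STRING NOT NULL,
  `job_id` STRING NOT NULL,
  `request_id` STRING NOT NULL,
  `statement_name` STRING NOT NULL,
  `iteration` INT NOT NULL,
  `type` STRING NOT NULL,
  `data` STRING NOT NULL,
  `metrics` STRING
)
WITH (
  -- One of the alternative formats named above.
  'value.format' = 'json-registry',
  -- Assumed option name and value: keep log records for seven days.
  'kafka.retention.time' = '7 d'
);
```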

Schema reference

Each column in the system log table schema serves a specific purpose:

Column          Type    Description
agent_name      STRING  Name of the agent.
job_id          STRING  Unique identifier for the Flink job running the agent.
request_id      STRING  Unique identifier for the prompt or request being processed.
statement_name  STRING  Name of the Flink SQL statement.
iteration       INT     Current step or loop iteration of the agent’s reasoning process.
type            STRING  Log event type, for example, CHAT_MESSAGES or MODEL_RESPONSE.
data            STRING  Event payload as a JSON string.
metrics         STRING  Optional latency metrics. This column is nullable.
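Because data holds a JSON string and metrics can be NULL, a query usually needs to unpack the first and guard against the second. The following is a minimal sketch that uses the Flink SQL JSON_VALUE function. The JSON path '$.content' is a hypothetical field name, since the payload layout is not described here; inspect a few rows of your own table to find the real keys.

```sql
-- Sketch only: '$.content' is a hypothetical path into the `data` payload.
SELECT
  `request_id`,
  `iteration`,
  `type`,
  -- JSON_VALUE returns NULL by default when the path is missing.
  JSON_VALUE(`data`, '$.content') AS `content`,
  `metrics`
FROM `agent_system_table`
-- Keep only events that carry latency metrics.
WHERE `metrics` IS NOT NULL;
```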

Log event types

The type column contains one of these event type values:

CHAT_MESSAGES

The chat messages sent to the model.

MODEL_RESPONSE

The model’s response.

TOOL_CALL_REQUEST

The request sent to a tool.

TOOL_RESPONSE

The response returned by a tool.

ERROR

An error that occurred during agent execution.
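To see how these event types are distributed in practice, one option is to count rows per type. This is a minimal sketch against the `agent_system_table` created earlier; on an unbounded table the counts keep updating as new log events arrive.

```sql
-- Count log events per event type; results update continuously.
SELECT
  `type`,
  COUNT(*) AS `event_count`
FROM `agent_system_table`
GROUP BY `type`;
```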

Run an agent with logging enabled

To enable logging, pass the name of a system log table as the fourth argument to AI_RUN_AGENT. As the agent processes records, it automatically writes log events to that table.

SELECT `prompt`, `status`, `response`
FROM `customer_queries`,
LATERAL TABLE(
  AI_RUN_AGENT(
    'agent_1', `prompt`, `id`,
    'agent_system_table'));

Query agent logs

Query agent logs in real time using standard Flink SQL. Because Confluent Cloud writes the logs to a table that you created, you can apply any SQL query pattern to it.

View all logs:

SELECT * FROM `agent_system_table`;

Filter by request ID and order by iteration:

SELECT `request_id`, `iteration`, `type`, `data`
FROM `agent_system_table`
WHERE `request_id` = '1'
ORDER BY `iteration` ASC;

Filter by type to view only tool responses:

SELECT * FROM `agent_system_table`
WHERE `type` = 'TOOL_RESPONSE';

Filter for errors:

SELECT `request_id`, `iteration`, `data`
FROM `agent_system_table`
WHERE `type` = 'ERROR';
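The error filter can also feed a simple aggregate, for example to spot which agents fail most often. The query below is a sketch that uses only columns from the required schema; the alias `error_count` is illustrative.

```sql
-- Count ERROR events per agent; results update as new errors are logged.
SELECT
  `agent_name`,
  COUNT(*) AS `error_count`
FROM `agent_system_table`
WHERE `type` = 'ERROR'
GROUP BY `agent_name`;
```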

For comprehensive debugging workflows, refer to Debug Streaming Agents.

You can also consume logs directly from the underlying Kafka topic using any Kafka consumer for downstream monitoring, alerting, or storage.