Invoke a Tool in an AI Workflow¶
The AI_TOOL_INVOKE function in Confluent Cloud for Apache Flink® enables dynamic tool invocation within an AI workflow. You can call external tools by using a Model Context Protocol (MCP) server, or local tools by using user-defined functions (UDFs).
This capability is especially useful when a large language model (LLM) determines that it needs to delegate part of its task to another system, like performing a database lookup, calling a third-party API, or triggering an operational action.
Note
The AI_TOOL_INVOKE function is available for preview.
A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent's sole discretion.
For help with the AI_TOOL_INVOKE function, contact streamingagents-early-access@confluent.io.
This guide shows how to use the AI_TOOL_INVOKE function to invoke tools provided by an MCP server. You can also use the function to invoke local tools implemented as user-defined functions (UDFs); the process is similar, but you must first create a UDF for each tool you want to expose (a sketch appears after the following steps).
- Step 1: Create provider connections: Create provider connections to the MCP server and the AI model.
- Step 2: Create a model: Create a model that uses the connections you created in the previous step.
- Step 3: Run the AI model with the MCP tools: Use the AI_TOOL_INVOKE function in a SQL query to pass an input prompt to the model, along with descriptors for the UDFs and MCP tools you want to make available. The function returns a map containing the invoked tools, their responses, and the status of each call.
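For local tools, you register a UDF for each tool and pass UDF descriptors to AI_TOOL_INVOKE instead of MCP tool descriptors. The following is a minimal sketch only: it assumes that the first map argument carries UDF tool descriptors and the second carries MCP tool descriptors (matching the query in Step 3), and the UDF name lookup_order_status, the model name my_model, and the table name prompts are illustrative.

-- Minimal sketch: invoking a hypothetical UDF as a local tool.
-- `lookup_order_status` is an illustrative UDF name; register your own UDF before running this.
SELECT
  prompt,
  AI_TOOL_INVOKE(
    `my_model`,
    prompt,
    MAP['lookup_order_status', 'Look up the status of an order by its ID'],  -- local UDF tools
    MAP[]                                                                    -- no MCP tools
  ) AS response
FROM prompts;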
Prerequisites¶
- Access to Confluent Cloud.
- Access to a Flink compute pool.
- Sufficient permissions to create models. For more information, see RBAC for model inference.
- A remote MCP server with an SSE endpoint. This guide uses the free MCP server at https://mcp.deepwiki.com/sse.
- A remote AI model. This guide assumes you have access to a model running on Azure OpenAI.
Step 1: Create provider connections¶
In this step, you use Flink SQL to create connections to the MCP server and the AI model. For more information, see CREATE CONNECTION.
In the Flink SQL shell, or in a Flink workspace in Confluent Cloud Console, run the following CREATE CONNECTION statement to create a connection to the MCP server.
CREATE CONNECTION deepwiki-mcp-connection WITH (
  'type' = 'mcp_server',
  'endpoint' = 'https://mcp.deepwiki.com/sse',
  'api-key' = 'api_key'
);
Tip
Find the tools that are offered by an MCP server by running the MCP Inspector tool on your local machine.
npx @modelcontextprotocol/inspector@latest https://mcp.deepwiki.com/sse
Connect to the server and click List Tools to see the tools that the server offers. This guide uses the following tools:
read_wiki_structure
: Get a list of documentation topics for a GitHub repository.

read_wiki_contents
: View documentation about a GitHub repository.

ask_question
: Ask any question about a GitHub repository.
Run the following statement to create a connection to the AI model.
-- Create a connection to the AI model.
CREATE CONNECTION azure-openai-connection WITH (
  'type' = 'azureopenai',
  'endpoint' = 'https://<your-model>.openai.azure.com/openai/deployments/<your-deployment-id>/chat/completions?api-version=2025-01-01-preview',
  'api-key' = '<your-api-key>'
);
Step 2: Create a model¶
In this step, you create a model that uses the AI model connection and the MCP connection you created in the previous step.
In the Flink SQL shell, or in a Flink workspace in Confluent Cloud Console, run the following CREATE MODEL statement to register your model.
- The system_prompt property directs the remote model to select the best MCP tools to complete the task.
- The mcp.connection property specifies the MCP server connection to use.
CREATE MODEL `azureopenai_mcp_model`
INPUT (prompt STRING)
OUTPUT (response STRING)
WITH (
  'provider' = 'azureopenai',
  'task' = 'text_generation',
  'azureopenai.system_prompt' = 'Use the best tool to respond to the input prompt',
  'azureopenai.connection' = 'azure-openai-connection',
  'mcp.connection' = 'deepwiki-mcp-connection'
);
Tip
Try the following system prompts to see how the model responds:
- ‘When a user request is received, analyze the input and, in one step, prepare all necessary tool calls. Output ONLY the structured tool call information for each relevant tool.’
- ‘When a user request is received, prepare and output all tool calls that need to be made as a direct response to the input. This output should contain only the precise instruction set for all relevant tools to be executed simultaneously.’
- ‘When given any user request, output the structured tool call details for all identified relevant tools. Return all tool call requests together as a single, well-defined response.’
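To verify that the model was created, you can list and describe models from the same Flink SQL shell. This is only a quick check; it assumes that the SHOW MODELS and DESCRIBE MODEL statements are supported in your environment.

-- List the models registered in the current catalog and database.
SHOW MODELS;

-- Show the input/output schema and options of the model you just created.
DESCRIBE MODEL `azureopenai_mcp_model`;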
Run the following statement to create a table that contains the input prompts.
CREATE TABLE text_stream (
  id BIGINT,
  prompt STRING
);
Run the following statement to insert prompts into the table.
INSERT INTO text_stream VALUES
  (1, 'What is the structure of the confluentinc/flink-cookbook repo?'),
  (2, 'Please get the docs for the confluentinc/flink-cookbook repo'),
  (3, 'ask_question tool: Does the confluentinc/flink-cookbook repo have examples for queries with tumbling windows?');
Step 3: Run the AI model with the MCP tools¶
In this step, you run the AI model with the tools provided by the MCP server.
Run the following statement to invoke your model.
SELECT
  prompt,
  AI_TOOL_INVOKE(
    `azureopenai_mcp_model`,
    prompt,
    -- No local UDF tools are used in this example, so the first map is empty.
    MAP[],
    -- MCP tool names and descriptions that the model can choose from.
    MAP[
      'read_wiki_structure', 'Get a list of documentation topics for a GitHub repository',
      'read_wiki_contents', 'View documentation about a GitHub repository',
      'ask_question', 'Ask any question about a GitHub repository'
    ]
  ) AS response
FROM text_stream;
For the first prompt, your output should resemble:
read_wiki_structure,SUCCESS,Available pages for confluentinc/flink-cookbook:
- 1 Flink Cookbook Overview
  - 1.1 Architecture and Build Patterns
  - 1.2 Getting Started
- 2 Kafka Integration Recipes
  - 2.1 Dead Letter Queue Pattern
  - 2.2 Exactly-Once Processing
...
For the second prompt, your output should resemble:
read_wiki_contents,SUCCESS,# Page: Flink Cookbook Overview
# Flink Cookbook Overview
<details>
<summary>Relevant source files</summary>
The following files were used as context for generating this wiki page:
- [compiled-plan/pom.xml](compiled-plan/pom.xml)
- [kafka-dead-letter/pom.xml](kafka-dead-letter/pom.xml)
- [kafka-exactly-once/pom.xml](kafka-exactly-once/pom.xml)
- [kafka-headers/pom.xml](kafka-headers/pom.xml)
- [pattern-matching-cep/pom.xml](pattern-matching-cep/pom.xml)
</details>
This document provides an overview of the Flink Cookbook repository, which contains a collection of Apache Flink recipes demonstrating stream processing patterns, integrations, and best practices. Each recipe is implemented as a standalone Maven project with complete examples, tests, and documentation.
...
For the third prompt, your output should resemble:
ask_question,SUCCESS,Yes, the repository `confluentinc/flink-cookbook` contains an example of a query using tumbling windows. Specifically, the `kotlin` recipe demonstrates the use of `TumblingEventTimeWindows` for counting words within 5-second intervals.

### Tumbling Window Example

The `WordCount.kt` file in the `kotlin` recipe defines a Flink application that processes text lines from a Kafka topic and counts words within tumbling windows.
...
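If you want to persist the tool responses instead of inspecting them interactively, one option is to write the query results to a new table. The following is a minimal sketch, assuming that CREATE TABLE AS SELECT (CTAS) is available in your environment; the table name tool_responses is illustrative.

-- Minimal sketch: persist the prompts and tool responses to a new table.
-- `tool_responses` is an illustrative name; adjust it to your naming conventions.
CREATE TABLE tool_responses AS
  SELECT
    prompt,
    AI_TOOL_INVOKE(
      `azureopenai_mcp_model`,
      prompt,
      MAP[],
      MAP[
        'read_wiki_structure', 'Get a list of documentation topics for a GitHub repository',
        'read_wiki_contents', 'View documentation about a GitHub repository',
        'ask_question', 'Ask any question about a GitHub repository'
      ]
    ) AS response
  FROM text_stream;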