Run Apache Flink Agents on Confluent Platform for Apache Flink

Apache Flink® Agents is an open-source library for building AI agent applications that run as Apache Flink jobs. This library enables you to create agentic workflows, such as ReAct agents and tool-calling pipelines, which execute as scalable, fault-tolerant Apache Flink streaming applications.

You can run Apache Flink Agents applications on Confluent Platform for Apache Flink by packaging them as custom Docker images and submitting them through Confluent Manager for Apache Flink (CMF), following the same pattern as any Flink application.

Important

Apache Flink Agents is an early-stage open-source project provided by the Apache Flink community. It is not a Confluent product and is not supported by Confluent. For issues with the Apache Flink Agents library, see the Apache Flink Agents documentation and the flink-agents GitHub repository.

For examples of how to use Apache Flink Agents, see the examples directory in the Apache Flink Agents repository.

Prerequisites

- A Kubernetes cluster running Confluent Manager for Apache Flink (CMF).
- Docker, for building the custom application image.
- The Confluent CLI, or access to the CMF REST API, for submitting applications.

Build a custom Docker image

Apache Flink Agents applications require the Apache Flink Agents distribution JAR and your application JAR to be available on the Flink classpath. Package these into a custom Docker image based on the Confluent Flink base image.

Apache Flink Agents provides version-specific distribution JARs (for example, flink-2.1). Choose the distribution that matches your Confluent Platform for Apache Flink version.

Example Dockerfile:

FROM confluentinc/cp-flink:2.1.1-cp1

# Flink Agents distribution JAR (uber JAR with all agent dependencies)
COPY flink-agents/dist/flink-2.1/target/flink-agents-dist-flink-2.1-0.2-SNAPSHOT.jar /opt/flink/usrlib/

# Your application JAR
COPY target/my-agent-app.jar /opt/flink/usrlib/

Note

Place JARs in /opt/flink/usrlib/ so they are available to both the JobManager and TaskManager pods. If your application requires additional connectors or data files, add them to the image in the same way.
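For example, if your agent reads from or writes to Kafka topics, you could add a Kafka connector JAR alongside the other JARs. The connector artifact and version below are illustrative, not a requirement of Apache Flink Agents:

```dockerfile
# Optional: add a Kafka connector so the job can read from and write to topics
COPY flink-sql-connector-kafka-4.0.0-2.0.jar /opt/flink/usrlib/
```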

Build the image and make it available to your Kubernetes cluster:

docker build -t my-flink-agents-app:latest .
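Pushing to a registry that your Kubernetes cluster can pull from is one way to make the image available. The registry host below is a placeholder; substitute your own:

```shell
# Tag the image for your registry and push it (registry.example.com is a placeholder)
docker tag my-flink-agents-app:latest registry.example.com/my-flink-agents-app:latest
docker push registry.example.com/my-flink-agents-app:latest
```

For a local development cluster such as kind, `kind load docker-image my-flink-agents-app:latest` is an alternative to pushing to a registry.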

Deploy the application with CMF

Create a FlinkApplication specification for your Apache Flink Agents application and save it to a file, for example my-agent-app.json.

{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "FlinkApplication",
  "metadata": {
    "name": "my-agent-app"
  },
  "spec": {
    "image": "my-flink-agents-app:latest",
    "flinkVersion": "v2_1",
    "flinkConfiguration": {
      "taskmanager.numberOfTaskSlots": "1"
    },
    "serviceAccount": "flink",
    "jobManager": {
      "resource": {
        "memory": "1024m",
        "cpu": 1
      }
    },
    "taskManager": {
      "resource": {
        "memory": "1024m",
        "cpu": 1
      }
    },
    "job": {
      "jarURI": "local:///opt/flink/usrlib/my-agent-app.jar",
      "entryClass": "com.example.MyAgentApp",
      "state": "running",
      "parallelism": 1,
      "upgradeMode": "stateless"
    }
  }
}

If your application requires environment variables, such as API keys for Large Language Model (LLM) providers, pass them through the podTemplate:

{
  "spec": {
    "podTemplate": {
      "spec": {
        "containers": [
          {
            "name": "flink-main-container",
            "env": [
              {
                "name": "OPENAI_API_KEY",
                "valueFrom": {
                  "secretKeyRef": {
                    "name": "llm-api-keys",
                    "key": "openai-key"
                  }
                }
              }
            ]
          }
        ]
      }
    }
  }
}

Tip

Use Kubernetes Secrets to manage sensitive values like API keys rather than passing them as plain text in the application specification.
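A secret matching the `secretKeyRef` in the example above could be created with kubectl. The secret and key names mirror the example; the key value is a placeholder:

```shell
# Create the llm-api-keys secret referenced by the podTemplate example
kubectl create secret generic llm-api-keys \
  --from-literal=openai-key=<your-openai-api-key>
```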

Submit the application using the Confluent CLI or the CMF REST API.

Confluent CLI:

confluent flink application create \
  --environment <env-name> my-agent-app.json

REST API:

curl -X POST http://<cmf-host>:8080/cmf/api/v1/environments/<env-name>/applications \
  -H "Content-Type: application/json" \
  -d @my-agent-app.json

For more information on creating and managing applications, see Create Confluent Manager for Apache Flink Applications and Describe, Update and Delete Confluent Manager for Apache Flink Applications.