Package a PyFlink job for Confluent Manager for Apache Flink

Flink jobs are deployed in Confluent Platform with Confluent Manager for Apache Flink® (CMF), the central management component for Apache Flink in Confluent Platform. This topic walks you through configuring your PyFlink project for packaging and deployment with CMF.

Prerequisites

Before you package a PyFlink job, you must meet the following prerequisites:

  • Confluent Manager for Apache Flink installed using Helm. For installation instructions, see Install Confluent Manager for Apache Flink with Helm.

  • A PyFlink project with Python dependencies managed by a package manager such as pip.

  • Docker installed.

Set up the project configuration

Follow these steps to set up your PyFlink project configuration.

  1. Create a Python file for your PyFlink application. Following is an example python_demo.py:

    import logging
    import sys
    
    from pyflink.table import TableEnvironment, EnvironmentSettings
    
    
    def python_demo():
        t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
        t_env.execute_sql(
            """
        CREATE TABLE orders (
          order_number BIGINT,
          price        DECIMAL(32,2),
          buyer        ROW<first_name STRING, last_name STRING>,
          order_time   TIMESTAMP(3)
        ) WITH (
          'connector' = 'datagen'
        )"""
        )
    
        t_env.execute_sql(
            """
            CREATE TABLE print_table WITH ('connector' = 'print')
              LIKE orders"""
        )
        t_env.execute_sql(
            """
            INSERT INTO print_table SELECT * FROM orders"""
        )
    
    
    if __name__ == "__main__":
        logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
        python_demo()
    
  2. Create a Python dependencies file. You can create a requirements.txt file or use a modern Python package manager like uv to manage your PyFlink dependencies:

    cp-pyflink>2.0
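
If you manage the project with uv instead of a requirements.txt, the same dependency can live in a pyproject.toml so that the image build can run `uv sync`. The following is a minimal sketch; the project name and Python bound are illustrative placeholders, not part of the official example:

```toml
[project]
name = "python-demo"           # placeholder project name
version = "0.1.0"
requires-python = ">=3.11"     # match the Python version used in the image build
dependencies = [
    "cp-pyflink>2.0",
]
```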
    

Deploy your PyFlink application with CMF

After you have developed your PyFlink application locally, you must package it for deployment. The recommended approach is to build a custom Docker image.

Package with a custom Docker image

Package the PyFlink job in a custom Docker image that contains the Python environment and dependencies. This approach assumes you already have infrastructure in place for building Docker images in a build pipeline. Base the custom image on the confluentinc/cp-flink:2.0.0-cp1 image, available on Docker Hub.

  1. Create the Dockerfile. Following is an example Dockerfile:

    FROM debian:latest AS builder
    
    # Install dependencies first
    RUN set -ex; \
      apt-get update; \
      apt-get -y install gcc default-jdk; \
      rm -rf /var/lib/apt/lists/*
    
    # Set the correct JAVA_HOME
    ENV JAVA_HOME=/usr/lib/jvm/default-java
    
    # Setup python environment with uv
    COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/
    
    # Install python in the app folder so we can port it over
    ENV UV_PYTHON_INSTALL_DIR=/opt/flink/pyflink/.uv
    
    # Create the virtual environment
    RUN mkdir -p /opt/flink/pyflink
    WORKDIR /opt/flink/pyflink
    RUN uv venv --python 3.11 .venv
    
    # Copy the project and install dependencies. This demo only requires cp-pyflink,
    # but you can install dependencies the way you prefer. We suggest building a project
    # that can be set up by running `uv sync`.
    # Quote the version specifier so the shell does not treat `>` as a redirect,
    # and make sure to install the correct version of cp-pyflink.
    COPY python_demo.py ./
    RUN uv pip install "cp-pyflink>2.0"
    
    # Build the final image, copy the python project and virtualenv in /opt/flink/pyflink
    FROM confluentinc/cp-flink:2.0.0-cp1
    
    COPY --from=builder --chown=flink:flink /opt/flink/pyflink/ /opt/flink/pyflink/
    
  2. Build the Docker image. For example:

    docker build -t pyflink-test:latest .
    
  3. Define the Flink application, as shown in the following example:

    apiVersion: cmf.confluent.io/v1
    kind: FlinkApplication
    metadata:
      name: python-example
    spec:
      image: pyflink-test:latest
      flinkVersion: v2_0
      flinkConfiguration:
        metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
        metrics.reporter.prom.port: 9249-9250
        taskmanager.numberOfTaskSlots: "1"
      serviceAccount: flink
      jobManager:
        resource:
          cpu: 1
          memory: 1024m
      taskManager:
        resource:
          cpu: 1
          memory: 1024m
      job:
        jarURI: local:///opt/flink/opt/flink-python_2.12-1.16.1.jar
        state: running
        parallelism: 1
        upgradeMode: stateless
        entryClass: org.apache.flink.client.python.PythonDriver
        args:
          - -pyclientexec
          - /opt/flink/pyflink/.venv/bin/python3
          - -py
          - /opt/flink/pyflink/python_demo.py
    

Submit the application definition

To run your PyFlink application with CMF, you must make the Python environment and dependencies available to the Flink clusters. After you package the application, you can use the Confluent CLI to submit your Flink application definition.

For example:

confluent flink application create --environment <env-name> <application-definition>
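
If you prefer to script the submission rather than use the CLI, CMF also exposes a REST API. The following Python sketch builds the HTTP request with only the standard library. The base URL and route here are assumptions for illustration; verify the exact path against the CMF REST API reference:

```python
import json
import urllib.request

# Assumption: CMF is reachable at this address (for example, via kubectl port-forward).
CMF_BASE = "http://localhost:8080"


def build_create_request(env: str, app_spec: dict) -> urllib.request.Request:
    """Build a POST request that submits a FlinkApplication spec to CMF.

    The route below is an assumption based on CMF's environment-scoped
    resources; check the CMF REST API reference for the exact path.
    """
    url = f"{CMF_BASE}/cmf/api/v1/environments/{env}/applications"
    return urllib.request.Request(
        url,
        data=json.dumps(app_spec).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Calling urllib.request.urlopen(build_create_request(...)) submits the application.
```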

Important notes

Note the following about deploying PyFlink applications with CMF:

  • Python Version Compatibility: Ensure your Python version is compatible with the PyFlink version you’re using. See Versions and Interoperability for Confluent Manager for Apache Flink for more information.

  • Flink Python JAR: The jarURI must point to the Flink Python JAR that matches the Flink version in your image. The path is typically /opt/flink/opt/flink-python_<version>.jar; verify the exact filename in your container, because the naming (including the Scala suffix) varies across Flink releases.

  • Entry Class: PyFlink applications use org.apache.flink.client.python.PythonDriver as the entry class.

  • Python Executor: The -pyclientexec argument should point to the Python executable in your container or its virtual environment.

  • Python Script Path: The -py argument should point to the path of your Python script within the container.

  • Resource Requirements: Adjust the CPU and memory requirements based on your application’s needs.

  • Dependencies: Make sure all required Python packages are installed in your container.
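
The entry-class and argument conventions above can be captured in a small helper when you template application specs in a deployment script. This is an illustrative sketch, not part of CMF:

```python
def pyflink_job_fields(venv_python: str, script: str) -> dict:
    """Return the PyFlink-specific job fields of a FlinkApplication spec.

    venv_python: path to the Python executable inside the container.
    script: path to the PyFlink script inside the container.
    """
    return {
        "entryClass": "org.apache.flink.client.python.PythonDriver",
        "args": ["-pyclientexec", venv_python, "-py", script],
    }
```

For example, pyflink_job_fields("/opt/flink/pyflink/.venv/bin/python3", "/opt/flink/pyflink/python_demo.py") reproduces the job fields from the application definition above.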

Troubleshooting

Following are some troubleshooting tips if you encounter issues when deploying PyFlink applications with CMF:

  • Python Environment Issues: Ensure the Python virtual environment is properly set up and all dependencies are installed.

  • JAR File Path: Verify that the Flink Python JAR file exists at the specified path in your container.

  • Permissions: Make sure the flink user has read access to the Python script and dependencies.

  • Logs: Check the Flink job manager and task manager logs for Python-related errors.

Related content

  • Flink Jobs for Confluent Manager for Apache Flink
