Package a PyFlink job for Confluent Manager for Apache Flink
This topic walks you through configuring your PyFlink project for packaging with Confluent Manager for Apache Flink (CMF).
Prerequisites
Before you package a PyFlink job, you must meet the following prerequisites:
- Confluent Manager for Apache Flink installed using Helm. For installation instructions, see Install Confluent Manager for Apache Flink with Helm.
- A PyFlink project with Python dependencies managed by a package manager such as `pip`.
- Docker installed.
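A quick sanity check that the local tooling from these prerequisites is available might look like the following sketch (it assumes `pip` is your package manager of choice):

```shell
docker --version
python3 --version
pip --version
```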
Set up the project configuration
Follow these steps to set up your PyFlink project configuration.
Create a Python file for your PyFlink application. Following is an example `python_demo.py`:

```python
import logging
import sys

from pyflink.table import TableEnvironment, EnvironmentSettings


def python_demo():
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    t_env.execute_sql(
        """
        CREATE TABLE orders (
            order_number BIGINT,
            price DECIMAL(32,2),
            buyer ROW<first_name STRING, last_name STRING>,
            order_time TIMESTAMP(3)
        ) WITH (
            'connector' = 'datagen'
        )"""
    )

    t_env.execute_sql(
        """
        CREATE TABLE print_table WITH ('connector' = 'print')
        LIKE orders"""
    )

    t_env.execute_sql(
        """
        INSERT INTO print_table SELECT * FROM orders"""
    )


if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    python_demo()
```
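To sanity-check the script before you package it, you can run it with a local interpreter, assuming the `cp-pyflink` package and a Java runtime are available in that environment (PyFlink requires a JDK). The `print` connector writes the generated rows to stdout, so you see output immediately:

```shell
python python_demo.py
```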
Create a Python dependencies file. You can create a `requirements.txt` file or use a modern Python package manager like `uv` to manage your PyFlink dependencies:

```text
cp-pyflink>2.0
```
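If you choose `uv`, a minimal project setup might look like the following sketch; the project name is illustrative:

```shell
uv init pyflink-demo          # creates a project with a pyproject.toml
cd pyflink-demo
uv add "cp-pyflink>2.0"       # records the dependency and installs it into .venv
```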
Deploy your PyFlink application with CMF
After you have developed your PyFlink application locally, you need to package it for deployment. The recommended packaging approach follows.
Package with a custom Docker image
Package the PyFlink job in a custom Docker image that contains the Python environment and dependencies. This approach assumes you already have infrastructure in place for building Docker images in a build pipeline. Base the custom image on the `confluentinc/cp-flink:2.0.0-cp1` image found on Docker Hub.
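For this example, the build context only needs the application script next to the Dockerfile; the layout below is an assumption for this walkthrough:

```text
.
├── Dockerfile
└── python_demo.py
```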
Create the Dockerfile. Following is an example Dockerfile:

```dockerfile
FROM debian:latest AS builder

# Install build dependencies first
RUN set -ex; \
    apt-get update; \
    apt-get -y install gcc default-jdk; \
    rm -rf /var/lib/apt/lists/*

# Set the correct JAVA_HOME
ENV JAVA_HOME=/usr/lib/jvm/default-java

# Set up the Python environment with uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/

# Install Python in the app folder so it can be copied into the final image
ENV UV_PYTHON_INSTALL_DIR=/opt/flink/pyflink/.uv

# Create the virtual environment
RUN mkdir -p /opt/flink/pyflink
WORKDIR /opt/flink/pyflink
RUN uv venv --python 3.11 .venv

# Copy the project and install dependencies. This demo only requires cp-pyflink,
# but you can install dependencies the way you prefer. We suggest building a project
# that can be set up by running `uv sync`.
# Make sure to install the correct version of cp-pyflink.
# Note: the version specifier is quoted so the shell does not treat > as a redirect.
COPY python_demo.py ./
RUN uv pip install "cp-pyflink>2.0"

# Build the final image: copy the Python project and virtualenv into /opt/flink/pyflink
FROM confluentinc/cp-flink:2.0.0-cp1
COPY --from=builder --chown=flink:flink /opt/flink/pyflink/ /opt/flink/pyflink/
```
Build the Docker image. The following shows an example command:

```shell
docker build -t pyflink-test:latest .
```
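The image must be pullable by the Kubernetes cluster where CMF runs. If your cluster pulls from a registry, tag and push the image first; the registry host below is a placeholder:

```shell
docker tag pyflink-test:latest <your-registry>/pyflink-test:latest
docker push <your-registry>/pyflink-test:latest
```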
Define the Flink application as shown in the following example:

```yaml
apiVersion: cmf.confluent.io/v1
kind: FlinkApplication
metadata:
  name: python-example
spec:
  image: pyflink-test:latest
  flinkVersion: v2_0
  flinkConfiguration:
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: 9249-9250
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      cpu: 1
      memory: 1024m
  taskManager:
    resource:
      cpu: 1
      memory: 1024m
  job:
    # Must point to the flink-python JAR that matches the Flink version in your image.
    jarURI: local:///opt/flink/opt/flink-python-2.0.0.jar
    state: running
    parallelism: 1
    upgradeMode: stateless
    entryClass: org.apache.flink.client.python.PythonDriver
    args:
      - -pyclientexec
      - /opt/flink/pyflink/.venv/bin/python3
      - -py
      - /opt/flink/pyflink/python_demo.py
```
Submit the application definition
To run your PyFlink application with CMF, you need to make the Python environment and dependencies available to the Flink clusters. After you package the application, use the Confluent CLI to submit your Flink application definition.

For example:

```shell
confluent flink application create --environment <env-name> <application-definition>
```
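A concrete invocation might look like the following, assuming the definition above is saved as `python-example.yaml` and your CMF environment is named `env-1` (both names are illustrative):

```shell
confluent flink application create --environment env-1 python-example.yaml

# Confirm that the application was accepted (assumes the Confluent CLI is
# configured against your CMF endpoint).
confluent flink application list --environment env-1
```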
Important notes
Note the following about deploying PyFlink applications with CMF:
- Python Version Compatibility: Ensure your Python version is compatible with the PyFlink version you're using. See Versions and Interoperability for Confluent Manager for Apache Flink for more information.
- Flink Python JAR: The `jarURI` must point to the Flink Python JAR file that matches your Flink version. The path is typically `/opt/flink/opt/flink-python-<version>.jar`; verify the exact file name in your image, as shown in the sketch after this list.
- Entry Class: PyFlink applications use `org.apache.flink.client.python.PythonDriver` as the entry class.
- Python Executor: The `-pyclientexec` argument should point to the Python executable in your container or its virtual environment.
- Python Script Path: The `-py` argument should point to the path of your Python script within the container.
- Resource Requirements: Adjust the CPU and memory requirements based on your application's needs.
- Dependencies: Make sure all required Python packages are installed in your container.
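You can check several of these points against the image before you deploy. The following sketch assumes the `pyflink-test:latest` image built earlier, and that the image's entrypoint passes arbitrary commands through, as the upstream Flink images do:

```shell
# List the Flink Python JAR shipped in the image; jarURI must match one of these files.
docker run --rm pyflink-test:latest ls /opt/flink/opt/

# Confirm the interpreter referenced by -pyclientexec exists and is the expected version.
docker run --rm pyflink-test:latest /opt/flink/pyflink/.venv/bin/python3 --version
```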
Troubleshooting
Following are some troubleshooting tips if you encounter issues when deploying PyFlink applications with CMF:
- Python Environment Issues: Ensure the Python virtual environment is properly set up and all dependencies are installed.
- JAR File Path: Verify that the Flink Python JAR file exists at the specified path in your container.
- Permissions: Make sure the `flink` user has read access to the Python script and dependencies.
- Logs: Check the Flink job manager and task manager logs for Python-related errors, as shown in the example after this list.
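A sketch of pulling those logs with `kubectl` follows; the deployment name derives from the application name in the example above, and the namespace and label selector are assumptions based on how Flink's native Kubernetes integration labels its pods:

```shell
# JobManager logs for the application named "python-example"
kubectl logs deployment/python-example -n <namespace>

# TaskManager logs, selected by the labels Flink applies to its pods
kubectl logs -l app=python-example,component=taskmanager -n <namespace>
```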