Configure Environments in Confluent Manager for Apache Flink

A Flink environment in Confluent Manager for Apache Flink® (CMF) groups a set of Flink applications together.

Flink environments fulfill two main roles:

  • Isolation: Access control for individual team members is managed at the environment level (logical isolation). Flink applications that belong to an environment are deployed to the same Kubernetes namespace (physical isolation).

  • Shared configuration: Flink configuration at the environment level has precedence over Flink configuration for individual Flink applications. This can be used to enforce configuration options and avoid redundant configuration. A common use case is setting a common observability configuration, or checkpoint storage destination for all Flink clusters in an environment.

For example, you might configure dev and prod environments or regional environments for your organization.

Flink configuration options, such as the storage location of Flink checkpoints, are expected to differ between environments, such as dev and prod. Instead of requiring individual Flink applications to configure these options and keep track of which environment they are running in, setting them at the environment level simplifies management and cleanly separates the concerns of platform operators and developers.
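Conceptually, for flinkConfiguration keys the precedence rule behaves like a map merge in which environment-level values win. The following is a minimal illustrative sketch of that rule (the helper function and sample keys are hypothetical; the actual merge is performed internally by CMF):

```python
def effective_flink_configuration(app_config: dict, env_defaults: dict) -> dict:
    """Merge two flinkConfiguration maps; environment-level keys take precedence."""
    merged = dict(app_config)
    merged.update(env_defaults)  # environment defaults override application values
    return merged

app_config = {
    "taskmanager.numberOfTaskSlots": "4",    # set by the developer
    "execution.checkpointing.interval": "30s",
}
env_defaults = {
    "taskmanager.numberOfTaskSlots": "2",    # enforced at the environment level
    "state.checkpoints.dir": "s3://bucket/checkpoints",
}

# numberOfTaskSlots resolves to "2"; the developer's checkpointing
# interval is kept because the environment does not set it.
print(effective_flink_configuration(app_config, env_defaults))
```

This is why a platform team can set the checkpoint storage destination once per environment while developers keep control of application-specific options.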

Tools for managing Flink environments

You can use these tools to create and manage environments:

  • the Confluent REST API

  • the Confluent CLI

Authorization

For role-based access control (RBAC), the Flink environment is used as the scope to control access. You can grant individual users access to specific environments to read/manage Flink applications.

Create an environment

You can create an environment using the REST API or the Confluent CLI. First, you create a JSON file that defines the environment, and then you use the REST API or the Confluent CLI to create the environment.

Following is an example of an environment payload that can be used to create an environment using the Confluent REST API:

{
    "name": "test-dev",
    "kubernetesNamespace": "dev-shared",
    "flinkApplicationDefaults": {
        "spec": {
            "flinkConfiguration": {
                "taskmanager.numberOfTaskSlots": "2"
            },
            "jobManager": {
                "resource": {
                    "memory": "1024m",
                    "cpu": 0.5
                }
            },
            "taskManager": {
                "resource": {
                    "memory": "2048m",
                    "cpu": "1.5"
                }
            }
        }
    },
    "computePoolDefaults": {
        "clusterSpec": {
            "image": "confluentinc/cp-flink-sql:1.19.1-cp1",
            "flinkVersion": "v1_19",
            "jobManager": {
                "resource": {
                    "memory": "1024m",
                    "cpu": "1500m"
                }
            },
            "taskManager": {
                "resource": {
                    "memory": "2048m",
                    "cpu": "400m"
                }
            }
        }
    },
    "statementDefaults": {
        "detached": {
            "flinkConfiguration": {
                "state.backend.type": "rocksdb"
            }
        }
    }
}

Save the JSON definition above to a file, for example, env-dev.json, and use the following curl command to create an environment using the REST API:

curl -v -H "Content-Type: application/json" -X POST http://cmf:8080/cmf/api/v1/environments -d @/path/to/env-dev.json
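The same request can also be issued from a script. The following is a minimal sketch using Python's standard library against the same hostname and endpoint as the curl command above (the payload is abbreviated here; the urlopen call is left commented out so the snippet does not require a running CMF instance):

```python
import json
import urllib.request

# Abbreviated environment definition; in practice, load env-dev.json.
environment = {
    "name": "test-dev",
    "kubernetesNamespace": "dev-shared",
}

# Build the POST request for the CMF environments endpoint.
req = urllib.request.Request(
    "http://cmf:8080/cmf/api/v1/environments",
    data=json.dumps(environment).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)

# Uncomment to send the request to a running CMF instance:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode())
```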

When you create an environment with the Confluent CLI, you define the Flink application defaults, compute pool defaults, and statement defaults in separate files, and pass those files to the Confluent CLI. Following are examples of the required JSON files:

env-dev-flink-application-defaults.json

{
    "spec": {
        "flinkConfiguration": {
            "taskmanager.numberOfTaskSlots": "2"
        },
        "jobManager": {
            "resource": {
                "memory": "1024m",
                "cpu": 0.5
            }
        },
        "taskManager": {
            "resource": {
                "memory": "2048m",
                "cpu": "1.5"
            }
        }
    }
}

env-dev-compute-pool-defaults.json

{
    "clusterSpec": {
        "image": "confluentinc/cp-flink-sql:1.19.1-cp1",
        "flinkVersion": "v1_19",
        "jobManager": {
            "resource": {
                "memory": "1024m",
                "cpu": "1500m"
            }
        },
        "taskManager": {
            "resource": {
                "memory": "2048m",
                "cpu": "400m"
            }
        }
    }
}

env-dev-statement-defaults.json

{
    "detached": {
        "flinkConfiguration": {
            "state.backend.type": "rocksdb"
        }
    }
}

Save the JSON definitions to files, for example, env-dev-flink-application-defaults.json, env-dev-compute-pool-defaults.json, and env-dev-statement-defaults.json respectively.
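A malformed defaults file is an easy way to get a confusing error from the CLI, so it can be useful to confirm that each file parses as valid JSON first. The following is a small optional check (the `validate_json_file` helper is illustrative, not part of any Confluent tooling):

```python
import json
from pathlib import Path

def validate_json_file(path: str) -> bool:
    """Return True if the file exists and contains valid JSON."""
    p = Path(path)
    if not p.exists():
        return False
    try:
        json.loads(p.read_text())
        return True
    except json.JSONDecodeError:
        return False

# Check each defaults file before passing it to the Confluent CLI.
for name in [
    "env-dev-flink-application-defaults.json",
    "env-dev-compute-pool-defaults.json",
    "env-dev-statement-defaults.json",
]:
    print(name, "ok" if validate_json_file(name) else "missing or invalid")
```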

The following command creates an environment using the Confluent CLI.

confluent flink environment create test-dev --defaults /path/to/env-dev-flink-application-defaults.json --statement-defaults /path/to/env-dev-statement-defaults.json --compute-pool-defaults /path/to/env-dev-compute-pool-defaults.json --kubernetes-namespace dev-shared

Update an environment

You can update an existing environment to change its configuration. For example, to modify the default checkpointing interval, you create a file with the change and pass this file to the Confluent REST API or the Confluent CLI to update the environment.

To change the checkpointing interval to 90s, you would modify the env-dev.json file. The following is an example of the modified file:

{
    "name": "test-dev",
    "kubernetesNamespace": "dev-shared",
    "flinkApplicationDefaults": {
        "spec": {
            "flinkConfiguration": {
                "taskmanager.numberOfTaskSlots": "2",
                "execution.checkpointing.interval": "90s"
            },
            "jobManager": {
                "resource": {
                    "memory": "1024m",
                    "cpu": 0.5
                }
            },
            "taskManager": {
                "resource": {
                    "memory": "2048m",
                    "cpu": "1.5"
                }
            }
        }
    },
    "computePoolDefaults": {
        "clusterSpec": {
            "image": "confluentinc/cp-flink-sql:1.19.1-cp1",
            "flinkVersion": "v1_19",
            "jobManager": {
                "resource": {
                    "memory": "1024m",
                    "cpu": "1500m"
                }
            },
            "taskManager": {
                "resource": {
                    "memory": "2048m",
                    "cpu": "400m"
                }
            }
        }
    },
    "statementDefaults": {
        "detached": {
            "flinkConfiguration": {
                "state.backend.type": "rocksdb"
            }
        }
    }
}

The following curl command updates the test-dev environment using the REST API and the env-dev.json file. Note the inclusion of the environment’s name in the URL.

curl -v -H "Content-Type: application/json" -X POST http://cmf:8080/cmf/api/v1/environments/test-dev -d @/path/to/env-dev.json

To make the same change with the Confluent CLI, you would modify the application defaults in the env-dev-flink-application-defaults.json file:

env-dev-flink-application-defaults.json

{
    "spec": {
        "flinkConfiguration": {
            "taskmanager.numberOfTaskSlots": "2",
            "execution.checkpointing.interval": "90s"
        },
        "jobManager": {
            "resource": {
                "memory": "1024m",
                "cpu": 0.5
            }
        },
        "taskManager": {
            "resource": {
                "memory": "2048m",
                "cpu": "1.5"
            }
        }
    }
}

The following command updates the test-dev environment using the Confluent CLI. Note the inclusion of other default files as well. If you do not include them, those defaults will be removed from the environment.

confluent flink environment update test-dev --defaults /path/to/env-dev-flink-application-defaults.json --statement-defaults /path/to/env-dev-statement-defaults.json --compute-pool-defaults /path/to/env-dev-compute-pool-defaults.json

Delete an environment

An environment can be deleted only if it contains no Flink applications or statements. You must first delete all Flink applications and statements contained in an environment before you can delete the environment.
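A cleanup script would therefore delete each application before deleting the environment. The sketch below only builds the per-application DELETE request; the `/applications` sub-resource path is assumed here, so check the CMF REST API reference for the exact endpoint in your version:

```python
import urllib.request

BASE = "http://cmf:8080/cmf/api/v1"

def delete_application_request(env: str, app: str) -> urllib.request.Request:
    """Build (but do not send) a DELETE request for one Flink application.

    Assumption: applications live under
    /environments/{env}/applications/{app}; verify against your CMF version.
    """
    url = f"{BASE}/environments/{env}/applications/{app}"
    return urllib.request.Request(url, method="DELETE")

req = delete_application_request("test-dev", "my-app")
print(req.get_method(), req.full_url)
```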

The following curl command deletes the test-dev environment using the REST API:

curl -v -X DELETE http://cmf:8080/cmf/api/v1/environments/test-dev

The following command deletes the test-dev environment using the Confluent CLI:

confluent flink environment delete test-dev