Configure Storage for Confluent Platform

Overview

Confluent for Kubernetes (CFK) supports the use of Kubernetes Storage Classes to provision persistent storage volumes for most of the Confluent Platform components.

Connect and Schema Registry do not use persistent storage volumes, and thus do not need a Storage Class to be specified.

Kafka

Confluent for Kubernetes can provision Kafka with the following storage options:

  • Persistent storage volumes

    Kafka requires block level storage solutions, such as:

    • AWS EBS
    • Azure Disk
    • GCE Disk
    • Ceph RBD
    • Portworx
  • Tiered Storage

    Confluent supports various object storage solution for Tiered Storage, such as:

    • AWS S3
    • GCP GCS
    • Pure Storage
    • FlashBlade

ZooKeeper

ZooKeeper uses the same persistent storage volume solution that Kafka uses.

Other Confluent components

Each Confluent component requires one persistent storage volume to be configured as the component uses it for maintaining working state during the lifetime of the service.

Additionally, Confluent components utilize Kafka for their durable shared storage needs. For example, Schema Registry stores schemas in Kafka, and Confluent Control Center stores operational state and metrics in Kafka.

Persistent storage volumes

Storage class configuration is one of the most critical steps in the Confluent for Kubernetes (CFK) configuration process.

When configuring Confluent components to use persistent storage volumes, the following options are supported:

  • Dynamic Provisioning: Use a pre-defined Kubernetes Storage Class
  • Dynamic Provisioning: Use the Kubernetes default Storage Class
  • Custom Provisioning: Use pre-provisioned persistent storage volumes

By default, CFK manages storage using dynamic storage provisioning that Kubernetes provides.

Confluent for Kubernetes does not support migration from one storage class to another.

Use pre-defined Kubernetes StorageClass for dynamic provisioning

You can provide a storage class to use for the entire Confluent Platform, or you can specify different storage classes for different components such as ZooKeeper, Kafka, ksqlDB, and Control Center.

To use a pre-defined storage class:

  1. Create or use a pre-defined StorageClass you want to use in your Kubernetes cluster.

    The following settings are the best practice recommendations for your storage class:

    • volumeBindingMode: WaitForFirstConsumer

    • reclaimPolicy: Retain

      Especially for production deployments, this setting is required.

    • allowVolumeExpansion: true

    You need to have sufficient permissions to create and modify StorageClasses in your Kubernetes cluster if you intend to create a new StorageClass to use rather than using a pre-existing one.

    For more information and example, see Kubernetes Storage Classes.

  2. In your Confluent component CR, specify the name of the StorageClass to use:

    spec:
      storageClass:
        name: my-storage-class
    

Use default Kubernetes StorageClass for dynamic provisioning

The support for default StorageClasses is enabled by default in versions 1.11 and higher of Kubernetes. If you do not provide the spec.storageClass in the CR, CFK will use the default storage class.

Use the following command to get the name of the current default storage class:

kubectl get sc

To use the Kubernetes default storage class, make sure the following properties are set on the default StorageClasses:

  • volumeBindingMode: WaitForFirstConsumer

  • reclaimPolicy: Retain

    Especially for production deployments, this setting is required.

  • allowVolumeExpansion: true

Important

We do not recommend using the default StorageClass in production environments.

Use statically provisioned persistent volumes

By default, CFK automates disk management by leveraging Kubernetes dynamic storage provisioning. If your Kubernetes cluster does not support dynamic provisioning, you can follow the instructions in this section to use statically-provisioned disks for your Kafka deployments.

  1. Create a StorageClass in Kubernetes for local provisioning. For example:

    apiVersion: storage.k8s.io/v1
    kind: StorageClass
    metadata:
      name: my-storage-class
    provisioner: kubernetes.io/no-provisioner
    volumeBindingMode: WaitForFirstConsumer
    
  2. Create PersistentVolumes with the desired host path and the hostname label for each of the desired worker nodes. For example:

    apiVersion: v1
    kind: PersistentVolume
    metadata:
      name: pv-1                                                --- [1]
    spec:
      capacity:
        storage: 10Gi                                           --- [2]
      volumeMode: Filesystem
      accessModes:
      - ReadWriteOnce
      persistentVolumeReclaimPolicy: Retain                     --- [3]
      storageClassName: my-storage-class                        --- [4]
      local:
         path: /mnt/data/broker-1-data                          --- [5]
      nodeAffinity:
        required:
          nodeSelectorTerms:
          - matchExpressions:
            - key: kubernetes.io/hostname
              operator: In
              values:
              - gke-myhost-cluster-default-pool-5cc13882-k0gb  --- [6]
    
    • [1] Choose a name for the PersistentVolume.

    • [2] Choose a storage size that is greater than or equal to the storage you’re requesting for each Kafka broker instance. This corresponds to the spec.dataVolumeCapacity property of the Kafka CR.

    • [3] Choose Retain if you want the data to be retained after you delete the PersitentVolumeClaim that CFK will eventually create and which Kubernetes will eventually bind to this PersistentVolume.

      Choose Delete if you want this data to be garbage-collected when the PersistentVolumeClaim is deleted.

      Important

      With persistentVolumeReclaimPolicy: Delete, your data on the volume will be deleted when you delete the pod and consequently its persistent volume.

    • [4] The storageClassName must match the one created in Step 1.

    • [5] This is the directory path you want to use on the worker node for the broker as its persistent data volume. The path must exist on the worker node.

    • [6] This is the value of the kubernetes.io/hostname label of the worker node you want to host this broker instance. To find this hostname, run the following command:

      kubectl get nodes \
        -o 'custom-columns=NAME:metadata.name,HOSTNAME:metadata.labels.kubernetes\.io/hostname'
      
      NAME                                           HOSTNAME
      gke-myhost-cluster-default-pool-5cc13882-k0gb   gke-myhost-cluster-default-pool-5cc13882-k0gb
      gke-myhost-cluster-default-pool-5cc13882-n8vr   gke-myhost-cluster-default-pool-5cc13882-n8vr
      gke-myhost-cluster-default-pool-5cc13882-tbbj   gke-myhost-cluster-default-pool-5cc13882-tbbj
      
  3. Repeat the above step for each Kafka broker node.

  4. Add the storageClass to the Kafka CR, for example:

    spec:
      storageClass:
        name: my-storage-class
    
  5. After deploying the new CR, validate that the PersistentVolumes are bound:

    kubectl get pv
    
    NAME  CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM                   STORAGECLASS
    pv-1  10Gi     RWO          Retain         Bound  operator/data0-kafka-0  my-storage-class
    pv-2  10Gi     RWO          Retain         Bound  operator/data0-kafka-2  my-storage-class
    pv-3  10Gi     RWO          Retain         Bound  operator/data0-kafka-1  my-storage-class
    
  6. Validate that the Kafka pods are healthy:

    kubectl get pods -l app=kafka
    
    NAME      READY   STATUS    RESTARTS   AGE
    kafka-0   1/1     Running   0          40m
    kafka-1   1/1     Running   0          40m
    kafka-2   1/1     Running   0          40m
    

Tiered Storage

You can use Confluent for Kubernetes to enable Tiered Storage. Confluent supports various object Storage solutions, such as:

  • AWS S3
  • GCP GCS
  • Pure Storage
  • FlashBlade

Enable Tiered Storage

When you enable Tiered Storage, you need to configure Kafka with the following:

  • The type of blob storage to use.

  • The name of the storage bucket to use.

    You must have created this bucket in advance, CFK does not create this bucket on your behalf.

You also need to ensure that the Kafka brokers have appropriate access to the storage bucket. You can use one of the following options:

Use Service Account to give Kafka brokers access to the storage bucket

Map cloud IAM permissions to the Kubernetes ServiceAccount associated with your Kafka broker pods.

AWS provides the ability to natively associate AWS IAM permissions with ServiceAccounts in EKS.

Similarly, GCP provides the ability to map IAM permissions with ServiceAccounts in GKE.

You can map the appropriate bucket permissions to the default ServiceAccount in the Kubernetes namespace where you plan to deploy Kafka, or you can map them to a separate ServiceAccount and use CFK to ensure the Kafka broker pods are associated with that ServiceAccount. The primary benefit of this approach is that you do not need to actually manage sensitive credentials for bucket access when deploying Confluent Platform via CFK.

For more on associating AWS IAM roles for service accounts on EKS, see IAM roles for service accounts.

For more on associating GCP IAM roles for service accounts on GKE, see Workload Identity.

For more information on configuring which Kubernetes Service Account to associate with Confluent Platform components managed by CFK, see Configure Kafka Connect & ksqlDB using Confluent Cloud.

Use the Kubernetes Secret object to give Kafka brokers access to the storage bucket

Put your AWS or GCP credentials in a Secret object and configure Kafka to use the credentials in that object, when deploying Kafka via the CFK.

When your storage credentials change, you need to restart the Kafka cluster.

In addition to the above required settings, you can configure other Tiered Storage settings using configOverrides in the kafka section. For the available settings, see Tiered Storage.

When a Kafka cluster is deleted, CFK does not perform a garbage collection of the Tiered Storage bucket contents. You can either wait for the set interval or manually delete the objects in the Tiered Storage bucket. For more information, see Time Interval for Topic Deletes.

Configure Tiered Storage for AWS S3

To enable and configure Tiered Storage with AWS S3, set the following config overrides in Kafka CR:

spec:
  configOverrides:
    server:
      - confluent.tier.feature=true                ----- [1]
      - confluent.tier.enable=true                 ----- [2]
      - confluent.tier.backend=S3                  ----- [3]
      - confluent.tier.s3.bucket=                  ----- [4]
      - confluent.tier.s3.region=                  ----- [5]
      - confluent.tier.s3.cred.file.path=          ----- [6]
      - confluent.tier.topic.delete.check.interval.ms -- [7]

  mountedSecrets:                                  ----- [8]
    - secretRef:
  • [1] Set confluent.tier.feature=true to enable Tiered Storage.

  • [2] Set confluent.tier.enable to the default value for created topics. Setting this to true causes all non-compacted topics to be tiered.

  • [3] Set confluent.tier.backend to S3.

  • [4] Set confluent.tier.s3.bucket to the S3 bucket you want to use.

  • [5] Set confluent.tier.s3.region to the region.

  • [6] Optional. Specify confluent.tier.s3.cred.file.path if using Secrets to provide credentials for Tiered Storage. If using Service Accounts, this property is not necessary.

    To see what to add in the Secrets file, refer to Tiered Storage.

  • [7] Optional. Set confluent.tier.topic.delete.check.interval.ms to a time interval for which the deletion of log segment files takes place after a topic or a cluster is deleted. The default value for this time interval is 3 hours.

  • [8] Optional. Only required if using Secrets to provide credentials for Tiered Storage.

For example:

spec:
  configOverrides:
    server:
      - confluent.tier.feature=true
      - confluent.tier.enable=true
      - confluent.tier.backend=S3
      - confluent.tier.s3.bucket=my-bucket
      - confluent.tier.s3.region=us-west-2
      - confluent.tier.s3.cred.file.path=/mnt/secrets/my-secret-aws/aws/creds

  mountedSecrets:
    - secretRef: my-secret-aws

Configure Tiered Storage for GCS

To enable and configure Tiered Storage with GCS, set the following config overrides in Kafka CR:

spec:
  configOverrides:
    server:
      - confluent.tier.feature=true                ----- [1]
      - confluent.tier.enable=true                 ----- [2]
      - confluent.tier.backend=GCS                 ----- [3]
      - confluent.tier.gcs.bucket=                 ----- [4]
      - confluent.tier.gcs.region=                 ----- [5]
      - confluent.tier.gcs.cred.file.path=         ----- [6]
      - confluent.tier.topic.delete.check.interval.ms -- [7]

  mountedSecrets:                                  ----- [8]
    - secretRef:
  • [1] Set confluent.tier.feature=true to enable Tiered Storage.

  • [2] Set confluent.tier.enable to the default value for created topics. Setting this to true causes all non-compacted topics to be tiered.

  • [3] Set confluent.tier.backend to GCS.

  • [4] Set confluent.tier.gcs.bucket to the GCS bucket you want to use.

  • [5] Set confluent.tier.gcs.region to the GCS region.

  • [6] Optional. Specify confluent.tier.gcs.cred.file.path if using Secrets for Tiered Storage. If using Service Accounts, this property is not necessary.

    To see what to add in the Secrets file, refer to /platform/current/.

  • [7] Optional. Set confluent.tier.topic.delete.check.interval.ms to a time interval for which the deletion of log segment files takes place after a topic or a cluster is deleted. The default value for this time interval is 3 hours.

  • [8] Optional. Only required if using Secrets to provide credentials for Tiered Storage.

For example:

specify:
  configOverrides:
    server:
      - confluent.tier.feature=true
      - confluent.tier.enable=true
      - confluent.tier.backend=GCS
      - confluent.tier.gcs.bucket=my-bucket
      - confluent.tier.gcs.region=us-central1
      - confluent.tier.gcs.cred.file.path=/mnt/secrets/my-secret-gcs/credentials

  mountedSecrets:
  - secretRef: my-secret-gcs