Configure Storage with Confluent Operator

Storage options

By default, Confluent Operator manages storage using the dynamic storage provisioning that Kubernetes provides.

If you must rely on statically provisioned storage volumes, you can manually provision and attach storage to your Kubernetes worker nodes, expose those to the platform as PersistentVolumes, and then use Confluent Operator to deploy Confluent Platform clusters so that the broker instances mount those PersistentVolumes.
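
If you take the statically provisioned route, each disk is exposed to Kubernetes through a PersistentVolume definition. The following is a minimal sketch of such a definition for a local disk; the volume name, capacity, storage class name, node name, and disk path are hypothetical values for illustration only:

apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-pv-0                          # hypothetical volume name
spec:
  capacity:
    storage: 100Gi                          # size of the pre-provisioned disk
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Retain
  storageClassName: my-storage-class        # must match the StorageClass the brokers request
  local:
    path: /mnt/disks/ssd0                   # disk already attached to the worker node
  nodeAffinity:                             # required for local volumes
    required:
      nodeSelectorTerms:
        - matchExpressions:
            - key: kubernetes.io/hostname
              operator: In
              values:
                - worker-node-1             # hypothetical node name

The storageClassName in each PersistentVolume must match the StorageClass that the Confluent Platform components request, as described in the options below.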

Depending on how you are specifying storage requirements to Confluent Operator, you have the following options:

  • Create a StorageClass and specify the class name for Confluent Operator to use
  • Use the default Kubernetes StorageClass
  • Specify storage provisioner and other details for Confluent Operator Helm charts to create a StorageClass

Confluent Operator does not support migration from one storage class to another.

Use pre-defined StorageClass

Starting in Confluent Operator 5.5, you can instruct Operator to use a specific StorageClass for all PersistentVolumes it creates.

You can provide a storage class to use for the entire Confluent Platform, or you can specify different storage classes for different components such as ZooKeeper, Kafka, ksqlDB, and Control Center.

  1. Create a new StorageClass, or choose an existing one, to use in your Kubernetes cluster. A sample StorageClass manifest is shown after this procedure.

    You must ensure that volumeBindingMode is set to WaitForFirstConsumer on your storage class for use with Confluent Operator. For more information, see Volume Binding Mode for Kubernetes Storage Class.

    If you intend to create a new StorageClass rather than use a pre-existing one, you need sufficient permissions to create and modify StorageClasses in your Kubernetes cluster.

  2. In the configuration file ($VALUES_FILE), specify the name of the StorageClass to use for deploying Confluent Platform.

    1. To specify a StorageClass for all component deployments, specify the storage class name in global.storageClassName:

      global:
        storageClassName:
      
    2. To specify a StorageClass for a particular component, specify the storage class name in <component>.storageClassName. For example, for Kafka:

      kafka:
        storageClassName:
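
For reference, a StorageClass that meets the volumeBindingMode requirement might look like the following sketch. The class name, provisioner, and disk type are assumptions for illustration (this sketch uses the GCE persistent disk provisioner); adjust them for your environment:

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: my-storage-class                    # hypothetical class name
provisioner: kubernetes.io/gce-pd           # assumption: GCE persistent disk provisioner
parameters:
  type: pd-ssd
volumeBindingMode: WaitForFirstConsumer     # required for use with Confluent Operator
allowVolumeExpansion: true

With this class in place, setting global.storageClassName: my-storage-class in the configuration file directs Confluent Operator to use it for all PersistentVolumes it creates.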
      

Use the Kubernetes default StorageClass

Starting in Confluent Operator 5.5, you can configure Confluent Operator to use the Kubernetes default storage class.

To use this option, you must ensure that volumeBindingMode is set to WaitForFirstConsumer on the default StorageClass. For more information, see Volume Binding Mode for Kubernetes Storage Class.

The process for using statically provisioned storage is the same as above. Ensure that the storageClassName specified in your PersistentVolume definitions matches the name of your Kubernetes cluster’s default StorageClass.

To use the Kubernetes default storage class, in the configuration file ($VALUES_FILE):

  • Do not specify the global-level storageClassName value, or set it to an empty string ("").
  • Do not specify the component-level storageClassName value, or set it to an empty string ("").
  • Do not specify the global.provider.storage object.

The associated volumes will use the default StorageClass of your Kubernetes cluster. Support for default StorageClasses is enabled by default in Kubernetes versions 1.11 and higher.
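
A cluster's default StorageClass is the one that carries the storageclass.kubernetes.io/is-default-class annotation. If you need to verify or adjust it to meet the volumeBindingMode requirement, it looks roughly like the following sketch; the class name and provisioner are assumptions, since each Kubernetes distribution ships its own default:

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: standard                            # hypothetical name of the cluster default
  annotations:
    storageclass.kubernetes.io/is-default-class: "true"
provisioner: kubernetes.io/gce-pd           # assumption; depends on your provider
parameters:
  type: pd-standard
volumeBindingMode: WaitForFirstConsumer     # required for use with Confluent Operator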

Use the StorageClass created by Confluent Operator Helm charts

To have Confluent Operator Helm charts create a storage class for your Confluent Platform cluster, set the following in the configuration file ($VALUES_FILE):

global:
  storageClassName: ""                  ----- [1]
  provider:
    kubernetes:
      deployment:
        zones:
          - us-central1-a
    storage:                            ----- [2]
      provisioner: kubernetes.io/gce-pd
      allowVolumeExpansion:             ----- [3]
      parameters:
        type: pd-ssd

  • [1] Set storageClassName: "", or do not specify storageClassName, either at the global level or at the component level.

  • [2] If you are configuring a multi-zone cluster, Confluent Operator creates a storage class for each zone specified in global.provider.kubernetes.deployment.zones.

    See Storage Class Provisioners for configuration examples. This example uses GCE persistent disk storage (gce-pd) and solid-state drives (pd-ssd) as described in GCE PD.

  • [3] Confluent Operator enables automatic storage expansion by default for the storage classes created by the Confluent Operator Helm charts. Set allowVolumeExpansion: false to disable automatic storage expansion.

When creating Confluent Platform clusters, Confluent Operator creates multiple StorageClasses on the fly using the data under global.provider.storage as the spec for each StorageClass, and they will be named according to the following pattern:

{cp-component-helm-chart-name}-standard-ssd-{zone}
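
For example, with the configuration above, the StorageClass created for the Kafka component in zone us-central1-a would be named along the lines of kafka-standard-ssd-us-central1-a (assuming the Kafka Helm chart is named kafka).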

If you are using statically provisioned storage with this option, the process is the same as described above.

Precedence of the storage options

Precedence rules for the possible storage configuration options are as follows:

  • If storageClassName is specified both at the global level (in the global object) and component levels (in the component objects), the component-level storageClassName is used.
  • If storageClassName is specified at the component level, and the global.provider.storage object is specified, the component-level storageClassName is used.
  • If both the global level storageClassName and global.provider.storage are specified, Operator will return an error.
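
As an illustration of the component-level precedence, with the following hypothetical configuration the Kafka brokers use kafka-ssd while all other components use standard-ssd:

global:
  storageClassName: standard-ssd            # used by all components unless overridden

kafka:
  storageClassName: kafka-ssd               # component-level value takes precedence for Kafka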

Tiered Storage

Starting in Confluent Platform 6.0, you can use Operator to enable Tiered Storage with AWS S3 or Google Cloud Storage (GCS).

Enable Tiered Storage

When you enable Tiered Storage, you need to configure Kafka with the following:

  • The type of blob storage to use. Currently AWS S3 and GCS are supported.
  • The name of the storage bucket to use. You must create this bucket in advance; Operator does not create it on your behalf.

You also need to ensure that the Kafka brokers have appropriate access to the storage bucket. You can use one of the following options:

  • Use a Service Account

    You can map cloud IAM permissions to the Kubernetes ServiceAccount associated with your Kafka broker pods.

    AWS provides the ability to natively associate AWS IAM permissions with ServiceAccounts in EKS.

    Similarly, GCP provides the ability to map IAM permissions with ServiceAccounts in GKE.

    You can map the appropriate bucket permissions to the default ServiceAccount in the Kubernetes namespace where you plan to deploy Kafka, or you can map them to a separate ServiceAccount and use Operator to ensure the Kafka broker pods are associated with that ServiceAccount. The primary benefit of this approach is that you do not need to manage sensitive credentials for bucket access when deploying Confluent Platform via Operator. A minimal ServiceAccount sketch for EKS appears after this list.

    For more on associating AWS IAM roles for service accounts on EKS, see IAM roles for service accounts.

    For more on associating GCP IAM roles for service accounts on GKE, see Workload Identity.

    For more information on configuring which Kubernetes Service Account to associate with Confluent Platform components managed by Operator, see Provide Service Account.

  • Use a Secret object

    You can put your AWS or GCP credentials in a Secret object and configure Kafka to use the credentials in that object when deploying Kafka via the Operator. A minimal Secret sketch appears after this list.
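
For the Service Account option on EKS, the IAM role is associated with the ServiceAccount through an annotation. A minimal sketch, with a hypothetical ServiceAccount name and role ARN:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: kafka-tiered-storage                # hypothetical ServiceAccount for the Kafka broker pods
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::111122223333:role/kafka-s3-access    # hypothetical role with bucket permissions

For the Secret option, a minimal sketch of a Secret holding the credentials file, assuming a hypothetical name my-secret-aws and a key named credentials (for the required file contents, refer to Tiered Storage):

apiVersion: v1
kind: Secret
metadata:
  name: my-secret-aws                       # hypothetical name, referenced later in mountedSecrets
type: Opaque
stringData:
  credentials: |
    # contents of your credentials file go here; see Tiered Storage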

In addition to the above required settings, you can configure other Tiered Storage settings using configOverrides in the kafka section. For the available settings, see Tiered Storage.

When a Kafka cluster is deleted, Operator does not perform a garbage collection of the Tiered Storage bucket contents. You can either wait for the set interval or manually delete the objects in the Tiered Storage bucket. For more information, see Time Interval for Topic Deletes.

Configure Tiered Storage for AWS S3

To enable and configure Tiered Storage with AWS S3, set the following config overrides for Kafka in the config file ($VALUES_FILE).

kafka:
  configOverrides:
    server:
      - confluent.tier.feature=true                ----- [1]
      - confluent.tier.enable=true                 ----- [2]
      - inter.broker.protocol.version=             ----- [3]

      - confluent.tier.backend=S3                  ----- [4]
      - confluent.tier.s3.bucket=                  ----- [5]
      - confluent.tier.s3.region=                  ----- [6]
      - confluent.tier.s3.cred.file.path=          ----- [7]
      - confluent.tier.topic.delete.check.interval.ms -- [8]

  mountedSecrets:                                  ----- [9]
    - secretRef:
      keyItems:
        - key:
          path:
  • [1] Set confluent.tier.feature=true to enable Tiered Storage.

  • [2] Set confluent.tier.enable to the default tiering setting you want for newly created topics. Setting this to true causes all non-compacted topics to be tiered.

  • [3] Set inter.broker.protocol.version to 2.4 or higher. 2.4 is the minimum Kafka version that supports Tiered Storage.

  • [4] Set confluent.tier.backend to S3.

  • [5] Set confluent.tier.s3.bucket to the S3 bucket you want to use.

  • [6] Set confluent.tier.s3.region to the AWS region of the S3 bucket.

  • [7] Optional. Specify confluent.tier.s3.cred.file.path if using Secrets to provide credentials for Tiered Storage. If using Service Accounts, this property is not necessary.

    To see what to add in the Secrets file, refer to Tiered Storage.

  • [8] Optional. Set confluent.tier.topic.delete.check.interval.ms to the time interval that determines when log segment files are deleted after a topic or a cluster is deleted. The default value for this interval is 3 hours.

  • [9] Optional. Only required if using Secrets to provide credentials for Tiered Storage. See Provide Mounted Secrets for the available options for configuring mounted Secrets.

For example:

kafka:
  configOverrides:
    server:
      - confluent.tier.feature=true
      - confluent.tier.enable=true
      - inter.broker.protocol.version=2.6

      - confluent.tier.backend=S3
      - confluent.tier.s3.bucket=my-bucket
      - confluent.tier.s3.region=us-west-2
      - confluent.tier.s3.cred.file.path=/mnt/secrets/my-secret-aws/aws/creds

  mountedSecrets:
    - secretRef: my-secret-aws
      keyItems:
        - key: credentials
          path: aws/creds

Configure Tiered Storage for GCS

To enable and configure Tiered Storage with GCS, set the following config overrides for Kafka in the config file ($VALUES_FILE).

kafka:
  configOverrides:
    server:
      - confluent.tier.feature=true                ----- [1]
      - confluent.tier.enable=true                 ----- [2]
      - inter.broker.protocol.version=             ----- [3]

      - confluent.tier.backend=GCS                 ----- [4]
      - confluent.tier.gcs.bucket=                 ----- [5]
      - confluent.tier.gcs.region=                 ----- [6]
      - confluent.tier.gcs.cred.file.path=         ----- [7]
      - confluent.tier.topic.delete.check.interval.ms -- [8]

  mountedSecrets:                                  ----- [9]
    - secretRef:
      keyItems:
        - key:
          path:
  • [1] Set confluent.tier.feature=true to enable Tiered Storage.

  • [2] Set confluent.tier.enable to the default tiering setting you want for newly created topics. Setting this to true causes all non-compacted topics to be tiered.

  • [3] Set inter.broker.protocol.version to 2.4 or higher. 2.4 is the minimum Kafka version that supports Tiered Storage.

  • [4] Set confluent.tier.backend to GCS.

  • [5] Set confluent.tier.gcs.bucket to the GCS bucket you want to use.

  • [6] Set confluent.tier.gcs.region to the GCS region.

  • [7] Optional. Specify confluent.tier.gcs.cred.file.path if using Secrets for Tiered Storage. If using Service Accounts, this property is not necessary.

    To see what to add in the Secrets file, refer to Tiered Storage.

  • [8] Optional. Set confluent.tier.topic.delete.check.interval.ms to the time interval that determines when log segment files are deleted after a topic or a cluster is deleted. The default value for this interval is 3 hours.

  • [9] Optional. Only required if using Secrets to provide credentials for Tiered Storage. See Provide Mounted Secrets for the available options for configuring mounted Secrets.

For example:

kafka:
  configOverrides:
    server:
      - confluent.tier.feature=true
      - confluent.tier.enable=true
      - inter.broker.protocol.version=2.6

      - confluent.tier.backend=GCS
      - confluent.tier.gcs.bucket=my-bucket
      - confluent.tier.gcs.region=us-central1
      - confluent.tier.gcs.cred.file.path=/mnt/secrets/my-secret-gcs/credentials

  mountedSecrets:
  - secretRef: my-secret-gcs