階層型ストレージ

Confluent Platform 6.0.0 からは、それまでのリリースでのプレビューを経て、階層型ストレージが完全にサポートされています。階層型ストレージにより、運用の負荷とコストが削減されるため、Kafka に格納した大規模ボリュームデータの管理が可能になります。基本的な考え方は、データストレージに対する懸念をデータ処理の問題から切り離すことです。これにより、データストレージとデータ処理を独立してスケーリングできるようになります。階層型ストレージを使用することで、ウォームデータをコスト効率の高いオブジェクトストレージに送信できます。その結果、ブローカーをスケーリングするのは、コンピュートリソースがさらに必要な場合のみになります。

重要

  • Confluent Platform 6.0.0 の時点では、階層型ストレージは一度有効にすると無効にできません。
  • 階層型ストレージが有効になっているクラスター内ではすべてのブローカーで同じバケットを使用する必要があります。これはサポートされているすべてのプラットフォームに適用されます。

ブローカーでの階層型ストレージの有効化

Confluent Platform 6.0.0 以降を実行しているクラスターで階層型ストレージを有効にするには、クラウドプロバイダーの構成を以下のように指定します。

これらの構成をアップデートしたら、階層型ストレージ用に有効にされたブローカーを再起動します。この再起動は ローリング 形式で行うことができます。

ちなみに

この手順を Confluent Control Center から開始することもできます(「階層型ストレージの構成」を参照)。

AWS

階層型ストレージでは、読み取りと書き込みができる Amazon S3 のバケットと、バケットにアクセスするための認証情報も必要になります。

階層型ストレージをアマゾンウェブサービス(AWS)の Amazon Simple Storage Service(S3 バケット)で有効にするには、次の手順を実行します。

  1. 次のプロパティを server.properties ファイルに追加します。

    confluent.tier.feature=true
    confluent.tier.enable=true
    confluent.tier.backend=S3
    confluent.tier.s3.bucket=<BUCKET_NAME>
    confluent.tier.s3.region=<REGION>
    # confluent.tier.metadata.replication.factor=1
    

    ちなみに

    階層型ストレージの内部トピックは、デフォルトで、レプリケーション係数が 3 になります。「Confluent Platform のクイックスタート(ローカルインストール)」で説明されているような単一のブローカークラスターを実行するために confluent local services start を使用する場合は、上のコードの最終行をコメント解除することにより、階層型ストレージトピックのデフォルトのレプリケーション係数を上書きします。

    上記のプロパティを追加すると、AWS で階層型ストレージの各コンポーネントが有効になります。このとき、可能なすべての構成でデフォルトのパラメーターが使用されます。

    • confluent.tier.feature では、ブローカーに対して階層型ストレージを有効にします。これを true にすると、ブローカーで階層型ストレージを活用できます。
    • confluent.tier.enable では、作成されたトピックのデフォルト値を設定します。true に設定すると、未圧縮のトピックがすべて階層型になります。圧縮トピックについては、「既知の制限」を参照してください。
    • confluent.tier.backend は、ブローカーの接続先であるクラウドストレージサービスを示しています。Amazon S3 では、前に示したように S3 に設定します。
    • BUCKET_NAMEREGION はそれぞれ S3 バケット名とそのリージョンです。ブローカーは階層型データの読み取りと書き込みのために、このバケットとやりとりします。

    たとえば、バケット名 tiered-storage-test-awsus-west-2 リージョンにあると、次のプロパティになります。

    confluent.tier.s3.bucket=tiered-storage-test-aws
    confluent.tier.s3.region=us-west-2
    
  2. このブローカーでは、S3 バケットに接続するために、AWS 認証情報 が必要です。これらの情報は server.properties または環境変数を通じて設定できます。どちらの方法を使用しても問題はありません。ブローカーは server.properties に指定された認証情報を優先的に使用します。ブローカーが server.properties に認証情報を見つけられないと、環境変数を使用します。

    • サーバープロパティ - 次のプロパティを server.properties ファイルに追加します。

      confluent.tier.s3.cred.file.path=<PATH_TO_AWS_CREDENTIALS_FILE>
      

      <PATH>AWS の認証情報を含むファイル のファイルパスで置き換えます。

      このフィールドは、サーバーログファイルに記載されません。

    • 環境変数 - AWS の認証情報を次の環境変数に指定します。

      export AWS_ACCESS_KEY_ID=<YOUR_ACCESS_KEY_ID>
      export AWS_SECRET_ACCESS_KEY=<YOUR_SECRET_ACCESS_KEY>
      

      server.properties に認証情報の 2 つのプロパティが含まれない場合、ブローカーは前に示した環境変数を使用して、S3 バケットに接続します。

  3. S3 バケットで、ブローカーに対して、次のアクションの実行を許可する必要があります。これらの操作は、ブローカーで階層型ストレージを有効にして、適切に使用するために必要です。

    s3:DeleteObject
    s3:GetObject
    s3:PutObject
    s3:GetBucketLocation
    

GCS

階層型ストレージを Google Cloud Platform(GCP)の Google Cloud Storage(GCS)で有効にするには、次の手順を実行します。

  1. 階層型ストレージを有効にするには、次のプロパティを server.properties ファイルに追加します。

    confluent.tier.feature=true
    confluent.tier.enable=true
    confluent.tier.backend=GCS
    confluent.tier.gcs.bucket=<BUCKET_NAME>
    confluent.tier.gcs.region=<REGION>
    # confluent.tier.metadata.replication.factor=1
    

    ちなみに

    階層型ストレージの内部トピックは、デフォルトで、レプリケーション係数が 3 になります。「Confluent Platform のクイックスタート(ローカルインストール)」で説明されているような単一のブローカークラスターを実行するために confluent local services start を使用する場合は、上のコードの最終行をコメント解除することにより、階層型ストレージトピックのデフォルトのレプリケーション係数を上書きします。

    上記のプロパティを追加すると、GCS で階層型ストレージの各コンポーネントが有効になります。可能なすべての構成でデフォルトパラメーターが使用されます。

    • confluent.tier.feature では、ブローカーに対して階層型ストレージを有効にします。これを true にすると、ブローカーで階層型ストレージを活用できます。
    • confluent.tier.enable では、作成されたトピックのデフォルト値を設定します。true に設定すると、未圧縮のトピックがすべて階層型になります。圧縮トピックについては、「既知の制限」を参照してください。
    • confluent.tier.backend は、ブローカーの接続先であるクラウドストレージサービスを示します。Google Cloud Storage では、前に示したとおり、これを GCS に設定します。
    • BUCKET_NAMEREGION はそれぞれ S3 バケット名とそのリージョンです。ブローカーは階層型データの読み取りと書き込みのために、このバケットとやりとりします。

    たとえば、バケット名 tiered-storage-test-gcsus-central1 リージョンにあると、次のプロパティになります。

    confluent.tier.gcs.bucket=tiered-storage-test-gcs
    confluent.tier.gcs.region=us-central1
    
  2. このブローカーでは、GCS バケットに接続するために、GCS 認証情報 が必要です。これらの情報は server.properties または環境変数を通じて設定できます。どちらの方法を使用しても問題はありません。ブローカーは server.properties に指定された認証情報を優先的に使用します。ブローカーが server.properties に認証情報を見つけられないと、環境変数を使用します。

    • サーバープロパティ - 次のプロパティを server.properties ファイルに追加します。

      confluent.tier.gcs.cred.file.path=<PATH_TO_GCS_CREDENTIALS_FILE>
      

      このフィールドは、サーバーログファイルに記載されません。

    • 環境変数 - GCS の認証情報を次のローカル環境変数に指定します。

      export GOOGLE_APPLICATION_CREDENTIALS=<PATH_TO_GCS_CREDENTIALS_FILE>
      

      server.properties に認証情報ファイルへのパスがあるプロパティが含まれない場合、ブローカーは前に示した環境変数を使用して、GCS バケットに接続します。

    詳細については、GCS のドキュメント を参照してください。

  3. GCS バケットで、ブローカーに対して、次のアクションの実行を許可する必要があります。これらの操作は、ブローカーで階層型ストレージを有効にして、適切に使用するために必要です。

    storage.buckets.get
    storage.objects.get
    storage.objects.list
    storage.objects.create
    storage.objects.delete
    storage.objects.update
    
証明書のトラブルシューティング

バケットへのアクセス不可やセキュリティ証明書の問題といった階層型ストレージのエラーのためにブローカーを起動できない場合は、必要な Google CA 証明書を持っていることを確認します。トラブルシューティングを行うには、以下の手順を実行します。

  1. Google Trust Services Repository に移動し、Download CA certificates セクションまでスクロールして、Expand all をクリックします。

  2. お使いのクラスターに適した、現在有効な(期限が切れていない)証明書(GlobalSign R4 など)を選択します。その横の Action ドロップダウンをクリックして、クラスターのすべてのブローカーに証明書(PEM)ファイルをダウンロードします。

  3. 次のコマンドを実行して証明書をインポートします。

    keytool -import -trustcacerts -keystore /var/ssl/private/kafka_broker.truststore.jks -alias root -file <certificate.pem file>
    

Pure Storage FlashBlade

階層型ストレージを Amazon S3 API を通じて Pure Storage FlashBlade で有効にするには、次の手順を実行します。

  1. 次のプロパティを server.properties ファイルに追加します。

    confluent.tier.feature=true
    confluent.tier.enable=true
    confluent.tier.backend=S3
    confluent.tier.s3.bucket=<BUCKET_NAME>
    confluent.tier.s3.region=<REGION>
    confluent.tier.s3.aws.endpoint.override=<FLASHBLADE ENDPOINT>
    # confluent.tier.metadata.replication.factor=1
    

    ちなみに

    階層型ストレージの内部トピックは、デフォルトで、レプリケーション係数が 3 になります。「Confluent Platform のクイックスタート(ローカルインストール)」で説明されているような単一のブローカークラスターを実行するために confluent local services start を使用する場合は、上のコードの最終行をコメント解除することにより、階層型ストレージトピックのデフォルトのレプリケーション係数を上書きします。

    上記のプロパティを追加すると、Pure Storage FlashBlade で階層型ストレージの各コンポーネントが有効になります。可能なすべての構成でデフォルトパラメーターが使用されます。

    • confluent.tier.feature では、ブローカーに対して階層型ストレージを有効にします。これを true にすると、ブローカーで階層型ストレージを活用できます。

    • confluent.tier.enable では、作成されたトピックのデフォルト値を設定します。true に設定すると、未圧縮のトピックがすべて階層型になります。圧縮トピックについては、「既知の制限」を参照してください。

    • confluent.tier.backend は、ブローカーの接続先であるクラウドストレージサービスを示しています。Pure Storage FlashBlade では、前に示したとおり、これを S3 に設定します。

    • BUCKET_NAMEREGION はそれぞれ S3 バケット名とそのリージョンです。ブローカーは階層型データの読み取りと書き込みのために、このバケットとやりとりします。リージョンを有効な AWS リージョンに設定します。これらはすべて、Flashblade 内でも同等になります。リージョンを null または空にすることはできません。たとえば、バケット名 tiered-storage-test-awsus-west-2 リージョンにあると、次のプロパティになります。

      confluent.tier.s3.bucket=tiered-storage-test-aws
      confluent.tier.s3.region=us-west-2
      
    • ENDPOINT OVERRIDE は、Pure Storage FlashBlade の接続ポイントを示します。

  2. ブローカーでは、FlashBlade S3 バケットに接続するために、Pure Storage CLI で生成された認証情報が必要です。これらの情報は server.properties または環境変数を通じて設定できます。どちらの方法を使用しても問題はありません。ブローカーは server.properties に指定された認証情報を優先的に使用します。ブローカーが server.properties に認証情報を見つけられないと、環境変数を使用します。

    • サーバープロパティ - 次のプロパティを server.properties ファイルに追加します。

      confluent.tier.s3.cred.file.path=<PATH_TO_AWS_CREDENTIALS_FILE>
      

      <PATH_TO_AWS_CREDENTIALS_FILE>AWS の認証情報を含むファイル のファイルパスで置き換えます。

      このフィールドは、サーバーログファイルに記載されません。

    • 環境変数 - Pure Storage FlashBlade の認証情報を次の環境変数に指定します。

      export AWS_ACCESS_KEY_ID=<YOUR_ACCESS_KEY_ID>
      export AWS_SECRET_ACCESS_KEY=<YOUR_SECRET_ACCESS_KEY>
      

      server.properties に認証情報の 2 つのプロパティが含まれない場合、ブローカーは前に示した環境変数を使用して、S3 バケットに接続します。

Nutanix Objects

階層型ストレージを Amazon S3 API を通じて Nutanix Objects で有効にするには、次の手順を実行します。

  1. 次のプロパティを server.properties ファイルに追加します。

    confluent.tier.feature=true
    confluent.tier.enable=true
    confluent.tier.backend=S3
    confluent.tier.s3.bucket=<BUCKET_NAME>
    confluent.tier.s3.region=<REGION>
    confluent.tier.s3.aws.endpoint.override=<NUTANIX OBJECTS ENDPOINT>
    # confluent.tier.metadata.replication.factor=1
    

    ちなみに

    階層型ストレージの内部トピックは、デフォルトで、レプリケーション係数が 3 になります。「Confluent クイックスタート」で説明されているような単一のブローカークラスターを実行するために confluent local services start を使用する場合は、上のコードの最終行をコメント解除することにより、階層型ストレージトピックのデフォルトのレプリケーション係数をオーバーライドします。

    上記のプロパティを追加すると、Nutanix Objects で階層型ストレージの各コンポーネントが有効になります。このとき、可能なすべての構成でデフォルトのパラメーターが使用されます。

    • confluent.tier.feature では、ブローカーに対して階層型ストレージを有効にします。これを true にすると、ブローカーで階層型ストレージを活用できます。

    • confluent.tier.enable では、作成されたトピックのデフォルト値を設定します。true に設定すると、未圧縮のトピックがすべて階層型になります。圧縮トピックについては、「既知の制限」を参照してください。

    • confluent.tier.backend は、ブローカーの接続先であるクラウドストレージサービスを示しています。Nutanix Objects では、前に示したように S3 に設定します。

    • BUCKET_NAMEREGION はそれぞれ S3 バケット名とそのリージョンです。ブローカーは階層型データの読み取りと書き込みのために、このバケットとやりとりします。リージョンを有効な AWS リージョンに設定します。リージョンを null または空にすることはできません。たとえば、バケット名 tiered-storage-test-confluentus-east-1 リージョンにあると、次のプロパティになります。

      confluent.tier.s3.bucket=tiered-storage-test-confluent
      confluent.tier.s3.region=us-east-1
      
    • ENDPOINT OVERRIDE は、Nutanix Objects エンドポイントの完全修飾ドメイン名を示します。ntnx-obj がオブジェクトストア名で、nutanix.com がドメイン名である場合は、エンドポイントのオーバーライドは以下のようになります。

      confluent.tier.s3.aws.endpoint.override=http://ntnx-obj.nutanix.com
      
  2. ブローカーでは、Nutanix Objects S3 バケットに接続するために、Nutanix Prism Central の Objects ページから生成される認証情報が必要です。これらの情報は server.properties または環境変数を通じて設定できます。どちらの方法を使用しても問題はありません。ブローカーは server.properties に指定された認証情報を優先的に使用します。ブローカーが server.properties に認証情報を見つけられないと、環境変数を使用します。

    • サーバープロパティ - 次のプロパティを server.properties ファイルに追加します。

      confluent.tier.s3.cred.file.path=<PATH_TO_AWS_CREDENTIALS_FILE>
      

      <PATH_TO_AWS_CREDENTIALS_FILE>Nutanix の認証情報を含むファイル のファイルパスで置き換えます。

      このフィールドは、サーバーログファイルに記載されません。

    • 環境変数 - Nutanix Objects の認証情報を次の環境変数に指定します。

      export AWS_ACCESS_KEY_ID=<NUTANIX_OBJECTS_ACCESS_KEY_ID>
      export AWS_SECRET_ACCESS_KEY=<NUTANIX_OBJECTS_ACCESS_KEY>
      

      server.properties に認証情報の 2 つのプロパティが含まれない場合、ブローカーは前に示した環境変数を使用して、S3 バケットに接続します。

  3. Nutanix Objects バケットで、ブローカーに対して、次のアクションの実行を許可する必要があります。これらの操作は、ブローカーで階層型ストレージを有効にして、適切に使用するために必要です。これらの操作を有効にするには、バケットのユーザーに読み取り/書き込みアクセスを付与します。

    storage.buckets.get
    storage.objects.get
    storage.objects.list
    storage.objects.create
    storage.objects.delete
    storage.objects.update
    

NetApp オブジェクトストレージ

階層型ストレージを Amazon S3 API を通じて NetApp オブジェクトストレージで有効にするには、次の手順を実行します。

  1. 次のプロパティを server.properties ファイルに追加します。

    confluent.tier.feature=true
    confluent.tier.enable=true
    confluent.tier.backend=S3
    confluent.tier.s3.bucket=<BUCKET_NAME>
    confluent.tier.s3.region=<REGION>
    confluent.tier.s3.aws.endpoint.override=<NETAPP OBJECT STORAGE ENDPOINT>
    confluent.tier.s3.force.path.style.access=<BOOLEAN>
    # confluent.tier.metadata.replication.factor=1
    

    ちなみに

    階層型ストレージの内部トピックは、デフォルトで、レプリケーション係数が 3 になります。「Confluent クイックスタート」で説明されているような単一のブローカークラスターを実行するために confluent local services start を使用する場合は、上のコードの最終行をコメント解除することにより、階層型ストレージトピックのデフォルトのレプリケーション係数をオーバーライドします。

    上記のプロパティを追加すると、NetApp オブジェクトストレージで階層型ストレージの各コンポーネントが有効になります。可能なすべての構成でデフォルトパラメーターが使用されます。

    • confluent.tier.feature では、ブローカーに対して階層型ストレージを有効にします。これを true にすると、ブローカーで階層型ストレージを活用できます。

    • confluent.tier.enable では、作成されたトピックのデフォルト値を設定します。true に設定すると、未圧縮のトピックがすべて階層型になります。圧縮トピックについては、「既知の制限」を参照してください。

    • confluent.tier.backend は、ブローカーの接続先であるクラウドストレージサービスを示しています。NetApp オブジェクトストレージでは、前に示したとおり、これを S3 に設定します。

    • BUCKET_NAMEREGION はそれぞれ S3 バケット名とそのリージョンです。ブローカーは階層型データの読み取りと書き込みのために、このバケットとやりとりします。リージョンを有効な AWS リージョンに設定します。リージョンを null または空にすることはできません。たとえば、バケット名 tiered-storage-test-confluentus-east-1 リージョンにあると、次のプロパティになります。

      confluent.tier.s3.bucket=tiered-storage-test-confluent
      confluent.tier.s3.region=us-east-1
      
    • ENDPOINT OVERRIDE は、NetApp オブジェクトストレージのエンドポイントを示します。

    • Confluent.tier.s3.force.path.style.access は、すべてのリクエストでパススタイルのアクセスを使用するようにクライアントを構成します。デフォルトでは、このフラグは有効になっていません。デフォルトの動作では、構成されているエンドポイントとアクセスされるバケットに基づいて、使用するアクセススタイルが検出されます。このフラグを設定すると、すべてのリクエストでパススタイルのアクセスが強制的に使用されます。NetApp では、仮想ホストスタイルとパススタイルの両方がサポートされています。このパラメーターを true に設定すると、パススタイルのアクセスが有効になります。

  2. ブローカーでは、NetApp オブジェクトストレージ S3 バケットに接続するために、NetApp オブジェクトストレージで生成された認証情報が必要です。これらの情報は server.properties または環境変数を通じて設定できます。どちらの方法を使用しても問題はありません。ブローカーは server.properties に指定された認証情報を優先的に使用します。ブローカーが server.properties に認証情報を見つけられないと、環境変数を使用します。

    • サーバープロパティ - 次のプロパティを server.properties ファイルに追加します。

      confluent.tier.s3.cred.file.path=<PATH_TO_AWS_CREDENTIALS_FILE>
      

      <PATH_TO_AWS_CREDENTIALS_FILE>NetApp オブジェクトストレージの認証情報を含むファイル のファイルパスで置き換えます。

      このフィールドは、サーバーログファイルに記載されません。

    • 環境変数 - NetApp オブジェクトストレージの認証情報を次の環境変数に指定します。

      export AWS_ACCESS_KEY_ID=<NETAPP_OBJECT_STORAGE_ACCESS_KEY_ID>
      export AWS_SECRET_ACCESS_KEY=<NETAPP_OBJECT_STORAGE_ACCESS_KEY>
      

      server.properties に認証情報の 2 つのプロパティが含まれない場合、ブローカーは前に示した環境変数を使用して、S3 バケットに接続します。

  3. NetApp オブジェクトストレージバケットで、ブローカーに対して、次のアクションの実行を許可する必要があります。これらの操作は、ブローカーで階層型ストレージを有効にして、適切に使用するために必要です。これらの操作を有効にするには、バケットのユーザーに読み取り/書き込みアクセスを付与します。

    storage.buckets.get
    storage.objects.get
    storage.objects.list
    storage.objects.create
    storage.objects.delete
    storage.objects.update
    

Dell EMC ECS

階層型ストレージを Amazon S3 API を通じて Dell EMC ECS で有効にするには、次の手順を実行します。

  1. 次のプロパティを server.properties ファイルに追加します。

    confluent.tier.feature=true
    confluent.tier.enable=true
    confluent.tier.backend=S3
    confluent.tier.s3.bucket=<BUCKET_NAME>
    confluent.tier.s3.region=<REGION>
    confluent.tier.s3.aws.endpoint.override=<DELL EMC ECS ENDPOINT>
    confluent.tier.s3.force.path.style.access=<BOOLEAN>
    # confluent.tier.metadata.replication.factor=1
    

    ちなみに

    階層型ストレージの内部トピックは、デフォルトで、レプリケーション係数が 3 になります。「Confluent クイックスタート」で説明されているような単一のブローカークラスターを実行するために confluent local services start を使用する場合は、上のコードの最終行をコメント解除することにより、階層型ストレージトピックのデフォルトのレプリケーション係数をオーバーライドします。

    上記のプロパティを追加すると、Dell EMC ECS で階層型ストレージの各コンポーネントが有効になります。このとき、可能なすべての構成でデフォルトのパラメーターが使用されます。

    • confluent.tier.feature では、ブローカーに対して階層型ストレージを有効にします。これを true にすると、ブローカーで階層型ストレージを活用できます。

    • confluent.tier.enable では、作成されたトピックのデフォルト値を設定します。true に設定すると、未圧縮のトピックがすべて階層型になります。圧縮トピックについては、「既知の制限」を参照してください。

    • confluent.tier.backend は、ブローカーの接続先であるクラウドストレージサービスを示しています。Dell EMC ECS では、前に示したように S3 に設定します。

    • BUCKET_NAMEREGION はそれぞれ S3 バケット名とそのリージョンです。ブローカーは階層型データの読み取りと書き込みのために、このバケットとやりとりします。リージョンを有効な AWS リージョンに設定します。リージョンを null または空にすることはできません。たとえば、バケット名 tiered-storage-test-confluentus-east-1 リージョンにあると、次のプロパティになります。

      confluent.tier.s3.bucket=tiered-storage-test-confluent
      confluent.tier.s3.region=us-east-1
      
    • ENDPOINT OVERRIDE は、Dell EMC ECS のエンドポイントを示します。

    • Confluent.tier.s3.force.path.style.access は、すべてのリクエストでパススタイルのアクセスを使用するようにクライアントを構成します。デフォルトでは、このフラグは有効になっていません。デフォルトの動作では、構成されているエンドポイントとアクセスされるバケットに基づいて、使用するアクセススタイルが検出されます。このフラグを設定すると、すべてのリクエストでパススタイルのアクセスが強制的に使用されます。

  2. ブローカーでは、ECS バケットに接続するために、Dell EMC ECS で生成された認証情報が必要です。これらの情報は server.properties または環境変数を通じて設定できます。どちらの方法を使用しても問題はありません。ブローカーは server.properties に指定された認証情報を優先的に使用します。ブローカーが server.properties に認証情報を見つけられないと、環境変数を使用します。

    • サーバープロパティ - 次のプロパティを server.properties ファイルに追加します。

      confluent.tier.s3.cred.file.path=<PATH_TO_AWS_CREDENTIALS_FILE>
      

      <PATH_TO_AWS_CREDENTIALS_FILE>Dell EMC ECS の認証情報を含むファイル のファイルパスで置き換えます。

      このフィールドは、サーバーログファイルに記載されません。

    • 環境変数 - Dell EMC ECS の認証情報を次の環境変数に指定します。

      export AWS_ACCESS_KEY_ID=<DELL_EMC_ECS_ACCESS_KEY_ID>
      export AWS_SECRET_ACCESS_KEY=<DELL_EMC_ECS_ACCESS_KEY>
      

      server.properties に認証情報の 2 つのプロパティが含まれない場合、ブローカーは前に示した環境変数を使用して、S3 バケットに接続します。

  3. Dell EMC ECS バケットで、ブローカーに対して、次のアクションの実行を許可する必要があります。これらの操作は、ブローカーで階層型ストレージを有効にして、適切に使用するために必要です。これらの操作を有効にするには、バケットのユーザーに読み取り/書き込みアクセスを付与します。

    storage.buckets.get
    storage.objects.get
    storage.objects.list
    storage.objects.create
    storage.objects.delete
    storage.objects.update
    

MinIO

階層型ストレージを Amazon S3 API を通じて MinIO で有効にするには、次の手順を実行します。

  1. 次のプロパティを server.properties ファイルに追加します。

    confluent.tier.feature=true
    confluent.tier.enable=true
    confluent.tier.backend=S3
    confluent.tier.s3.bucket=<BUCKET_NAME>
    confluent.tier.s3.region=<REGION>
    confluent.tier.s3.aws.endpoint.override=<MINIO_ENDPOINT>
    confluent.tier.s3.force.path.style.access=true
    # confluent.tier.metadata.replication.factor=1
    

    ちなみに

    階層型ストレージの内部トピックは、デフォルトで、レプリケーション係数が 3 になります。「Confluent クイックスタート」で説明されているような単一のブローカークラスターを実行するために confluent local services start を使用する場合は、上のコードの最終行をコメント解除することにより、階層型ストレージトピックのデフォルトのレプリケーション係数をオーバーライドします。

    上記のプロパティを追加すると、MinIO で階層型ストレージの各コンポーネントが有効になります。このとき、可能なすべての構成でデフォルトのパラメーターが使用されます。

    • confluent.tier.feature では、ブローカーに対して階層型ストレージを有効にします。これを true にすると、ブローカーで階層型ストレージを活用できます。

    • confluent.tier.enable では、作成されたトピックのデフォルト値を設定します。true に設定すると、未圧縮のトピックがすべて階層型になります。圧縮トピックについては、「既知の制限」を参照してください。

    • confluent.tier.backend は、ブローカーの接続先であるクラウドストレージサービスを示しています。MinIO オブジェクトストレージでは、前に示したとおり、これを S3 に設定します。

    • BUCKET_NAMEREGION はそれぞれ S3 バケット名とそのリージョンです。ブローカーは階層型データの読み取りと書き込みのために、このバケットとやりとりします。MinIO では、別のリージョン値で明示的に開始されない限り、REGION のデフォルトは us-east-1 です。MinIO 構成に基づいて、REGION を適切な値に置き換えてください。この値は、mc コマンドラインツールを使用して取得できます。

      mc admin info –json ALIAS | jq ".info.region"
      

      階層型ストレージを構成する前にバケットを作成してください。バケットの作成の詳細については、「mc mb」を参照してください。

      confluent.tier.s3.aws.endpoint.override=http://minio.example.net
      confluent.tier.s3.force.path.style.access=true
      
    • ENDPOINT OVERRIDE は、MinIO オブジェクトストレージエンドポイントの完全修飾ドメイン名を示します。MinIO では、MinIO デプロイ内のすべてのホストがラウンドロビン方式で選択されるように構成されたロードバランサーを使用することを推奨しています。minio がロードバランサーのホスト名で、example.com がドメイン名である場合は、エンドポイントのオーバーライドは以下のようになります。

      confluent.tier.s3.aws.endpoint.override=http://minio.example.net
      
    • confluent.tier.s3.force.path.style.access は、すべてのリクエストでパススタイルのアクセスを使用するようにクライアントを構成します。デフォルトでは、このフラグは有効になっていません。デフォルトの動作では、構成されているエンドポイントとアクセスされるバケットに基づいて、使用するアクセススタイルが検出されます。このフラグを設定すると、すべてのリクエストでパススタイルのアクセスが強制的に使用されます。MinIO デプロイでは、通常、パススタイルのアクセスのみサポートされます。他のアクセススタイルを使用することによるメリットはありません。

  2. ブローカーでは、MinIO バケットに接続するために、MinIO で生成された認証情報が必要です。これらの情報は server.properties または環境変数を通じて設定できます。どちらの方法を使用しても問題はありません。ブローカーは server.properties に指定された認証情報を優先的に使用します。ブローカーが server.properties に認証情報を見つけられないと、環境変数を使用します。

    • サーバープロパティ - 次のプロパティを server.properties ファイルに追加します。

      confluent.tier.s3.cred.file.path=<PATH_TO_MINIO_CREDENTIALS_FILE>
      

      <PATH_TO_MINIO_CREDENTIALS_FILE>MinIO オブジェクトストレージの認証情報を含むファイル のファイルパスで置き換えます。ユーザーに BUCKET_NAME バケットに対する読み取り/書き込み/リストアクセス許可が必要です。

      このフィールドは、サーバーログファイルに記載されません。

    • 環境変数 - MinIO の認証情報を次の環境変数に指定します。

      export AWS_ACCESS_KEY_ID=<MINIO_OBJECT_STORAGE_ACCESS_KEY_ID>
      export AWS_SECRET_ACCESS_KEY=<MINIO_OBJECT_STORAGE_ACCESS_KEY>
      

      server.properties に認証情報の 2 つのプロパティが含まれない場合、ブローカーは前に示した環境変数を使用して、S3 バケットに接続します。

  3. MinIO 認証情報で、ブローカーに対して、以下のアクションの実行を許可する必要があります。これらの操作は、ブローカーで階層型ストレージを有効にして、適切に使用するために必要です。これらの操作を有効にするには、バケットのユーザーに読み取り/書き込みアクセスを付与します。

    {
      “PolicyName”: “ConfluentTieredStorage”,
      “Policy”: {
        “Version”: “2012-10-17”,
        “Statement”: [
          {
            “Effect”: “Allow”,
            “Action”: [
              “s3:DeleteObject”,
              “s3:GetObject”,
              “s3:PutObject”,
              “s3:GetBucketLocation”
            ],
            “Resource”: [
              “arn:aws:s3:::BUCKET_NAME”,
              “arn:aws:s3:::BUCKET_NAME/*”
            ]
          }
        ]
      }
    }
    

    このポリシーをユーザー認証情報に割り当てるには、mc admin policy set を使用します。

階層型ストレージでのトピックの作成

Kafka CLI コマンド kafka-topics$CONFLUENT_HOME/bin 内)を使用してトピックを作成できます。このコマンドは前のバージョンと同じ方法で使用できます。階層型ストレージに関連するトピックの構成のサポートが追加されています。

kafka-topics --bootstrap-server localhost:9092   \
  --create --topic trades \
  --partitions 6 \
  --replication-factor 3 \
  --config confluent.tier.enable=true \
  --config confluent.tier.local.hotset.ms=3600000 \
  --config retention.ms=604800000
  • confluent.tier.local.hotset.ms - アクティブではないセグメントを、スペースを解放するために破棄されるまでブローカーローカルストレージに保持する最大時間を制御します。ローカルディスクから削除するセグメントは、オブジェクトストレージに存在し、保持ポリシーに応じて引き続き利用可能になります。-1 に設定されている場合、時間制限は適用されません。
  • retention.ms は階層型トピックとそれ以外のトピックで同様に機能します。ただし、保持ポリシーに応じて、オブジェクトストレージとローカルストレージの両方でセグメントが失効します。

ちなみに

  • Control Center からトピックを構成することもできます。これを行うには、Control Center にログオンし、トピックに移動します(<環境> -> <クラスター> -> <トピック>)。詳細については、Control Center ユーザーガイドの「階層型ストレージ」を参照してください。ローカルデプロイの場合、Control Center を使用するには、ウェブブラウザーで http://localhost:9021/ にアクセスします。
  • Kafka のコマンドの使用の詳細については、「Confluent Platform における Kafka の基礎」の「Kafka のコマンド入門」を参照してください。

データストレージを試すためのテストメッセージの送信

前のセクションで作成したトピックをテストケースとして使用することも、新しいトピックを作成することもできます。この例の目的でデータがストレージに転送される速度を上げるには、以下に示すように、トピックの segment.bytes 設定をデフォルト値から更新します。これを行うには、既存トピックの構成を、そのトピックの Control Center エキスパートモード 設定から更新するか、下に示すように新しいトピックを作成します。

kafka-topics --bootstrap-server localhost:9092   \
  --create --topic hot-topic \
  --partitions 6 \
  --replication-factor 3 \
  --config confluent.tier.enable=true \
  --config confluent.tier.local.hotset.ms=3600000 \
  --config retention.ms=604800000 \
  --config segment.bytes=10485760

階層型ストレージを構成したら、テストデータを 1 つ以上のトピックに送信し、コンシューマーを実行してメッセージを読み取ってみます。たとえば、以下のコマンドを使用してメッセージを生成します。

kafka-producer-perf-test \
   --producer-props bootstrap.servers=localhost:9092 \
   --topic hot-topic \
   --record-size 1000 \
   --throughput 1000 \
   --num-records 3600000

これを 5 分から 10 分程度実行してから、コンシューマーを実行します。次に例を示します。

kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic hot-topic

10 分から 20 分後、Control Center の UI で、ストレージコンテナーに保存されたデータを確認できます。ローカルデプロイの場合、Control Center を使用するには、ウェブブラウザーで http://localhost:9021/ にアクセスします。

Control Center を使用して階層型ストレージを構成および管理する方法の詳細については、「階層型ストレージ」を参照してください。

ベストプラクティスと推奨事項

チューニング

階層型ストレージのパフォーマンスを向上させるには、TierFetcherNumThreadsTierArchiverNumThreads の値を増やします。一般的なガイドラインとして、TierFetcherNumThreads を CPU の物理コア数にまで増やし、TierArchiverNumThreads を CPU コア数の半分に設定します。たとえば 8 つの物理コアのマシンがある場合、server.properties で confluent.tier.fetcher.num.threads = 8 と confluent.tier.archiver.num.threads = 4 を設定します。

トピック削除の間隔

トピックが削除されるとき、オブジェクトストレージのログセグメントファイルの削除はすぐには始まりません。このファイルの削除は、一定時間が経過してから実行されます。この時間のデフォルト値は 3 時間です。この間隔の値を変更するには、構成 confluent.tier.topic.delete.check.interval.ms を変更します。トピックやクラスターを削除するときは、この点に注意することが重要です。トピックやクラスターが削除されたら、個別のバケットにあるオブジェクトを手動で削除しても問題ありません。

ログセグメントのサイズ

階層化が有効であるトピックに対して、セグメントサイズ構成 log.segment.bytes をデフォルトサイズ 1 GB から減らすことを推奨します。アーカイバーは、オブジェクトストレージへのセグメントのアップロードを試行する前に、ログセグメントが閉じるのを待機します。100 MB など小さなセグメントサイズを使用すると、セグメントのクローズの間隔が短くなります。またセグメントサイズを小さくすると、ページキャッシュの動作が促進され、アーカイバーのパフォーマンスが高まります。

階層型ストレージの内部トピックに対する ACL

オンプレミスデプロイで推奨されるベストプラクティスは、階層型ストレージに使用される内部トピックに対して ACL オーソライザー を有効にすることです。ACL ルールを設定して、このデータへのアクセスをブローカーユーザーのみに制限します。これにより、内部トピックが保護され、階層型ストレージのデータとメタデータへの不正アクセスが防止されます。

たとえば、次のコマンドでは、階層型ストレージの内部トピック _confluent-tier-state に ACL を設定しています(現在、階層型ストレージに関連する社内トピックは 1 つだけです)。この例では、内部トピックに対するすべての 操作 にプリンシパル kafka のアクセス許可を提供する ACL を作成しています。

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
--add --allow-principal User:<kafka> --operation All --topic "_confluent-tier-state"

ちなみに

実際の使用では、User:<kafka> を、デプロイの実際のブローカープリンシパルに置き換えてください。

階層型ストレージの内部トピック(将来のリリースで他のトピックが追加される場合)に ACL を設定するには、プレフィックスとして _confluent-tier を使用してトピック名を指定します。たとえば、次のコマンドでは、階層型ストレージに関連するすべての内部トピックに ACL を設定しています。

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
--add --allow-principal User:<kafka> --operation All --topic "_confluent-tier-" --resource-pattern-type PREFIXED

階層型ストレージでのブローカーのサイズ設定

階層型ストレージが有効である場合、confluent.tier.local.hotset.ms は、セグメントが、スペースを解放するために破棄されるまでブローカーローカルストレージに保持される時間を制御します。この設定は最終的にはビジネスの観点から決定しますが、サイズ設定に役に立ついくつかのプラクティスがあります。オブジェクトストレージからデータをフェッチしている間にコンシューマーで遅延が確認される場合、一般には confluent.tier.local.hotset.ms の値を増やすことが得策です。ブローカーでディスクサイズを計画するとき、同様に重要なのは、オブジェクトストレージとの通信時に発生する可能性がある問題に対応できるように、ある程度の余裕をもった値を設定しておくことです。クラウドオブジェクトストレージの停止はきわめてまれですが、発生する可能性はあり、階層型ストレージでは、オブジェクトストレージに正常に階層化されるまでブローカーにセグメントを引き続き保持します。

階層型アーカイバーのメトリクス

アーカイバーは階層型ストレージのコンポーネントで、アクティブではないセグメントをクラウドストレージにアップロードする処理を担当します。

kafka.tier.tasks.archive:type=TierArchiver,name=BytesPerSec
アーカイバーでクラウドストレージにアップロードしている、1 秒あたりのバイト数(レート)。
kafka.tier.tasks.archive:type=TierArchiver,name=TotalLag
アーカイバーでクラウドストレージにアップロードしていない、非アクティブセグメントのバイト数。アーカイバーはクラウドストレージに定常的にアップロードしているので、全体のラグは 0 に向かって減ります。
kafka.tier.tasks.archive:type=TierArchiver,name=RetriesPerSec
非アクティブセグメントをクラウドストレージにアップロードするために、アーカイバーが再試行する回数。
kafka.tier.tasks:type=TierTasks,name=NumPartitionsInError
エラーステートで、アーカイバーがアップロードできなかったパーティション数。このステートのパーティションは、アーカイバーでスキップされ、クラウドストレージにアップロードされます。

階層型フェッチャーのメトリクス

フェッチャーは階層型ストレージのコンポーネントで、クラウドストレージからのデータの取得を担当します。

kafka.server:type=TierFetcher
フェッチャーがクラウドストレージからデータを取得する 1 秒あたりのバイト数(レート)。

パフォーマンステストの例

想定されるメトリクスのリファレンスとして、アクティブ化された階層型ストレージの機能で、Kafka クラスターのスループットパフォーマンスを記録しました。クラスターと環境に関する概要情報および構成を以下に示します。

クラスターの構成

  • 6 個のブローカー
  • r5.xlarge AWS インスタンス
  • GP2 ディスク(2 TB)
  • 104857600(100 MB)segment.bytes
  • 単一トピック(24 パーティション、レプリケーション係数 3)

プロデューサーとコンシューマーの詳細情報

  • 6 個のプロデューサー(各ターゲットレートが 20 MB/秒、合計 120 MB/秒)
  • 合計 12 個のコンシューマー
  • 6 個のコンシューマー(トピックの最後からターゲットレート 20 MB/秒 でレコードを読み取り)
  • 6 個のコンシューマー(S3 のトピックの先頭からフェッチャーの上限レートなしで、レコードを読み取り)

プロデューサー、コンシューマー、ブローカー、およびトピックの構成に応じて、値は変わることがあります。たとえば、ログセグメントサイズに 100 MB の代わりに 1 GB を使用すると、アーカイバーのスループットが低下することがあります。

スループットのメトリック 値(MB/秒)
プロデューサー(クラスターの合計) 115
プロデューサー(ブローカーでの平均) 19
コンシューマー(クラスターの合計) 623
アーカイバー(ブローカーでの平均) 19
フェッチャー(ブローカーでの平均) 104

サポートされているプラットフォームと機能

既知の制限

  • Confluent Platform 6.0.0 の時点では、階層型ストレージは一度有効にすると無効にできません。
  • 階層型ストレージで Amazon S3 API を使用するとき、認定オブジェクトストア以外はサポートされません。現在は Amazon S3、Google GCS、Pure Storage FlashBlade、Nutanix Objects、NetApp オブジェクトストレージ、Dell EMC ECS、および MinIO のみがサポートされています。
  • 圧縮トピックは階層型ストレージでまだサポートされていません。
  • 現時点で、Azure Blob Storage は階層型ストレージでサポートされていません。
  • JBOD(単なるディスクの束)はサポートされません。階層型ストレージでは現在、複数のログディレクトリ(JBOD の本質的な要件)をサポートしないためです。

階層型ストレージの無効化

Confluent Platform 6.0.1 以降では、confluent.tier.enable=false を設定することで階層型ストレージを無効にできます。これにより、追加の階層化が無効になります。オブジェクトストレージにまだオフロードされていない新規および既存のデータはオフロードされません。既に階層化(オフロード)されているデータは、オブジェクトストレージ(階層型)に残ります。

階層型ストレージを無効にする場合は、Tier fetcher などの階層化関連コンポーネントを有効にしたまま、既に階層化済みのデータをフェッチできるように、confluent.tier.feature が有効のまま(confluent.tier.feature=true)であることを確認します。