重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Confluent Cloud クライアントの構成

概要

Confluent Cloud に接続する Kafka クライアントアプリケーションは、サポートされている任意の言語で作成できます。必要なことは、Confluent Cloud クラスターの認証情報を使用してクライアントを構成することだけです。

Confluent の公式クライアントは以下に対応しています。

このドキュメントでは、クライアントを適切に構成する方法について説明します。既にクライアントを構成済みで、クライアントアプリケーションの作成例をお探しの場合は、"Hello, World!" の:platform:コード例|tutorials/examples/clients/docs/clients-all-examples.html を参照してください。このコード例では、Confluent Cloud クラスターを含む任意の Kafka クラスターに対する生成と消費を行います。

Kafka クライアント全般に関する詳細については、「Kafka クライアント」を参照してください。

注釈

Confluent Cloud に接続するすべてのクライアントは、SASL_PLAIN 認証および TLS 1.2 暗号化をサポートしている必要があります。

警告

接続エラーが発生する可能性があるため、Confluent では証明書のピン留めは避けることをお勧めします。Confluent は証明書やそのプロパティを予告なしに変更することがあります。そのため、ピン留めされている証明書が置き換えられると、アプリケーションが接続に失敗する可能性があります。

組織でピン留めが必要な場合は、下位の証明書ではなく、Let's Encrypt の ISRG Root X1 証明書 をピン留めしてください。ただし、Confluent が CA プロバイダーを変更した場合には、アプリケーションへの接続に影響することがあります。

クライアントの構成

クライアントは、Confluent Cloud クラスターに接続するように適切に構成する必要があります。一部のクライアント言語では、クライアントが Confluent Cloud Schema Registry クラスターに接続するように構成することもできます。

Confluent Cloud Console から、または Confluent CLI で、必要な認証情報があらかじめ指定された構成ファイルを取得できます。

Confluent Cloud Console からのクライアントの構成

クライアントアプリを Confluent Cloud に接続する最も簡単な方法は、Confluent Cloud Console から構成ファイルをコピーアンドペーストすることです。

  1. Confluent Cloud にログインします。

  2. 環境を選択します。

  3. クラスターを選択します。

  4. ナビゲーションメニューから Data integration を選択し、次に Clients を選択します。

  5. (省略可)**+ New client** ボタンをクリックします。

  6. クライアントアプリに使用する言語を選択し、コード例をアプリケーションのソースコードにコピーアンドペーストします。

    ../_images/cloud-client-languages.png
  7. 言語を選択したら、必要に応じて Kafka クラスターの API キーと スキーマレジストリ のキーを作成するか、既存のキーを使用します。続けて、表示された構成をコピーして、クライアントアプリケーションのソースコードに貼り付けます。

    ../_images/cloud-client-configuration-example.png

    ちなみに

    初めての場合は、表示された構成のすぐ下にある Get started with our example project and tutorial をクリックできます。これにより、クライアント構成だけでなく、Confluent Cloud クラスターに対する生成と消費を行う "Hello, World!" クライアントコード例に関する説明も確認できます。

Confluent CLI でのクライアントの構成

Confluent CLI を頻繁に使用している場合は、CLI でコンテキストをセットアップしたら、1 行のコマンド confluent kafka client-config create を使用して、クライアントアプリを Confluent Cloud に接続するための構成ファイルを作成できます。

以下の表に、サポートされているクライアント言語、対応する言語 ID、各言語が Confluent Cloud Schema Registry 構成をサポートしているかどうかを示します。Confluent Cloud Schema Registry 構成をサポートする言語の場合、フラグを介して スキーマレジストリ 情報をコマンドに渡すことでクライアントアプリ用に構成することもできます。

言語 言語 ID Confluent Cloud Schema Registry のサポート状況
Clojure clojure ×
C/C++ cpp ×
C# csharp ×
Go go ×
Groovy groovy ×
Java java
Kotlin kotlin ×
Ktor ktor
Node.js nodejs ×
Python python
REST API restapi
Ruby ruby ×
Rust rust ×
Scala scala ×
Spring Boot sprintboot

ちなみに

既に CLI コンテキストをセットアップしている場合は、クライアント構成ファイルを作成する 手順までスキップできます。

前提条件:
  1. クラスター URL を指定した confluent login コマンドを使用して、クラスターにログインします。

    confluent login
    
    Enter your Confluent Cloud credentials:
    Email: susan@myemail.com
    Password:
    
  2. Confluent Cloud 環境 を設定します。

    1. 環境 ID を取得します。

      confluent environment list
      

      出力は以下のようになります。

            Id      |        Name
      +-------------+--------------------+
        * t2703     | default
          env-m2561 | demo-env-102893
          env-vnywz | ccloud-demo
          env-qzrg2 | data-lineage-demo
          env-250o2 | my-new-environment
      
    2. ID(<env-id>)を使用して環境を設定します。

      confluent environment use <env-id>
      

      出力は以下のようになります。

      Now using "env-vnywz" as the default (active) environment.
      
  3. 使用するクラスターを設定します。

    1. クラスター ID を取得します。

      confluent kafka cluster list
      

      出力は以下のようになります。

            Id      |   Name    | Type  | Provider |  Region  | Availability | Status
      +-------------+-----------+-------+----------+----------+--------------+--------+
          lkc-oymmj | cluster_1 | BASIC | gcp      | us-east4 | single-zone  | UP
        * lkc-7k6kj | cluster_0 | BASIC | gcp      | us-east1 | single-zone  | UP
      
    2. ID(<cluster-id>)を使用してクラスターを設定します。これが、コマンドが実行されるクラスターになります。

      confluent kafka cluster use <cluster-id>
      

      選択したクラスターを設定後に確認するには、再度 confluent kafka cluster list と入力します。選択されているクラスターには、横にアスタリスク(*)が表示されます。

  4. API キーとシークレットを作成し、保存します。

    API キーは、Confluent CLI で、または Confluent Cloud Console から生成できます。API キーとシークレットは必ず保存してください。

    1. 以下のコマンドを実行して、API キーとシークレットを ID(<cluster-id>)を使用して作成します。

      confluent api-key create --resource <cluster-id>
      

      出力は以下のようになります。

      It may take a couple of minutes for the API key to be ready.
      Save the API key and secret. The secret is not retrievable later.
      +---------+------------------------------------------------------------------+
      | API Key | ABC123xyz                                                        |
      | Secret  | 123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABCx |
      +---------+------------------------------------------------------------------+
      

    詳しくは、「リソース別 API キー」を参照してください。

  5. Confluent CLI コマンドに使用する API キーを、ID(<cluster-id>)を使用して設定します。

    confluent api-key use <api-key> --resource <cluster-id>
    
  6. (省略可)|sr-ccloud| を有効にして、スキーマレジストリ の API キーとシークレットを作成します。

    重要

    スキーマレジストリ 構成をサポートしない言語のクライアント構成ファイルを作成する場合は、スキーマレジストリ を有効にする必要はありません。それ以外の場合は、スキーマレジストリ を有効にして構成することをお勧めします(ただし、必須ではありません)。

    選択した言語が スキーマレジストリ 構成をサポートしているかどうかについては、クライアント言語の表 で確認してください。

    Confluent Cloud Schema Registry を有効にして、Confluent CLI で、または Confluent Cloud Console から API キーとシークレットを作成できます。API キーとシークレットは必ず保存してください。

    1. 任意のクラウドプロバイダーと地域で スキーマレジストリ を有効にします。

      confluent schema-registry cluster enable --cloud <cloud-provider> --geo <geography>
      

      出力は以下のようになります。

      +--------------+--------------------------------------------------+
      | Id           | lsrc-zxvj3                                       |
      | Endpoint URL | https://psrc-vrpp5.us-east-2.gcp.confluent.cloud |
      +--------------+--------------------------------------------------+
      
    2. スキーマレジストリ の API キーとシークレットを、ID(<sr-id>)を使用して作成します。

      confluent api-key create --resource <sr-id>
      

      出力は以下のようになります。

      It may take a couple of minutes for the API key to be ready.
      Save the API key and secret. The secret is not retrievable later.
      +---------+------------------------------------------------------------------+
      | API Key | ABC123xyz                                                        |
      | Secret  | 123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABCx |
      +---------+------------------------------------------------------------------+
      

    スキーマレジストリ の有効化の詳細については、「confluent schema-registry cluster enable」を参照してください。API キーとシークレットの作成の詳細については、「リソース別 API キー」を参照してください。

  7. 選択した言語のクライアント構成ファイルを、言語 ID(<language-id>)を使用して作成します。続けて、表示された構成をコピーして、クライアントアプリケーションのソースコードに貼り付けます。

    言語 ID のリストと各言語が スキーマレジストリ 構成をサポートしているかどうかについては、クライアント言語の表 で確認してください。

    ちなみに

    コマンド confluent kafka client-config create <language-id> の出力では、クライアント構成ファイルが stdout に書き出され、エラーまたは警告が stderr に書き出されます。

    次のコマンドを実行すると、コマンド出力を別々のファイルにリダイレクトできます。

    confluent kafka client-config create <language-id> 1> config-file.config 2> errors-warnings-file.err
    

    ちなみに

    CLI コンテキストを使用しない場合は、フラグを介して必要な情報すべてを渡すことができます。

    confluent kafka client-config create <language-id> \
      --environment <env-id> \
      --cluster <cluster-id> \
      --api-key <api-key> --api-secret <api-secret> \
      --sr-apikey <sr-apikey> --sr-apisecret <sr-apisecret>   # only for languages that support Schema Registry configuration
    
    • スキーマレジストリ 構成をサポートしない言語の場合、次のコマンドを実行します。

      confluent kafka client-config create <language-id>
      
    • スキーマレジストリ 構成をサポートする言語の場合、次のコマンドを実行します。

      confluent kafka client-config create <language-id> \
        --sr-apikey <sr-apikey> \
        --sr-apisecret <sr-apisecret>
      

    詳細については、「confluent kafka client-config create」を参照してください。

クラスターのローリングに対応したクライアントの構成

Confluent Cloud では、アップグレードとメンテナンス のために、すべてのクラスターを定期的にローリングしています。クラスターのローリング とは、アップデート処理の間、クラスターの可用性と機能性を完全に維持するために、そのクラスターを構成するすべてのブローカーを一度に 1 つずつアップデートしていくことです。Kafka プロトコルおよびアーキテクチャは、まさにこのタイプの、高可用性でフォールトトレラントな動作を実現するように設計されているので、適切に構成されたクライアントは、ローリング時に発生するブローカーの変更に正常に対応することができます。

クラスターのローリング時には、クライアントで以下のような再試行可能な例外が発生することがあります。適切に構成されたクライアントでは、このような例外が発生すると、警告が生成されます。

UNKNOWN_TOPIC_OR_PARTITION: "This server does not host this topic-partition."
LEADER_NOT_AVAILABLE: "There is no leader for this topic-partition as we are in the middle of a leadership election."
NOT_LEADER_FOR_PARTITION: "This server is not the leader for that topic-partition."
NOT_ENOUGH_REPLICAS: "Messages are rejected since there are fewer in-sync replicas than required."
NOT_ENOUGH_REPLICAS_AFTER_APPEND: "Messages are written to the log, but to fewer in-sync replicas than required."

デフォルトでは、Kafka プロデューサークライアントは、2 分間再試行を行い、これらの警告をログに出力してから、介入なしでリカバリします。コンシューマーおよび管理クライアントは、デフォルトでは 1 分間再試行します。

クライアントに構成した再試行回数または再試行時間が十分でない場合は、上記の例外がエラーとしてログに記録されます。

再試行中にクライアントでメモリーのバッファ容量不足が発生し、クライアントがメモリーを待機して処理をブロックしている間に再試行時間も経過してしまうと、タイムアウト例外が発生します。

推奨事項

上記の再試行可能な警告に対して内部アラートをトリガーすることはお勧めしません。これらの警告は、正常な操作の中でも定期的に発生するものであり、適切に構成されたクライアントであれば、ストリーミングアプリケーションに中断を生じさせることなく正常に処理できるものであるからです。代わりに、アラートは、自動的に再試行できないクライアントエラーだけに制限することをお勧めします。

Confluent Cloud の Kafka アプリケーションの設計、モニタリング、最適化に関するその他の推奨事項については、「Confluent Cloud でのクライアントアプリケーションの開発」を参照してください。