重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

ksqlDB を Confluent Cloud に接続する

Confluent Cloud の Apache Kafka® クラスターに ksqlDB を接続できます。

Confluent Cloud を使用するために ksqlDB サーバーを構成する必要があります。ksqlDB CLI の構成は不要です。

前提条件

  1. Confluent CLI を使用して Confluent Cloud クラスターにログインし、confluent kafka cluster list コマンドを実行して Kafka クラスター ID を取得します。

    confluent kafka cluster list
    

    出力は以下のようになります。

          Id      |       Name        |     Type     | Provider |  Region  | Availability | Status
    +-------------+-------------------+--------------+----------+----------+--------------+--------+
        lkc-a123b | ksqldb-quickstart | BASIC_LEGACY | gcp      | us-west2 | multi-zone   | UP
    
  2. confluent kafka cluster describe コマンドを実行して Confluent Cloud クラスターのエンドポイントを取得します。

    confluent kafka cluster describe lkc-a123b
    

    出力は以下のようになります。

     +--------------+--------------------------------------------------------+
     | Id           | lkc-a123b                                              |
     | Name         | ksqldb-quickstart                                      |
     | Type         | BASIC                                                  |
     | Ingress      |                                                    100 |
     | Egress       |                                                    100 |
     | Storage      |                                                   5000 |
     | Provider     | azure                                                  |
     | Availability | single-zone                                            |
     | Region       | us-west2                                               |
     | Status       | UP                                                     |
     | Endpoint     | SASL_SSL://pkc-4s987.us-west2.gcp.confluent.cloud:9092 |
     | ApiEndpoint  | https://pkac-42kz6.us-west2.gcp.confluent.cloud        |
     +--------------+--------------------------------------------------------+
    

    Endpoint の値を保存して、後の手順でこの値を使用します。

  3. my-ksqldb-app という名前のサービスアカウントを作成します。説明を含める必要があります。

    confluent iam service-account create my-ksqldb-app --description "My ksqlDB API and secrets service account."
    

    出力は以下のようになります。

    +-------------+--------------------------------+
    | Id          |                         123456 |
    | Resource ID | sa-efg123                      |
    | Name        | my-ksqldb-app                  |
    | Description | My ksqlDB API and secrets      |
    |             | service account.               |
    +-------------+--------------------------------+
    

    サービスアカウント ID を保存して、後の手順でこの値を使用します。

  4. Kafka API キーとシークレットを、サービスアカウント 123456 に対して作成します。必ず、ここに示しているサービスアカウント ID と Kafka クラスター ID の値を実際の値に置き換えてください。

    confluent api-key create --service-account 123456 --resource lkc-a123b
    

    出力は以下のようになります。

    It may take a couple of minutes for the API key to be ready.
    Save the API key and secret. The secret is not retrievable later.
    +---------+------------------------------------------------------------------+
    | API Key | ABCXQHYDZXMMUDEF                                                 |
    | Secret  | aBCde3s54+4Xv36YKPLDKy2aklGr6x/ShUrEX5D1Te4AzRlphFlr6eghmPX81HTF |
    +---------+------------------------------------------------------------------+
    

    重要

    API キーとシークレットを保存します。 クライアントアプリケーションを構成するときに、この情報が必要になります。キーとシークレットにアクセスして表示できるのは、このとき だけ であることに注意してください。

  5. /etc/ksqldb/ksql-server.properties プロパティファイルをカスタマイズします。

    ちなみに

    Confluent Cloud で ksqlDB を使用するには ksqlDB サーバーを構成する必要があります。ksqlDB CLI には追加の構成は不要です。

    Confluent Cloud で ksqlDB を使用するために必要な最小構成の例を以下に示します。推奨されている ksqlDB の本稼働環境の設定 についても確認しておく必要があります。前に生成した API キーおよびシークレットで、<api-key> および <api-secret> を置き換えます。

    # For bootstrap.servers, assign the Endpoint value from the "confluent kafka cluster describe" command.
    # eg. pkc-4s087.us-west2.gcp.confluent.cloud:9092
    bootstrap.servers=<broker-endpoint>
    ksql.internal.topic.replicas=3
    ksql.streams.replication.factor=3
    ksql.logging.processing.topic.replication.factor=3
    listeners=http://0.0.0.0:8088
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    # Replace <api-key> and <api-secret> with your API key and secret.
    sasl.jaas.config=\
        org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="<api-key>" \
        password="<api-secret>";
    
  6. (オプション)GitHub の ccloud/examples/template_delta_configs にある ksql-server-ccloud.delta の例のとおり Confluent Cloud スキーマレジストリ の構成を追加します。

    # Confluent Schema Registry configuration for ksqlDB Server
    ksql.schema.registry.basic.auth.credentials.source=USER_INFO
    ksql.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
    ksql.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>
    
  7. ksqlDB サーバーを再起動します。再起動の方法は 環境によって異なります

詳細については、「Quick Start for Confluent Cloud」と『ksqlDB 構成パラメーターリファレンス』を参照してください。

ksqlDB から Confluent Cloud にアクセスするための ACL の作成

Confluent Cloud の Kafka クラスターで ACL を有効にした場合は、Kafka クラスターの特定のリソースへのアクセス権限を ksqlDB アプリケーションに付与する必要があります。以下の Confluent CLI コマンドを使用して、指定したトピックの操作を ksqlDB に許可するために必要な ACL を Kafka クラスターで作成します。

UI を使用して ksqlDB をプロビジョニングする場合は、confluent ksql app configure-acls コマンドを実行する必要はありません。

ちなみに

--dry-run オプションを使用して、このコマンドで設定されるすべての ACL のプレビューを表示します(実際の設定は行いません)。

以下のコマンドを実行して、Confluent Cloud で稼働している Kafka クラスターへのアクセス権限を ksqlDB クラスターに与えます。

confluent ksql app configure-acls <ksql-cluster-id>

ksqlDB から Confluent Cloud の特定のトピックにアクセスするための ACL の作成

ksqlDB クラスターを起動して Confluent Cloud と通信するための ACL を割り当てるだけでなく、特定のトピックへのアクセスを ksqlDB ユーザーに許可するための ACL も指定する必要があります。

以下のコマンドで、<id> を以前に作成したサービスアカウント ID に置き換えます。

SELECT FROM STREAM/TABLE を実行するための ACL の割り当て

以下のコマンドを実行して、ストリームやテーブルの基礎になるトピックに対する SELECT FROM STREAM/TABLE ステートメントの読み取りアクセス権限を有効にします。

confluent kafka acl create --allow --service-account <id> --operation READ --topic <topic>

トピックに書き込みを行うための ACL の割り当て

以下のコマンドを実行して、トピックへの書き込みアクセス権限を有効にします。

confluent kafka acl create --allow --service-account <id> --operation WRITE --topic <topic>

トピックに書き込みを行うための ACL の割り当て

CREATE STREAM、CREATE STREAM AS SELECT、CREATE TABLE、CREATE TABLE AS SELECT など、Kafka トピックを作成する ksqlDB ステートメントを記述する場合、トピックの読み取りまたは書き込みのアクセス権限に加え、トピックを作成するアクセス権限が ksqlDB に必要です。

CREATE STREAM ステートメント(例: CREATE STREAM FOO (...)WITH (KAFKA_TOPIC='FOO', ...);)の CREATE および READ のアクセス権限を付与するには、次のコマンドを実行します。CREATE TABLE 用のコマンドも同様です。

confluent kafka acl create --allow --service-account <id> --operation CREATE --topic 'FOO'
confluent kafka acl create --allow --service-account <id> --operation READ --topic 'FOO'
confluent kafka acl create --allow --service-account <id> --operation CREATE --cluster-scope

CREATE STREAM AS SELECT ステートメント(例: CREATE STREAM BAR WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM FOO;)の CREATE および WRITE のアクセス権限を付与するには、次のコマンドを実行します。CREATE TABLE AS SELECT 用のコマンドも同様です。

confluent kafka acl create --allow --service-account <id> --operation CREATE --topic 'BAR'
confluent kafka acl create --allow --service-account <id> --operation WRITE --topic 'BAR'
confluent kafka acl create --allow --service-account <id> --operation CREATE --cluster-scope

すべてのトピックへのフルアクセス権限を与える ACL の割り当て

以下のコマンドを実行して、すべてのトピックへのフルアクセス権限を有効にします。

confluent kafka acl create --allow --service-account <id> --operation READ --operation WRITE --topic '*'

プレフィックスの付いたトピックへのフルアクセス権限を与える ACL の割り当て

以下のコマンドを実行して、指定したプレフィックスで始まる名前を持つすべてのトピックへのフルアクセス権限を有効にします。

confluent kafka acl create --allow --service-account <id> --operation READ --operation WRITE --topic 'prefix' --prefix