重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Confluent CLI を使用した ksqlDB の管理

ksqlDB アプリケーションの作成、Kafka トピックの作成、クラスターリソースへのアクセスの制御には、Cloud Console だけでなく、Confluent CLI を使用できます。

Confluent Cloud CLI をインストールする

Confluent Cloud で Kafka クラスターが稼働するようになったら、Confluent CLI を使用してローカルコンピューターからクラスターを操作できます。たとえば、Confluent CLI を使用して、トピックへのデータの生成や、トピックからのデータの消費ができます。

CLI のインストールガイド の説明に従って Confluent CLI をインストールしてください。

ちなみに

さまざまな言語を使用して Kafka クライアントアプリケーションを Confluent Cloud に接続する例については、「コード例」を参照してください。

クラスターにログインします。

以下の手順では、Kafka クラスターにログインし、Confluent CLI を使用して作成した API キーを使用して、クラスターに接続します。

Confluent CLI コマンドの詳細については、「Confluent CLI コマンドリファレンス」を参照してください。

  1. Confluent Cloud クラスターにログインします。

    confluent login
    

    出力は以下のようになります。

    Enter your Confluent credentials:
    Email: jdoe@myemail.io
    Password: ***********************
    
    Logged in as "jdoe@myemail.io"
    Using environment "t118" ("default")
    
  2. クラスターを表示します。

    confluent kafka cluster list
    

    出力は以下のようになります。

          Id      |       Name        |     Type     | Provider |  Region  | Availability | Status
    +-------------+-------------------+--------------+----------+----------+--------------+--------+
        lkc-emmox | ccloud-quickstart | BASIC_LEGACY | gcp      | us-west2 | HIGH         | UP
    
  3. アクティブ Kafka クラスターを設定します。この例では、クラスター ID は lkc-emmox です。

    confluent kafka cluster use lkc-emmox
    

    ちなみに

    クラスター ID に含まれている lkc というプレフィックスは、「logical Kafka cluster」の頭字語です。

Kafka API キーとシークレットを作成する

  1. 次のコマンドを実行して、API キーとシークレットを作成します。CLI を使用して API キーを作成すると、自動的にローカルに保管されます。

    confluent api-key create --resource "cloud"
    
  2. confluent api-key use <key> コマンドを実行して、Confluent CLI コマンドで使用する API キーを設定します。

    confluent api-key use --resource <resource-id> <api-key>
    

    lkc-emmox という ID の前述のクラスターの API キーとシークレットのペアの場合、以下のようなコマンドになります。

    confluent api-key use --resource lkc-emmox LD35EN2ZJTCTRQRM
    

Confluent Cloud CLI を使用した ksqlDB アプリケーションの作成

クラウドでホストされている Kafka クラスター内に新しい ksqlDB アプリケーションを作成するには、ksql app create コマンドを使用します。

次のコマンドを実行すると、キーと関連付けられているユーザーアカウントまたはサービスアカウントと同じリソースへのアクセス権限を持つ、新しい ksqlDB アプリケーションが作成されます。

confluent ksql app create <app-name> --api-key <api-key> --api-secret <secret>

Kafka API キーを指定しない場合、Confluent Cloud によりアプリケーション用のサービスアカウントと新しいアカウントに対応する Kafka API キーが作成されます。

Confluent CLI を使用してトピックを作成する

  1. users という名前のトピックを作成します。

    confluent kafka topic create users
    

    confluent kafka topic list からの出力は以下のようになります。

         Name
    +------------+
        users
    

詳細については、confluent kafka topic のコマンドリファレンス を参照してください。

Confluent Cloud で ksqlDB のアクセスを有効にする

Confluent Cloud CLI を使用して、アプリケーションへのアクセスを有効にするための ACL を設定 できます。

  1. ksqlDB 用の Confluent Cloud ACL を構成します。Confluent Cloud ACL の詳細については、「Confluent Cloud のアクセス制御リスト(ACL)の使用」を参照してください。

    1. Confluent Cloud CLI で ksqlDB アプリケーション ID を確認します。

      confluent ksql app list
      

      出力は以下のようになります。

             Id      |   Name      | Topic Prefix |   Kafka   | Storage |                         Endpoint                          | Status
      +--------------+-------------+--------------+-----------+---------+-----------------------------------------------------------+--------+
        lksqlc-lg0g3 | ksqldb-app1 | pksqlc-4v5nn | lkc-emmox |     500 | https://pksqlc-emko7.us-central1.gcp.confluent.cloud:8088 | UP
      

      ちなみに

      lksqlcpksqlc というプレフィックスはそれぞれ、「logical ksqlDB cluster」と「physical ksqlDB cluster」の頭字語です。

    2. users トピックに対するアクセス権限を ksqlDB に与えます。

      confluent ksql app configure-acls <ksqldb-cluster-id> users --cluster <kafka-cluster-id>
      

      この例のクラスターの場合は、以下のコマンドを使用して、users トピックに対するアクセス権限を ksqlDB に与えます。

      confluent ksql app configure-acls lksqlc-lg0g3 users --cluster lkc-emmox
      

Confluent Cloud CLI で Confluent Cloud ksqlDB の API キーを作成する

Confluent CLI v0.198.0 以降では、Confluent CLI を使用して API キーを作成できるようになりました。詳細については、「リソース別 API キーの作成」を参照してください。

重要

この手順で作成する API キーとシークレットは、Confluent Cloud CLI のインストール時に作成したキーペアとは異なります。このキーペアは ksqlDB アプリケーション専用であり、confluent api-key create --resource <ksqldb-cluster-id> コマンドを使用しないと作成できません。

以下のコマンドを実行して、ksqlDB アプリケーションに関連するすべての API キーのリストを表示します。

confluent api-key list --resource <ksqldb-cluster-id>

出力が空の場合は、「リソース別 API キーの作成」の手順に従ってください。

別のソースからの API キーの保存

UI、API、または別のマシンの CLI を使用して API キーを作成した場合は、まずこの情報を保管しないとシークレットを CLI で使用できません。API キーの作成後にシークレットを取得することができないため、これは必須です。confluent api-key store コマンドを使用して API シークレットを追加します。

Confluent Cloud CLI から以下のコマンドを入力します。

confluent api-key store <api-key> <secret> --resource <cluster-id>

この例の lkc-emmox という ID のクラスターの API キーとシークレットのペアの場合、以下のようなコマンドになります。

confluent api-key store LD35EN2ZJTCTRQRM 67JImN+9vk+Hj3eak2/UcwUlbDNlGGC3KAIOy5JNRVSnweumPBVpW31JWZSBeawz --resource lkc-emmox

API キーを使用して Confluent Cloud 内の ksqlDB アプリケーションにアクセスする

Confluent Cloud 内の ksqlDB は ksqlDB API キーによる認証に対応しています。

ksqlDB CLI を使用する場合
ksqlDB CLI をクラスターに接続するには、Confluent Cloud ksqlDB サーバーの URL を指定して以下のコマンドを実行します。
$CONFLUENT_HOME/bin/ksql -u <ksql-api-key> -p <secret> <ccloud-ksql-server-url>
HTTPS リクエストを使用する場合
リクエストの Accept ヘッダーに --basic を指定します。さらに、ksqlDB API キーとシークレットをコロンで区切って --user 認証情報として送信する必要があります。
  1. confluent ksql app list コマンドを実行して、ksqlDB エンドポイントの URL を取得します。

    confluent ksql app list
    
           Id      |   Name      | Topic Prefix |   Kafka   | Storage |                         Endpoint                         | Status
    +--------------+-------------+--------------+-----------+---------+----------------------------------------------------------+--------+
      lksqlc-lg0g3 | ksqldb-app1 | pksqlc-4v5nn | lkc-emmox |     500 | https://pksqlc-emko7.us-central1.gcp.confluent.cloud:443 | UP
    
  2. curl コマンドを実行して、ksql エンドポイントに POST リクエストを送信します。

    curl -X "POST" "https://<cloud-ksqldb-endpoint>:443/ksql" \
         -H "Accept: application/vnd.ksql.v1+json" \
         --basic --user "<ksqldb-api-key>:<secret>" \
         -d $'{
      "ksql": "LIST STREAMS;",
      "streamsProperties": {}
    }'
    

    前述の ksqlDB アプリケーションの場合のコマンド例は以下のようになります。

    curl -X "POST" "https://pksqlc-4v5nn.us-west2.gcp.confluent.cloud:443/ksql" \
         -H "Accept: application/vnd.ksql.v1+json" \
         --basic --user "7DOQ6QCUWCE6M0HD:25UCfE+p25wPXXHMJcIBg3dANJvr76LG4VZ5Hold+X2dVD5drvm8CdDaVdHia8d7" \
         -d $'{
      "ksql": "LIST STREAMS;",
      "streamsProperties": {}
    }'
    

    出力は以下のようになります。

    [
      {
        "@type": "streams",
        "statementText": "LIST STREAMS;",
        "streams": [
          {
            "type": "STREAM",
            "name": "KSQL_PROCESSING_LOG",
            "topic": "pksqlc-4v5nn-processing-log",
            "format": "JSON"
          },
          {
            "type": "STREAM",
            "name": "PAGEVIEWS_ENRICHED",
            "topic": "pksqlc-4v5nnPAGEVIEWS_ENRICHED",
            "format": "JSON"
          },
          {
            "type": "STREAM",
            "name": "PAGEVIEWS_ORIGINAL",
            "topic": "pageviews",
            "format": "JSON"
          }
        ],
        "warnings": []
      }
    ]
    
  3. 以下のコマンドを実行して、PAGEVIEWS_ENRICHED ストリームのストリーミングデータを調べます。Ctrl キーを押しながら C キーを押して、クエリを停止します。

    curl -X "POST" "https://<cloud-ksqldb-endpoint>:443/query-stream" \
         --basic --user "<ksqldb-api-key>:<secret>" \
          -d $'{
       "sql": "SELECT * FROM PAGEVIEWS_ENRICHED EMIT CHANGES;",
       "streamsProperties": {}
     }'
    

    出力は以下のようになります。

    {"queryId":"d14df2c1-5995-4855-b300-f7710464c81c","columnNames":["USERID","PAGEID","REGIONID","GENDER"],"columnTypes":["STRING","STRING","STRING","STRING"]}
    ["User_7","Page_85","Region_8","FEMALE"]
    ["User_9","Page_47","Region_7","MALE"]
    ["User_4","Page_98","Region_7","OTHER"]
    ^C
    

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png