REST Proxy: Apache Kafka® のサンプル

このチュートリアルでは、Confluent REST Proxy を使用して Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費します。

チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。

前提条件

クライアント

  • Docker バージョン 17.06.1-ce
  • Docker Compose バージョン 1.25.4
  • wget

Kafka クラスター

このチュートリアルは、Confluent Cloud を使用して取り組むのが最も簡単です。そうすればローカル Kafka クラスターを実行する必要はありません。Confluent Cloud にサインアップするときは、プロモーションコード C50INTEG を適用すると、$50 相当を無料で使用できます(詳細)。Console から LEARN をクリックしてクラスターをプロビジョニングし、Clients をクリックして、クライアントアプリケーションに対して設定するクラスター固有の構成と認証情報を取得します。サポートされている CLIREST API、またはコミュニティでサポートされている Confluent Cloud 向け ccloud-stack ユーティリティ を使用してもかまいません。

Confluent Cloud の使用を希望しない場合は、ローカルホスト や他のリモートサーバーで実行されている Kafka クラスターでこのチュートリアルに取り組むこともできます。

セットアップ

  1. confluentinc/examples GitHub リポジトリのクローンを作成し、6.2.4-post ブランチをチェックアウトします。

    git clone https://github.com/confluentinc/examples
    cd examples
    git checkout 6.2.4-post
    
  2. REST Proxy のサンプルのディレクトリに変更します。

    cd clients/cloud/rest-proxy/
    
  3. Kafka クラスターに接続するための構成パラメーターを含むローカルファイル($HOME/.confluent/java.config など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}{{CLUSTER_API_KEY }}、および {{ CLUSTER_API_SECRET }} に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。

    • Confluent Cloud のテンプレート構成ファイル

      # Required connection configs for Kafka producer, consumer, and admin
      bootstrap.servers={{ BROKER_ENDPOINT }}
      security.protocol=SASL_SSL
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
      sasl.mechanism=PLAIN
      # Required for correctness in Apache Kafka clients prior to 2.6
      client.dns.lookup=use_all_dns_ips
      
      # Best practice for Kafka producer to prevent data loss 
      acks=all
      
    • ローカルホストのテンプレート構成ファイル

      # Kafka
      bootstrap.servers=localhost:9092
      
  4. ENV 変数のファイルを生成します。Docker は、これを使用してブートストラップサーバーとセキュリティ構成を設定します。

    ../../../ccloud/ccloud-generate-cp-configs.sh $HOME/.confluent/java.config
    
  5. 生成した ENV 変数のファイルをソースにします。

    source ./delta_configs/env.delta
    
  6. cp-all-in-one-cloud docker-compose.yml ファイルを入手します。このファイルは、ローカルホストのコンテナーで Confluent Platform を実行し、Confluent Cloud に接続するようにコンテナーを自動的に構成します。

    wget -O docker-compose.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.2.4-post/cp-all-in-one-cloud/docker-compose.yml
    
  7. REST Proxy 構成全体については、前の手順でダウンロードした docker-compose.yml ファイルの REST Proxy セクションを参照してください。

    cat docker-compose.yml
    

基本プロデューサーおよびコンシューマー

このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice など)を表すキーと、json フォーマットのカウント値({"count": 0} など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。

レコードの生成

  1. このセクションでは Schema Registry を使用しないため、docker-compose.yml ファイルの以下の行はコメントアウトしてください。

    #KAFKA_REST_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
    #KAFKA_REST_CLIENT_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
    #KAFKA_REST_CLIENT_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
    
  2. 次のコマンドを実行して REST Proxy Docker コンテナーを起動します。

    docker-compose up -d rest-proxy
    
  3. REST Proxy のログを表示し、サーバーが起動されたことを確認するログメッセージ Server started, listening for requests が表示されるまで待ちます。

    docker-compose logs -f rest-proxy
    
  4. REST Proxy が接続されている Kafka のクラスター ID を入手します。

    KAFKA_CLUSTER_ID=$(docker-compose exec rest-proxy curl -X GET \
         "http://localhost:8082/v3/clusters/" | jq -r ".data[0].cluster_id")
    

    パラメーター KAFKA_CLUSTER_ID の値が有効であることを確認します。このチュートリアルのサンプルの場合は、lkc-56ngz と表示されますが、実際の出力では異なります。

    echo $KAFKA_CLUSTER_ID
    
  5. REST Proxy API v3 の AdminClient 機能を使用して Kafka トピック test1 を作成します。

    docker-compose exec rest-proxy curl -X POST \
         -H "Content-Type: application/json" \
         -d "{\"topic_name\":\"test1\",\"partitions_count\":6,\"configs\":[]}" \
         "http://localhost:8082/v3/clusters/${KAFKA_CLUSTER_ID}/topics" | jq .
    

    以下のように出力されることを確認します。

    {
      "kind": "KafkaTopic",
      "metadata": {
        "self": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test1",
        "resource_name": "crn:///kafka=lkc-56ngz/topic=test1"
      },
      "cluster_id": "lkc-56ngz",
      "topic_name": "test2",
      "is_internal": false,
      "replication_factor": 0,
      "partitions": {
        "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2/partitions"
      },
      "configs": {
        "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2/configs"
      },
      "partition_reassignments": {
        "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test1/partitions/-/reassignment"
      }
    }
    
  6. キー alice、および値 {"count":0}{"count":1}{"count":2} で、3 つの JSON メッセージをトピックに生成します。

    docker-compose exec rest-proxy curl -X POST \
         -H "Content-Type: application/vnd.kafka.json.v2+json" \
         -H "Accept: application/vnd.kafka.v2+json" \
         --data '{"records":[{"key":"alice","value":{"count":0}},{"key":"alice","value":{"count":1}},{"key":"alice","value":{"count":2}}]}' \
         "http://localhost:8082/topics/test1" | jq .
    

    以下のように出力されることを確認します。

    {
      "offsets": [
        {
          "partition": 0,
          "offset": 0,
          "error_code": null,
          "error": null
        },
        {
          "partition": 0,
          "offset": 1,
          "error_code": null,
          "error": null
        },
        {
          "partition": 0,
          "offset": 2,
          "error_code": null,
          "error": null
        }
      ],
      "key_schema_id": null,
      "value_schema_id": null
    }
    
  7. プロデューサーコード を表示します。

レコードの消費

  1. コンシューマーグループ cg1 に属するコンシューマー ci1 を作成します。トピックの先頭で開始されるように、auto.offset.resetearliest を指定します。

    docker-compose exec rest-proxy curl -X POST \
         -H "Content-Type: application/vnd.kafka.v2+json" \
         --data '{"name": "ci1", "format": "json", "auto.offset.reset": "earliest"}' \
         http://localhost:8082/consumers/cg1 | jq .
    

    以下のように出力されることを確認します。

    {
      "instance_id": "ci1",
      "base_uri": "http://rest-proxy:8082/consumers/cg1/instances/ci1"
    }
    
  2. コンシューマーをトピック test1 にサブスクライブします。

    docker-compose exec rest-proxy curl -X POST \
         -H "Content-Type: application/vnd.kafka.v2+json" \
         --data '{"topics":["test1"]}' \
         http://localhost:8082/consumers/cg1/instances/ci1/subscription | jq .
    
  3. 最初の応答にあるベース URL を使用してデータを消費します。間に 10 秒間のスリープを挟んで curl コマンドを 2 回発行します。これは、https://github.com/confluentinc/kafka-rest/issues/432 に記載された理由による、意図的な操作です。

    docker-compose exec rest-proxy curl -X GET \
         -H "Accept: application/vnd.kafka.json.v2+json" \
         http://localhost:8082/consumers/cg1/instances/ci1/records | jq .
    
    sleep 10
    
    docker-compose exec rest-proxy curl -X GET \
         -H "Accept: application/vnd.kafka.json.v2+json" \
         http://localhost:8082/consumers/cg1/instances/ci1/records | jq .
    

    以下のように出力されることを確認します。

    []
    [
      {
        "topic": "test1",
        "key": "alice",
        "value": {
          "count": 0
        },
        "partition": 0,
        "offset": 0
      },
      {
        "topic": "test1",
        "key": "alice",
        "value": {
          "count": 1
        },
        "partition": 0,
        "offset": 1
      },
      {
        "topic": "test1",
        "key": "alice",
        "value": {
          "count": 2
        },
        "partition": 0,
        "offset": 2
      }
    ]
    
  4. コンシューマーインスタンスを削除して、そのリソースをクリーンアップします。

    docker-compose exec rest-proxy curl -X DELETE \
         -H "Content-Type: application/vnd.kafka.v2+json" \
         http://localhost:8082/consumers/cg1/instances/ci1 | jq .
    
  5. コンシューマーコード を表示します。

REST Proxy の停止

  1. 次のコマンドを実行して Docker を停止します。

    docker-compose down
    

Avro と Confluent Cloud Schema Registry

このサンプルは、前のサンプルと似ていますが、値は Avro フォーマットです。また、Confluent Cloud Schema Registry と統合されています。Confluent Cloud Schema Registry を使用する前に、その 可用性と制限 を確認してください。

  1. Confluent Cloud Console の「Confluent Cloud におけるスキーマ管理のクイックスタート」にある説明に従って、Confluent Cloud Schema Registry を有効にし、API キーおよびシークレットを作成して接続します。

  2. ご使用の VPC が Confluent Cloud Schema Registry 公衆インターネットエンドポイントに接続できることを確認します。

  3. Schema Registry に接続するための構成パラメーターを含むローカル構成ファイル($HOME/.confluent/java.config など)をアップデートします。

    • Confluent Cloud のテンプレート構成ファイル

      # Required connection configs for Kafka producer, consumer, and admin
      bootstrap.servers={{ BROKER_ENDPOINT }}
      security.protocol=SASL_SSL
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
      sasl.mechanism=PLAIN
      # Required for correctness in Apache Kafka clients prior to 2.6
      client.dns.lookup=use_all_dns_ips
      
      # Best practice for Kafka producer to prevent data loss 
      acks=all
      
      # Required connection configs for Confluent Cloud Schema Registry
      schema.registry.url=https://{{ SR_ENDPOINT }}
      basic.auth.credentials.source=USER_INFO
      basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
      
    • ローカルホストのテンプレート構成ファイル

      # Kafka
      bootstrap.servers=localhost:9092
      
      # Confluent Schema Registry
      schema.registry.url=http://localhost:8081
      
  4. Schema Registry サブジェクトのリストを表示して Confluent Cloud Schema Registry の資格情報を確認します。次の例の {{ SR_API_KEY }}{{ SR_API_SECRET }}、および {{ SR_ENDPOINT }} に値を代入します。

    curl -u {{ SR_API_KEY }}:{{ SR_API_SECRET }} https://{{ SR_ENDPOINT }}/subjects
    
  5. ENV 変数のファイルを再生成します。Docker は、これを使用してブートストラップサーバーとセキュリティ構成を設定します。

    ../../../ccloud/ccloud-generate-cp-configs.sh $HOME/.confluent/java.config
    
  6. source コマンドで、再生成した ENV 変数のファイルの内容を反映させます。

    source ./delta_configs/env.delta
    

Avro レコードの生成

  1. このセクションでは Schema Registry を使用するため、docker-compose.yml ファイルの以下の行のコメントを解除してください。

    KAFKA_REST_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL
    KAFKA_REST_CLIENT_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE
    KAFKA_REST_CLIENT_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
    
  2. 次のコマンドを実行して REST Proxy Docker コンテナーを起動します。

    docker-compose up -d rest-proxy
    
  3. REST Proxy のログを表示し、サーバーが起動されたことを確認するログメッセージ Server started, listening for requests が表示されるまで待ちます。

    docker-compose logs -f rest-proxy
    
  4. REST Proxy が接続されている Kafka のクラスター ID を入手します。

    KAFKA_CLUSTER_ID=$(docker-compose exec rest-proxy curl -X GET \
         "http://localhost:8082/v3/clusters/" | jq -r ".data[0].cluster_id")
    

    パラメーター KAFKA_CLUSTER_ID の値が有効であることを確認します。このチュートリアルのサンプルの場合は、lkc-56ngz と表示されますが、実際の出力では異なります。

  5. REST Proxy API v3 の AdminClient 機能を使用して Kafka トピック test2 を作成します。

    docker-compose exec rest-proxy curl -X POST \
         -H "Content-Type: application/json" \
         -d "{\"topic_name\":\"test2\",\"partitions_count\":6,\"configs\":[]}" \
         "http://localhost:8082/v3/clusters/${KAFKA_CLUSTER_ID}/topics" | jq .
    

    以下のように出力されることを確認します。

    {
      "kind": "KafkaTopic",
      "metadata": {
        "self": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2",
        "resource_name": "crn:///kafka=lkc-56ngz/topic=test2"
      },
      "cluster_id": "lkc-56ngz",
      "topic_name": "test2",
      "is_internal": false,
      "replication_factor": 0,
      "partitions": {
        "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2/partitions"
      },
      "configs": {
        "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2/configs"
      },
      "partition_reassignments": {
        "related": "http://rest-proxy:8082/v3/clusters/lkc-56ngz/topics/test2/partitions/-/reassignment"
      }
    }
    
  6. トピック test2 の新しい Avro スキーマを Confluent Cloud Schema Registry に登録します。

    docker-compose exec rest-proxy curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{ "schema": "[ { \"type\":\"record\", \"name\":\"countInfo\", \"fields\": [ {\"name\":\"count\",\"type\":\"long\"}]} ]" }' -u "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO" "$SCHEMA_REGISTRY_URL/subjects/test2-value/versions"
    
    

    新しいスキーマ ID が出力に表示されることを確認します。

    {"id":100001}
    
  7. 変数 schemaid をスキーマ ID の値に設定します。

    schemaid=$(docker-compose exec rest-proxy curl -X GET -u "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO" "$SCHEMA_REGISTRY_URL/subjects/test2-value/versions/latest" | jq '.id')
    
  8. {"count":0}{"count":1}{"count":2} で、3 つの Avro メッセージをトピックに生成します。リクエストの本体にスキーマ ID が含まれることに注意してください。

    docker-compose exec rest-proxy curl -X POST \
         -H "Content-Type: application/vnd.kafka.avro.v2+json" \
         -H "Accept: application/vnd.kafka.v2+json" \
         --data '{"value_schema_id": '"$schemaid"', "records": [{"value": {"countInfo":{"count": 0}}},{"value": {"countInfo":{"count": 1}}},{"value": {"countInfo":{"count": 2}}}]}' \
         "http://localhost:8082/topics/test2" | jq .
    

    以下のように出力されることを確認します。

    {
      "offsets": [
        {
          "partition": 4,
          "offset": 0,
          "error_code": null,
          "error": null
        },
        {
          "partition": 4,
          "offset": 1,
          "error_code": null,
          "error": null
        },
        {
          "partition": 4,
          "offset": 2,
          "error_code": null,
          "error": null
        }
      ],
      "key_schema_id": null,
      "value_schema_id": 100001
    }
    
  9. プロデューサー Avro コード を表示します。

Avro レコードの消費

  1. コンシューマーグループ cg2 に属するコンシューマー ci2 を作成します。トピックの先頭で開始されるように、auto.offset.resetearliest を指定します。

    docker-compose exec rest-proxy curl -X POST \
         -H "Content-Type: application/vnd.kafka.v2+json" \
         --data '{"name": "ci2", "format": "avro", "auto.offset.reset": "earliest"}' \
         http://localhost:8082/consumers/cg2 | jq .
    

    以下のように出力されることを確認します。

    {
      "instance_id": "ci2",
      "base_uri": "http://rest-proxy:8082/consumers/cg2/instances/ci2"
    }
    
  2. コンシューマーをトピック test2 にサブスクライブします。

    docker-compose exec rest-proxy curl -X POST \
         -H "Content-Type: application/vnd.kafka.v2+json" \
         --data '{"topics":["test2"]}' \
         http://localhost:8082/consumers/cg2/instances/ci2/subscription | jq .
    
  3. 最初の応答にあるベース URL を使用してデータを消費します。間に 10 秒間のスリープを挟んで curl コマンドを 2 回発行します。これは、https://github.com/confluentinc/kafka-rest/issues/432 に記載された理由による、意図的な操作です。

    docker-compose exec rest-proxy curl -X GET \
         -H "Accept: application/vnd.kafka.avro.v2+json" \
         http://localhost:8082/consumers/cg2/instances/ci2/records | jq .
    
    sleep 10
    
    docker-compose exec rest-proxy curl -X GET \
         -H "Accept: application/vnd.kafka.avro.v2+json" \
         http://localhost:8082/consumers/cg2/instances/ci2/records | jq .
    

    以下のように出力されることを確認します。

    []
    [
      {
        "topic": "test2",
        "key": null,
        "value": {
          "count": 0
        },
        "partition": 0,
        "offset": 0
      },
      {
        "topic": "test2",
        "key": null,
        "value": {
          "count": 1
        },
        "partition": 0,
        "offset": 1
      },
      {
        "topic": "test2",
        "key": null,
        "value": {
          "count": 2
        },
        "partition": 0,
        "offset": 2
      }
    ]
    
  4. コンシューマーインスタンスを削除して、そのリソースをクリーンアップします。

    docker-compose exec rest-proxy curl -X DELETE \
         -H "Content-Type: application/vnd.kafka.v2+json" \
         http://localhost:8082/consumers/cg2/instances/ci2 | jq .
    
  5. コンシューマー Avro コード を表示します。

Confluent Cloud Schema Registry

  1. Confluent Cloud Schema Registry に登録されたスキーマサブジェクトを表示します。次の出力の <SR API KEY><SR API SECRET>、および <SR ENDPOINT> に値を代入します。

    curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects
    
  2. サブジェクト test2-value が存在することを確認します。

    ["test2-value"]
    
  3. サブジェクト test2-value のスキーマ情報を表示します。次の出力の <SR API KEY><SR API SECRET>、および <SR ENDPOINT> に値を代入します。

    curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects/test2-value/versions/1
    
  4. サブジェクト test2-value のスキーマ情報を確認します。

    {"subject":"test2-value","version":1,"id":100001,"schema":"[{\"type\":\"record\",\"name\":\"countInfo\",\"fields\":[{\"name\":\"count\",\"type\":\"long\"}]}]"}
    

REST Proxy の停止

  1. 次のコマンドを実行して Docker を停止します。

    docker-compose down