オンプレミス Kafka からクラウドへの移行

この Confluent Cloud サンプルでは、ハイブリッド Kafka クラスターを紹介します。一方のクラスターは、ローカルで動作する自己管理型 Kafka クラスターで、他方は Confluent Cloud クラスターです。このユースケースは、オンプレミスからクラウドへの移行における "クラウドへのブリッジ" となります。

イメージ

概要

このサンプルの主なコンポーネントは、次のとおりです。

  • 2 つの Kafka クラスター。一方のクラスターは、ローカルで動作する自己管理型 Kafka クラスターで、他方は Confluent Cloud クラスターです。
  • Confluent Control Center: デプロイを管理し、モニタリングします。これを使用して、トピックの検査、スキーマの表示、ksqlDB クエリの表示と作成、ストリームのモニタリングなどを行います。
  • ksqlDB: Confluent Cloud で、入力トピックの "users" および "pageviews" に対するクエリを実行する Confluent Cloud ksqlDB。
  • 2 つの Kafka Connect クラスター: 一方のクラスターは、ローカルの自己管理型クラスターに接続し、他方は Confluent Cloud クラスターに接続します。両方の Connect ワーカープロセス自体はローカルで実行されます。
    • "kafka-connect-datagen" インスタンス 1 つ: 模擬データを生成してトピック "pageview"s をローカルで事前に取り込むソースコネクター
    • "kafka-connect-datagen" インスタンス 1 つ: 模擬データを生成して Confluent Cloud クラスターでトピック "users" を事前に取り込むソースコネクター
    • Confluent Replicator: ローカルクラスターから Confluent Cloud クラスターにトピック "pageviews" をコピーします。
  • Confluent Schema Registry: このサンプルは、Confluent Cloud Schema Registry を使って実行され、Kafka データは Avro フォーマットで書き込まれます。

注釈

これはサンプル環境であり、多くのサービスが 1 台のホストで実行されています。このサンプルは本稼働環境で実行しないでください。また、Confluent CLI は本稼働環境で使用しないでください。このサンプルは、Confluent Platform と Confluent Cloud のデモを容易に行うためにのみ用意されています。

前提条件

  1. 初期化された Confluent Cloud クラスター
  2. ローカルでインストールされた Confluent Cloud CLI (v1.7.0 以降)
  3. ローカルインストールを使用している場合は、Confluent Platform を ダウンロード (Docker には必要ではありません)
  4. Confluent Platform は、さまざまなオペレーティングシステムおよびソフトウェアバージョンでサポートされています(詳細については「サポートされているバージョンおよび相互運用性」を参照)。このサンプルは、以下に説明する特定の構成で検証されています。この例の Windows での実行は正式にサポートされていませんが、GitHub のサンプルコードを変更して symlink .envconfig.env の内容で置き換えると、Windows でも動作する可能性があります。
    • macOS 10.15.3
    • Confluent Platform 6.0.6
    • Java 11.0.6 2020-01-14 LTS
    • bash バージョン 3.2.57
    • jq 1.6
    • (Docker ベースのサンプル)Docker バージョン 19.03.8
    • (Docker ベースのサンプル)Docker Compose docker-compose バージョン 1.25.4

チュートリアルの実行コスト

注意

Confluent Cloud のすべてのサンプルでは、課金される可能性のある実際の Confluent Cloud リソースを使用しています。サンプルで、新しい Confluent Cloud 環境、Kafka クラスター、トピック、ACL、サービスアカウントに加えて、コネクターや ksqlDB アプリケーションのように時間で課金されるリソースを作成する場合があります。想定外の課金を避けるために、慎重に リソースのコストを確認 してから開始してください。Confluent Cloud のサンプルの実行を終了したら、サービスへの時間単位の課金を回避するためにすべての Confluent Cloud リソースを破棄し、リソースが削除されたことを確認します。

Confluent Cloud のプロモーションコード

Confluent Cloud UI の Billing & payment セクションでプロモーションコード C50INTEG を入力すると、Confluent Cloud で $50 相当を無料で使用できます(詳細)。このプロモーションコードで、この Confluent Cloud サンプルの 1 日分の実行費用が補填されます。これを超えてサービスを利用すると、このサンプルで作成した Confluent Cloud リソースを破棄するまで、時間単位で課金されることがあります。

サンプルの起動

セットアップ

  1. このサンプルでは、サンプルの実行に必要なリソースを備えた、新しい Confluent Cloud 環境を作成します。既に説明したように、このサンプルでは実際の Confluent Cloud リソースを使用するため、課金が発生する可能性があります。

  2. confluentinc/examples GitHub リポジトリのクローンを作成し、6.0.6-post ブランチをチェックアウトします。

    git clone https://github.com/confluentinc/examples
    cd examples
    git checkout 6.0.6-post
    
  3. Confluent Cloud サンプルのディレクトリに変更します。

    cd ccloud
    

Run

  1. コマンド ccloud login で、Confluent Cloud のユーザー名とパスワードを使用して Confluent Cloud にログインします。--save 引数により、Confluent Cloud ユーザーログイン資格情報が保存されるか、ローカルの netrc ファイルに対してトークン(SSO の場合)が更新されます。

    ccloud login --save
    
  2. 1 つのコマンドを実行してサンプル全体を起動します。Docker Compose と Confluent Platform ローカルインストールのどちらを使用するかを選択できます。Confluent Cloud に新しいリソースが作成されます。完了するまでに数分間かかります。

    # For Docker Compose
    ./start-docker.sh
    
    # For Confluent Platform local
    ./start.sh
    
  3. このスクリプトを実行すると、フルマネージド型リソースの新規 Confluent Cloud スタックが作成され、すべての接続情報、クラスター ID、資格情報を含むローカル構成ファイルが生成されます。これは、他のデモや自動化の際に便利です。このローカル構成ファイルを表示すると、SERVICE ACCOUNT ID がスクリプトによって自動生成されています。

    cat stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
    

    出力は以下のようになります。

    # ------------------------------
    # Confluent Cloud connection information for demo purposes only
    # Do not use in production
    # ------------------------------
    # ENVIRONMENT ID: <ENVIRONMENT ID>
    # SERVICE ACCOUNT ID: <SERVICE ACCOUNT ID>
    # KAFKA CLUSTER ID: <KAFKA CLUSTER ID>
    # SCHEMA REGISTRY CLUSTER ID: <SCHEMA REGISTRY CLUSTER ID>
    # KSQLDB APP ID: <KSQLDB APP ID>
    # ------------------------------
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    bootstrap.servers=<BROKER ENDPOINT>
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API KEY>' password='<API SECRET>';
    basic.auth.credentials.source=USER_INFO
    schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
    schema.registry.url=https://<SR ENDPOINT>
    ksql.endpoint=<KSQLDB ENDPOINT>
    ksql.basic.auth.user.info=<KSQLDB API KEY>:<KSQLDB API SECRET>
    
  4. http://confluent.cloud で Confluent Cloud UI にログインします。

  5. Google Chrome を使用して、http://localhost:9021 の Confluent Control Center GUI に移動します。

プレイブック

Confluent Cloud CLI

  1. クラスターでトピックのリストを表示できることを確認します。

    ccloud kafka topic list
    
  2. 最初にこのサンプル用に作成したサービスアカウント <SERVICE ACCOUNT ID> に関連付けられた ACL を表示します。リソース名は、それぞれのクラスター、Kafka のトピック名、またはコンシューマーグループ名に対応しています。注意: 本稼働環境では、ワイルドカード * は使用しません。これは、デモを目的として使用されています。

    ccloud kafka acl list --service-account <SERVICE ACCOUNT ID>
    

    たとえば、サービスアカウント ID が 69995 の場合の出力は、次のようになります。

      ServiceAccountId | Permission |    Operation     | Resource |         Name          |   Type
    +------------------+------------+------------------+----------+-----------------------+----------+
      User:69995       | ALLOW      | WRITE            | TOPIC    | _confluent-monitoring | PREFIXED
      User:69995       | ALLOW      | READ             | TOPIC    | _confluent-monitoring | PREFIXED
      User:69995       | ALLOW      | READ             | TOPIC    | _confluent-command    | PREFIXED
      User:69995       | ALLOW      | WRITE            | TOPIC    | _confluent-command    | PREFIXED
      User:69995       | ALLOW      | READ             | TOPIC    | _confluent            | PREFIXED
      User:69995       | ALLOW      | CREATE           | TOPIC    | _confluent            | PREFIXED
      User:69995       | ALLOW      | WRITE            | TOPIC    | _confluent            | PREFIXED
      User:69995       | ALLOW      | CREATE           | GROUP    | *                     | LITERAL
      User:69995       | ALLOW      | WRITE            | GROUP    | *                     | LITERAL
      User:69995       | ALLOW      | READ             | GROUP    | *                     | LITERAL
      User:69995       | ALLOW      | WRITE            | TOPIC    | connect-demo-statuses | PREFIXED
      User:69995       | ALLOW      | READ             | TOPIC    | connect-demo-statuses | PREFIXED
      User:69995       | ALLOW      | READ             | TOPIC    | connect-demo-offsets  | PREFIXED
      User:69995       | ALLOW      | WRITE            | TOPIC    | connect-demo-offsets  | PREFIXED
      User:69995       | ALLOW      | DESCRIBE         | TOPIC    | pageviews             | LITERAL
      User:69995       | ALLOW      | DESCRIBE_CONFIGS | TOPIC    | pageviews             | LITERAL
      User:69995       | ALLOW      | CREATE           | TOPIC    | pageviews             | LITERAL
      User:69995       | ALLOW      | ALTER_CONFIGS    | TOPIC    | pageviews             | LITERAL
      User:69995       | ALLOW      | READ             | TOPIC    | pageviews             | LITERAL
      User:69995       | ALLOW      | WRITE            | TOPIC    | pageviews             | LITERAL
      User:69995       | ALLOW      | WRITE            | TOPIC    | users                 | LITERAL
      User:69995       | ALLOW      | WRITE            | TOPIC    | *                     | LITERAL
      User:69995       | ALLOW      | CREATE           | TOPIC    | *                     | LITERAL
      User:69995       | ALLOW      | READ             | TOPIC    | *                     | LITERAL
      User:69995       | ALLOW      | DESCRIBE         | TOPIC    | *                     | LITERAL
      User:69995       | ALLOW      | DESCRIBE_CONFIGS | TOPIC    | *                     | LITERAL
      User:69995       | ALLOW      | READ             | GROUP    | connect-cloud         | LITERAL
      User:69995       | ALLOW      | DESCRIBE         | CLUSTER  | kafka-cluster         | LITERAL
      User:69995       | ALLOW      | CREATE           | CLUSTER  | kafka-cluster         | LITERAL
      User:69995       | ALLOW      | READ             | GROUP    | connect-replicator    | LITERAL
      User:69995       | ALLOW      | WRITE            | TOPIC    | connect-demo-configs  | PREFIXED
      User:69995       | ALLOW      | READ             | TOPIC    | connect-demo-configs  | PREFIXED
      User:69995       | ALLOW      | WRITE            | GROUP    | _confluent            | PREFIXED
      User:69995       | ALLOW      | READ             | GROUP    | _confluent            | PREFIXED
      User:69995       | ALLOW      | CREATE           | GROUP    | _confluent            | PREFIXED
    

kafka-connect-datagen

  1. サンプルで、このコード を確認してください。このコードにより、Kafka のトピック pageviews 用の kafka-connect-datagen コネクターが connect-local クラスターに自動的に読み込まれます。このクラスターは、後で Replicator によって Confluent Cloud にレプリケートされます(Replicator については後で詳しく説明します)。

    {
      "name": "datagen-pageviews",
      "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "kafka.topic": "pageviews",
        "quickstart": "pageviews",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.basic.auth.credentials.source": "$BASIC_AUTH_CREDENTIALS_SOURCE",
        "value.converter.schema.registry.basic.auth.user.info": "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO",
        "value.converter.schema.registry.url": "$SCHEMA_REGISTRY_URL",
        "producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
        "max.interval": 100,
        "iterations": 1000000000,
        "tasks.max": "1"
      }
    }
    
  2. Confluent Control Center で、ローカルクラスターの pageviews トピック内のデータを表示します。

    イメージ
  3. サンプルで、このコード を確認してください。このコードにより、Kafka のトピック userskafka-connect-datagen コネクターが connect-cloud クラスターに自動的に読み込まれます。

    {
      "name": "datagen-users",
      "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "kafka.topic": "users",
        "quickstart": "users",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.basic.auth.credentials.source": "$BASIC_AUTH_CREDENTIALS_SOURCE",
        "value.converter.schema.registry.basic.auth.user.info": "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO",
        "value.converter.schema.registry.url": "$SCHEMA_REGISTRY_URL",
        "producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
        "max.interval": 1000,
        "iterations": 1000000000,
        "tasks.max": "1"
      }
    }
    
  4. Confluent Control Center で、Confluent Cloud の users トピック内のデータを表示します。

    イメージ

ksqlDB

  1. サンプルでは、適切な認証情報を使用して、このコード で REST API を使用して、:devx-examples:`statements.sql|ccloud/statements.sql`(ksqlDB バージョン 0.10.0 の場合)から Confluent Cloud ksqlDB クエリが作成されます。

    done
    
    # Submit KSQL queries
    echo -e "\nSubmit KSQL queries\n"
    properties='"ksql.streams.auto.offset.reset":"earliest","ksql.streams.cache.max.bytes.buffering":"0"'
    while read ksqlCmd; do
      echo -e "\n$ksqlCmd\n"
      response=$(curl -X POST $KSQLDB_ENDPOINT/ksql \
           -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
           -u $KSQLDB_BASIC_AUTH_USER_INFO \
           --silent \
           -d @<(cat <<EOF
    {
      "ksql": "$ksqlCmd",
      "streamsProperties": {$properties}
    }
    EOF
    ))
      echo $response
      if [[ ! "$response" =~ "SUCCESS" ]]; then
        echo -e "\nERROR: KSQL command '$ksqlCmd' did not include \"SUCCESS\" in the response.  Please troubleshoot."
        exit 1
    
  2. Confluent Cloud UI から ksqlDB アプリケーションフローを表示します。

    イメージ
  3. いずれかのストリームをクリックして、そのメッセージとスキーマを表示します。

    イメージ

Confluent Replicator

Confluent Replicator は、ソース Kafka クラスターからデスティネーション Kafka クラスターにデータをコピーします。このサンプルでは、送信元クラスターは、ローカルインストールの自己管理型クラスターで、送信先クラスターは Confluent Cloud です。Replicator は、Kafka のトピック pageviews をローカルインストールから Confluent Cloud にレプリケートしています。また、Confluent Monitoring インターセプターとともに動作して Confluent Control Center ストリームをモニタリングしています。

  1. サンプルで、このコード を確認してください。このコードにより、Replicator コネクターが connect-cloud クラスターに自動的に読み込まれます。Replicator 構成では confluent.topic.replication.factor=3 が設定されていることに注意してください。これは、送信元クラスターでは replication.factor=1 であり、Confluent Cloud では replication.factor=3 が求めれらるため、必須の設定です。

    {
      "name": "replicator",
      "config": {
        "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
        "topic.whitelist": "pageviews",
        "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
        "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
        "dest.topic.replication.factor": 3,
        "dest.kafka.bootstrap.servers": "$BOOTSTRAP_SERVERS",
        "dest.kafka.security.protocol": "SASL_SSL",
        "dest.kafka.sasl.mechanism": "PLAIN",
        "dest.kafka.sasl.jaas.config": "$REPLICATOR_SASL_JAAS_CONFIG",
        "confluent.topic.replication.factor": 3,
        "src.kafka.bootstrap.servers": "kafka:29092",
        "src.consumer.group.id": "connect-replicator",
        "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
        "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "$BOOTSTRAP_SERVERS",
        "src.consumer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL",
        "src.consumer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN",
        "src.consumer.confluent.monitoring.interceptor.sasl.jaas.config": "$REPLICATOR_SASL_JAAS_CONFIG",
        "src.kafka.timestamps.topic.replication.factor": 1,
        "src.kafka.timestamps.producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
        "src.kafka.timestamps.producer.confluent.monitoring.interceptor.bootstrap.servers": "$BOOTSTRAP_SERVERS",
        "src.kafka.timestamps.producer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL",
        "src.kafka.timestamps.producer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN",
        "src.kafka.timestamps.producer.confluent.monitoring.interceptor.sasl.jaas.config": "$REPLICATOR_SASL_JAAS_CONFIG",
        "tasks.max": "1"
      }
    }
    
  2. Confluent Control Center は、ローカルで実行され、ポート 8087 で動作している、connect-cloud という Connect クラスターを管理するよう構成されています。このクラスターは、kafka-connect-datagen (Kafka のトピック users の)コネクターおよび Replicator コネクターを実行しています。Confluent Control Center UI で、Connect クラスターを表示します。

    イメージ
  3. replicator をクリックして Replicator 構成を表示します。ローカル Kafka クラスターのトピック pageviews が Confluent Cloud にレプリケートされていることがわかります。

    イメージ
  4. ローカルの pageviews トピックから Confluent Cloud の pageviews トピックにメッセージがレプリケートされることを確認します。Confluent Cloud UI で、このトピックのメッセージを表示します。

    イメージ
  5. Confluent Cloud UI で、Replicator のコンシューマーラグを表示します。Consumers ビューで connect-replicator をクリックします。出力は以下のようになります。

    イメージ

Confluent Schema Registry

この例で使用するコネクターは、Confluent Cloud Schema Registry を利用して、Avro フォーマットのデータを自動的に書き込むように構成されています。

  1. すべての Schema Registry サブジェクトを表示します。

    # Confluent Cloud Schema Registry
    curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects
    
  2. Confluent Cloud UI で、pageviews トピックのスキーマを表示します。トピック値は、Schema Registry で登録されたスキーマを使用しています(トピックキーは単なる文字列です)。

    イメージ
  3. オンプレミスの Schema Registry から Confluent Cloud Schema Registry にスキーマを移行する必要がある場合は、この 手順を追ったガイド に従ってください。スキーマの移行に実際に使用できる Replicator 構成の例については、ファイル submit_replicator_schema_migration_config.sh を参照してください。

Confluent Cloud の構成

  1. Confluent Cloud に接続するために、Confluent Platform コンポーネントおよびクライアント用のテンプレートデルタ構成を表示します。

    ls template_delta_configs/
    
  2. Confluent Cloud 構成ファイルから自動的に派生する、コンポーネントごとのデルタ構成パラメーターを生成します。

    ./ccloud-generate-cp-configs.sh
    
  3. このサンプルを start-docker.sh として実行した場合は、docker-compose.yml ファイル にあるすべての Confluent Platform コンポーネントの構成を使用できます。

    # For Docker Compose
    cat docker-compose.yml
    
  4. このサンプルを、Confluent CLI を使用する "start.s"h として実行した場合は、現在の Confluent CLI 一時ディレクトリにある、それぞれのコンポーネントサブフォルダーにすべての構成ファイルとログファイルが保存されます(アクティブに実行されているサンプルが必要です)。

    # For Confluent Platform local install using Confluent CLI
    ls `confluent local current | tail -1`
    

サンプルのトラブルシューティング

  1. Docker を使用して実行した場合は、docker-compose logs | grep ERROR を実行します。

  2. ログファイルを表示するには、現在の Confluent CLI 一時ディレクトリを調べます(アクティブに動作しているサンプルが必要)。

    # View all files
    ls `confluent local current | tail -1`
    
    # View log file per service, e.g. for the Kafka broker
    confluent local services kafka log
    

リソースの破棄

Confluent Cloud のすべてのサンプルでは、実際の Confluent Cloud リソースを使用しています。Confluent Cloud のサンプルの実行を終了したら、予定外の課金を回避するために、すべての Confluent Cloud リソースが破棄されていることを直接確認してください。

  1. サンプルを停止し、Confluent Cloud 内のすべてのリソースとローカルコンポーネントを破棄します。スクリプトの引数として、ローカル構成ファイルにパスを渡し、<SERVICE ACCOUNT ID> に、デモの開始時に自動生成された ID を代入します。

    # For Docker Compose
    ./stop-docker.sh stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
    
    # For Confluent Platform local install using Confluent CLI
    ./stop.sh stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
    
  2. Confluent Cloud 内のリソースが破棄されたことを必ず確認してください。

その他のリソース