本稼働環境での Schema Registry の実行

Looking for Schema Management Confluent Cloud docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Schema Management on Confluent Cloud.

このトピックでは、クラスターを使用した本稼働環境に移行する前の重要な考慮事項について説明します。ただし、Schema Registry を本稼働環境で実行するための指針が網羅されているわけではありません。

ちなみに

Confluent Platform 5.2.0 以降では、クラスター内の全ノードで同じバージョンの Schema Registry を実行することをお勧めします。Confluent Platform 5.2.0 以降を使用し、同じクラスター内で異なるバージョンの Schema Registry を実行すると、実行時エラーが発生して、新しいスキーマバージョンを作成できなくなります。

ハードウェア

標準的な開発パスを踏襲している方であれば、身近なマシンを寄せ集めた小さなクラスターやノート PC で Schema Registry に触れたことがあるかと思います。しかし、Schema Registry を本稼働環境にデプロイする場合には、考慮すべきいくつかの推奨事項があります。いずれも絶対に厳守すべきルールというわけではありません。

メモリー

Schema Registry は Kafka をコミットログとして使用し、登録されているすべてのスキーマを高い堅牢性で格納したうえで、スキーマのルックアップを高速化するために少量のインメモリーインデックスを保持します。控えめに見積もったとしても、登録される一意のスキーマの数は、LinkedIn のようにデータを重視する大規模な企業の場合でも 10,000 個程度です。ヒープオーバーヘッドが 1 つのスキーマにつき平均して約 1,000 バイトとすれば、ヒープサイズは 1 GB もあれば十分でしょう。

CPU

Schema Registry の CPU 使用率は高くありません。最も高い計算能力を必要とするタスクは、2 つのスキーマ間の互換性をチェックする処理ですが、頻度は低く、主としてサブジェクトに新しいスキーマバージョンを登録する際に発生します。

CPU の速度を高めるか、コア数を増やすかのどちらかを選ぶ必要があるなら、コア数を増やすようにしてください。複数のコアによってもたらされるコンカレンシー上の優位性の方が、クロック速度がわずかに向上するよりも、はるかに有意義です。

ディスク

Schema Registry は、ディスクに常駐するデータを持ちません。現時点では、Kafka をコミットログとして使用してすべてのスキーマを高い堅牢性で格納したうえで、すべてのスキーマのインメモリーインデックスを保持します。そのため、ディスクの用途は、log4j ログの格納に限られます。

ネットワーク

分散システムのパフォーマンスを確保するには、高速で信頼性の高いネットワークが重要となります。レイテンシが小さければ、ノードの通信が容易になります。また、帯域幅が広ければ、シャードの移動や回復が円滑化されます。大半のクラスターは、近年のデータセンターネットワーク(1 GbE、10 GbE)であれば問題ありません。

複数のデータセンターにまたがるクラスターは避けてください。データセンター同士がすぐ近くに置かれている場合でも同様です。距離が地理的に遠く離れているようなクラスターは絶対に避けてください。

分散システムでは、レイテンシが大きくなると問題が深刻化する傾向があり、また、デバッグや解決も難しくなります。

複数のデータセンター間のパイプは堅牢である、またはレイテンシが小さいと思われがちです。しかし一般的に、その考えは正しいとは言えません。どこかの時点でネットワーク障害が発生する可能性はあります。弊社が推奨する「マルチデータセンター構成」を参照してください。

JVM/Java 仮想マシン

最新バージョンの JDK 1.8 と G1 コレクターを実行するようお勧めします(無料で提供されている以前のバージョンには、セキュリティ上の脆弱性が見つかっています)。

現在 JDK 1.7(サポート対象)を使用していて、今後 G1(現在のデフォルト)の使用を検討している場合は、必ず u51 を適用してください。テスト時に u21 を試しましたが、そのバージョンの GC の実装に関して数多くの問題に遭遇しました。

弊社では、次のような GC チューニングを推奨しています。

-Xms1g -Xmx1g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 \
       -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M \
       -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80

重要な構成オプション

本稼働環境では、以下の構成に変更を加える必要があります。これらのオプションは、クラスターのレイアウトによって異なります。

マルチクラスターデプロイでは、Kafka ベースのプライマリ選出(後述)を使用するように Schema Registry を構成します。(これらのプロパティの詳しい説明については、「Schema Registry の構成オプション」を参照してください。)

Kafka ベースのプライマリ選出

Kafka ベースのプライマリ選出は、Confluent Platform 4.0 以降で利用できます。リーダー選出には、この方法を使用することをお勧めします。プライマリ選出に Kafka を使用するように Schema Registry を構成するには、kafkastore.bootstrap.servers 設定を構成します。

kafkastore.bootstrap.servers

kafkastore.bootstrap.servers に指定されたブートストラップサーバーを含む Kafka クラスターは、Schema Registry インスタンスを調整(リーダー選出)してスキーマデータを格納する目的に使用されます。Kafka セキュリティが有効になっている場合、Schema Registry が Kafka との接続に使用するセキュリティプロトコルを指定する際にも、kafkastore.bootstrap.servers が使用されます。

リスナー

Schema Registry の ID は ZooKeeper に格納され、ホスト名とポートから成ります。複数のリスナーが構成されている場合は、最初のリスナーのポートがその ID に使用されます。

host.name

ZooKeeper でアドバタイズされるホスト名。Schema Registry を複数のノードで実行する場合は必ずこれを設定してください。

注釈

登録されているすべてのスキーマを格納する内部 _schemas トピックについては、Kafka サーバーの min.insync.replicas を 1 より大きい値に構成してください。たとえば、kafkastore.topic.replication.factor が 3 の場合、kafkastore.topic に対する Kafka サーバーの min.insync.replicas を 2 に設定します。これにより、スキーマの登録書き込みが、3 個あるレプリカのうち 2 個以上でコミットされれば、その書き込みは堅牢であると見なされます。さらに、isr 以外のレプリカが決してリーダーとして選出されないよう(データの損失を招くおそれがあります)、unclean.leader.election.enable は false に設定することをお勧めします。

Schema Registry の構成オプション」に、すべての構成オプション一式が掲載されています。

ZooKeeper ベースのプライマリ選出

重要

ZooKeeper のリーダー選出と kafkastore.connection.url は非推奨となっています。代わりに Kafka のリーダー選出を使用してください。詳細については、「ZooKeeper のプライマリ選出から Kafka のプライマリ選出への移行」を参照してください。

ZooKeeper ベースのプライマリ選出から Kafka ベースのプライマリ選出への移行については、移行 に関する詳細説明を参照してください。

変更を避けるべきストレージ設定

Schema Registry は、kafkastore.topic で定義された Kafka トピックにすべてのスキーマを格納します。この Kafka トピックは、Schema Registry データベースのコミットログとして機能し、また Source of Truth(信頼できる情報源)となっているため、このストアへの書き込みには堅牢性が必要です。Kafka ベースのコミットログ書き込みの堅牢性にかかわるすべての設定には、Schema Registry の出荷時点で、十分に考慮されたデフォルト値が割り当てられています。また、データの損失を防ぐために、kafkastore.topic トピックは圧縮されている必要があります。判断がつかない場合、これらの設定はそのままにしておいてください。トピックを手動で作成する必要がある場合のために、適切な構成の例を次に示しました。

# kafkastore.topic=_schemas
  bin/kafka-topics --create --bootstrap-server localhost:9092 --topic _schemas --replication-factor 3 --partitions 1 --config cleanup.policy=compact

kafkastore.topic

単一パーティションの堅牢なトピック。高い堅牢性でデータを記録するログとしての役割を果たします。保持ポリシーに起因するデータの損失を防ぐために、このトピックは圧縮されている必要があります。

  • 型: string
  • デフォルト: "_schemas"
  • 重要度: 高

kafkastore.topic.replication.factor

スキーマトピックに必要なレプリケーション係数。実際のレプリケーション係数は、この値とライブ Kafka ブローカー数のどちらか小さい方になります。

  • 型: int
  • デフォルト値: 3
  • 重要度: 高

kafkastore.init.timeout.ms

Kafka ストアの初期化(スキーマデータを格納する Kafka トピックの作成を含む)のタイムアウト。

  • 型: int
  • デフォルト: 60000
  • 重要度: 中

ZooKeeper のプライマリ選出から Kafka のプライマリ選出への移行

重要

ZooKeeper のリーダー選出と kafkastore.connection.url は非推奨となっています。代わりに Kafka のリーダー選出を使用してください。このセクションでは、Kafka のリーダー選出を構成する方法について説明します。

ZooKeeper ベースのプライマリ選出から Kafka ベースのプライマリ選出に移行することになった場合は、すべての Schema Registry ノードで、次の構成に変更を加える必要があります。

  • kafkastore.connection.url を削除します。
  • schema.registry.zk.namespace が構成されている場合は、これを削除します。
  • kafkastore.bootstrap.servers を構成します。
  • Schema Registry クラスターが複数ある場合のための schema.registry.zk.namespace が最初からあった場合には、schema.registry.group.id を構成します。

kafkastore.connection.urlkafkastore.bootstrap.servers の両方が構成されている場合、リーダー選出には Kafka が使用されます。

書き込みのダウンタイム

ZooKeeper ベースのプライマリ選出から Kafka ベースのプライマリ選出には、おおよそ以下の手順で移行できます。この手順を実行すると、短時間ですが Schema Registry への書き込みができなくなります。

  • 対象のノードに対して前述の構成変更を行い、さらにすべてのノードで leader.eligibility が false に設定されていることを確認します。
  • すべてのノードのローリング再起動を実行します。
  • プライマリとなりうる各ノードの leader.eligibility を true に構成して、それらをバウンスします。

完全なダウンタイム

作業を簡素化したい場合は、Schema Registry を一時的に停止してから移行を行ってください。つまり、すべてのノードをシャットダウンして、新しい構成で再起動します。

バックアップと復元

Kafka バックエンド」に説明があるように、すべてのスキーマ、サブジェクトとバージョンおよび ID のメタデータ、そして互換性の設定は、特殊な Kafka トピックである <kafkastore.topic> (デフォルトでは _schemas)に対してメッセージとして追加されます。このトピックは、スキーマ ID に使用される共通の Source of Truth(信頼できる情報源)であるため、バックアップしておく必要があります。万一予期しない事態が発生してこのトピックにアクセスできなくなった場合でも、このスキーマトピックをバックアップから復元することで、その後もコンシューマーは Avro フォーマットで送信された Kafka メッセージを読み取ることができます。

ベストプラクティスとして、<kafkastore.topic> をバックアップすることをお勧めします。これには 3 とおりの方法があります。以降でそれらの方法について説明します。

Replicator を使用したバックアップ

複数のデータセンターにわたってデプロイされた Kafka 環境が既にある場合は、Confluent Replicator を使用して、<kafkastore.topic>_schemas)を別の Kafka クラスターにバックアップすることができます。

シンクコネクターを使用したバックアップ

別の方法として、シンクとストレージ間のコネクター(Kafka Connect Amazon S3 Sink Connector など)を使用して、独立したストレージ(AWS S3 など)にデータをバックアップし、ストレージ内のデータを常時最新の状態に保つこともできます。<kafkastore.topic>_schemas)は未加工のデータ(バイト)としてコピーする必要がある点に注意してください。Connect S3 Sink Connector を使用する場合、次の 2 つの方法があります。

  • Kafka レコードと _schemas トピックを S3 に格納するには、ByteArrayConverter を使用して、_schemas トピックを未加工データとして S3 にバックアップします。その後、構成プロパティ topics または topics.regex を使用して Kafka の他のトピックをリストし、S3 にバックアップすることができます。(topics および topics.regex については、「シンクの構成プロパティ」を参照してください。)
  • JSON、AVRO、Parquet のいずれかで Kafka レコードを格納する必要がある場合は、S3 Sink Connector を 1 つ使用して、Kafka の一連のトピックを S3 にバックアップすることができます。そのうえで、S3 Sink Connector をもう 1 つ作成し、Bytes フォーマットを使用して、_schemas トピックを S3 にコピーします。

コマンドラインツールを使用したバックアップ

前述した 2 つの方法の代わりに、Kafka のコマンドラインツールを使用して、トピックの内容を定期的にファイルに保存することもできます。以下の例では、<kafkastore.topic> の値がデフォルト(_schemas)に設定されていることを想定しています。

コマンドラインツールを使用してトピックをバックアップするには、kafka-console-consumer を使用してスキーマ トピックから schemas.log というファイルにメッセージを取り込みます。このファイルを Kafka クラスターとは別の場所に保存してください。

bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic _schemas --from-beginning --property print.key=true --timeout-ms 1000 1> schemas.log

トピックを復元するには、kafka-console-producer を使用して、schemas.log ファイルの内容を新しいスキーマ トピックに書き込みます。この例では、新しいスキーマトピック名(_schemas_restore)を使用しています。新しいトピック名を使用するにせよ、以前のトピック名(_schemas)を使用するにせよ、それに応じて <kafkastore.topic> を確実に設定してください。

bin/kafka-console-producer --broker-list localhost:9092 --topic _schemas_restore --property parse.key=true < schemas.log