Confluent Cloud ksqlDB アプリケーションの移行

新しい機能や改善による利点が利用可能になった時点ですぐに提供するために、Confluent Cloud ksqlDB では新バージョンが頻繁にデプロイされています。これらのデプロイのほとんどはユーザーには不可視であり、ユーザー側でのアクションは不要です。

後方非互換性は、シリアル化フォーマット、データモデル、SQL 構文の変更など、さまざまな形で実現されています。ksqlDB の以前のバージョンで使用していたアプリケーションを引き続き実行するためには、場合により、本稼働環境で使用可能なバージョンによりサポートされているアプリケーションへワークロードを移行する必要があります。このドキュメントでは、簡単に移行するための処理について説明しています。

注釈

後方互換性のないリリースを使用するように ksqlDB アプリケーションが自動的にアップグレードされることはありません。

アプリケーションのスキーマの移植

Confluent Cloud Console インターフェイスまたは ksqlDB CLI を使用することにより、以下のコマンドを実行できます。ksqlDB CLI を使用する場合、SPOOL コマンドが役立つことがあります。このコマンドは、セッションの出力をすべてローカルファイルシステム上のファイルに取り込みます。詳細については、「API キーによる Confluent Cloud 上の ksqlDB アプリケーションへのアクセス」を参照してください。

以下の手順に従って移行処理を完了します。

  1. すべてのストリーム定義を取り込みます。

    LIST STREAMS EXTENDED;
    
  2. 各ストリームを作成した SQL ステートメントをコピーしてファイルに保存します。その際に KSQL_PROCESSING_LOG は無視します。これらのステートメントは後のステップで実行します。

  3. すべてのテーブル定義を取り込みます。

    LIST TABLES EXTENDED;
    
  4. 各テーブルを作成した SQL ステートメントをコピーしてスキーマファイルに保存します。

  5. カスタムタイプを取り込みます。

    LIST TYPES;
    
  6. LIST TYPES 出力を CREATE TYPE <name> AS <schema> 構文に変換します。その際に名前は出力の 1 列目から取得し、スキーマは 2 列目から取得します。これらのステートメントをスキーマファイルに保存します。

  7. 取り込んだすべての SQL ステートメントを依存関係順に並べます。スキーマの再構築に必要なすべての SQL ステートメントのリストはできましたが、まだ依存関係順に並んでいません。各ステートメントがそれぞれ依存する他のステートメントの後にくるように、ステートメントを並べ替える必要があります。

  8. ksqlDB の新旧バージョン間での構文または機能の変更がある場合を考慮して、取り込んだ SQL ステートメントをアップデートします。

  9. ウェブインターフェイスまたは Confluent Cloud CLI を使用して、古いアプリケーションを削除します。これを行わなければ、新旧両方のアプリケーションがシンクトピックへパブリッシュされ、未定義の動作となります。

  10. ウェブインターフェイスまたは Confluent Cloud CLI を使用して、既存の Kafka クラスターによりサポートされている新しい Confluent Cloud ksqlDB アプリケーションをプロビジョニングします。

  11. 新しいアプリケーションが最も古いオフセットからすべての Kafka データを消費するように設定するには、スキーマファイルの先頭に SET 'auto.offset.reset'='earliest' という行を追加します。

  12. 前出のステップで作成したスキーマファイルの各 SQL ステートメントを実行して、新しいアプリケーション内でスキーマを構築します。これを実行するには KSQL CLI セッションから RUN SCRIPT コマンドを使用するのが最良の方法であり、入力としてファイルが使用されます。

  13. アプリケーションステートを再構築します。

データベーススキーマを新しいアプリケーションに移植すると、ksqlDB がデータの処理を開始します。SET 'auto.offset.reset'='earliest' を使用した場合、新しいアプリケーションは各入力トピックの先頭からすべてのデータの処理を開始します。アプリケーションは、古いアプリケーションが既に処理したデータを処理します。

注釈

古いアプリケーションが処理した送信元データは、新しいアプリケーションで処理する際に Kafka では使用できなくなっている可能性があります。保持期間が限られているトピックなどがこれにあたります。新しいアプリケーションでは、以前のアプリケーションと結果が異なる可能性があります。

既存の Kafka トピックに出力する永続的なクエリは、入力データを再生する際に重複行を生成する場合があります。たとえば、以下の永続的なクエリは sink_topic という名前のトピックに出力を書き込みます。

CREATE STREAM output_stream WITH (kafka_topic=’sink_topic’, value_format=’json’) AS
  SELECT * FROM input_stream EMIT CHANGES;

新しい ksqlDB アプリケーションで既存の Kafka クラスターを使用して同じクエリが再作成されると、古いものから順にクエリが消費される場合はその出力が複製されます。