ksqlDB の運用

YouTube の screencast of Taking KSQL to Production をご覧ください。

Confluent CLI によるローカルでの開発およびテスト

開発およびテストを目的として、Confluent CLI を使用して 1 台のホスト上でサービスをスピンアップすることができます。詳細については Confluent クイックスタート を参照してください。

重要

confluent local コマンドは、単一ノードの開発環境向けであり、本稼働環境には適していません。生成されるデータは一過性で、一時的なものです。本稼働環境対応のワークフローについては、「Confluent Platform のインストールおよびアップグレード」を参照してください。

ksqlDB のインストールと構成

ksqlDB サーバーをセットアップする際には多数のオプションがあります。ksqlDB のインストールおよび構成の詳細については、以下のトピックを参照してください。

ksqlDB クラスターの開始および停止

ksqlDB は以下の開始および停止スクリプトを提供します。

ksql-server-start
このスクリプトは ksqlDB サーバーを開始します。引数としてサーバー構成ファイルを必要とし Confluent Platform インストール先の /bin ディレクトリに配置されています。詳細については、「ksqlDB サーバーの起動」を参照してください。
ksql-server-stop
このスクリプトは ksqlDB サーバーを停止します。Confluent Platform インストール先の /bin ディレクトリに配置されています。

正常性の確認

  • ksqlDB REST API は http://<server> :8088/info で "server info" リクエストをサポートし http://<server> :8088/healthcheck で基本的なサーバーヘルスチェックエンドポイントをサポートしています。
  • DESCRIBE <stream or table> EXTENDED および EXPLAIN <name of query> を使用して、接続されている ksqlDB サーバーの実行時統計情報を確認します。

モニタリングとメトリクス

ksqlDB には JMX(Java Management Extensions)メトリクスが含まれており、これらによって ksqlDB サーバーの内部で起きていることに関する知見が得られます。これらのメトリクスには、メッセージ数、合計スループット、スループット分布、エラー率などがあります。

JMX メトリクスを有効にするには ksqlDB サーバーを起動する前に JMX_PORT を以下のように設定します。

export JMX_PORT=1099 && \
$CONFLUENT_HOME/bin/ksql-server-start $CONFLUENT_HOME/etc/ksqldb/ksql-server.properties

Kafka Streams メトリクスの詳細については Kafka Streams アプリケーションのモニタリング を参照してください。

キャパシティプランニング

キャパシティプランニングガイド」では、ksqlDB クラスターの規模を設定する方法について説明しています。

トラブルシューティング

SELECT クエリがハングして停止しません。

SELECT * FROM myTable EMIT CHANGES のような非永続的なクエリをはじめとして ksqlDB のクエリは継続的にストリーミングされるクエリです。ストリーミングクエリは、明示的に終了しない限り停止しません。ksqlDB CLI で非永続的なクエリを終了するには Ctrl + C と入力する必要があります。

SELECT * FROM テーブルまたはストリームの結果がありません。

これは一般的に、クエリが新着データのみを処理するように構成されているにもかかわらず、新規入力レコードが受信されていない場合に発生します。修正するには、以下のいずれかの操作を行います。

ウィンドウ化された集約の出力からストリームを作成できません。

ksqlDB は構造化キーをサポートしていないため、ウィンドウ化された集約からストリームを作成できません。

ksqlDB が内部トピックをクリーンアップしません。

Apache Kafka® クラスターに対して delete.topic.enable=true が構成されていることを確認します。詳細については、「deleteTopics」を参照してください。

ksqlDB CLI が ksqlDB サーバーに接続しません。

ksqlDB CLI を起動すると以下の警告が表示される場合があります。

**************** WARNING ******************
Remote server address may not be valid:
Error issuing GET to KSQL server

Caused by: java.net.SocketException: Connection reset
Caused by: Connection reset
*******************************************

また、CLI を使用して ksqlDB クエリを作成する際にも同様のエラーが表示される場合があります。

Error issuing POST to KSQL server
Caused by: java.net.SocketException: Connection reset
Caused by: Connection reset

どちらの場合も、CLI が ksqlDB サーバーに接続できないのは、以下の状態のいずれかが原因である可能性があります。

  • ksqlDB CLI が ksqlDB サーバーの正しいポートに接続されていない。
  • ksqlDB サーバーが実行されていない。
  • ksqlDB サーバーは実行されているが、別のポートをリッスンしている。

ksqlDB CLI が使用しているポートを確認する

ksqlDB CLI に ksqlDB サーバーのポートが正しく構成されていることを確認します。デフォルトでは、サーバーはポート 8088 をリッスンします。詳細については、 ksqlDB CLI の起動 を参照してください。

ksqlDB サーバーの構成を確認する

ksqlDB サーバーの構成ファイルで、リスナーのリストにあるホストアドレスとポートが正しく構成されていることを確認します。listeners の設定を確認します。

listeners=http://0.0.0.0:8088

または IPv6 経由で実行している場合は以下を確認します。

listeners=http://[::]:8088

詳細については ksqlDB サーバーの起動 を参照してください。

ポートの競合を確認する

ksqlDB サーバーがリッスンしているポートで別のプロセスが実行されている場合があります。以下のコマンドを使用して、ksqlDB サーバーに割り当てられたポートで実行されているプロセスを確認します。以下の例では、デフォルトのポートである 8088 を確認します。

netstat -anv | egrep -w .*8088.*LISTEN

出力は以下のようになります。

tcp4  0 0  *.8088       *.*    LISTEN      131072 131072    46314      0

この例では 46314 が、ポート 8088 上でリッスンしているプロセスの PID です。次のコマンドを実行してプロセスに関する情報を取得します。

ps -wwwp <pid>

出力は以下のようになります。

io.confluent.ksql.rest.server.KsqlServerMain ./config/ksql-server.properties

KsqlServerMain プロセスが表示されていなければ、KsqlServerMain が通常使用するポートを別のプロセスが使用しています。ksqlDB サーバーの構成ファイルで割り当て済みリスナーを確認し、正しいポート設定で ksqlDB CLI を再起動します。

Avro スキーマで複製されたトピックによりエラーが発生します。

Confluent Replicator はレプリケーション時にトピックの名前を変更しますが、関連する Avro スキーマがある場合に、それらと名前変更されたトピックとの照合は自動では行われません。

ksqlDB CLI では、複製されたトピックの PRINT ステートメントは機能します。これは、Avro スキーマ ID が Schema Registry 内に存在し、ksqlDB がその Avro メッセージを逆シリアル化できることを示しています。ただし CREATE STREAM は正しく動作せず、逆シリアル化エラーが発生します。

CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews.replica', value_format='AVRO');

[2018-06-21 19:12:08,135] WARN task [1_6] Skipping record due to deserialization error. topic=[pageviews.replica] partition=[6] offset=[1663] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.connect.errors.DataException: pageviews.replica
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
        at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:48)
        at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:27)

これを解決するには、複製されたトピックのサブジェクト名に対してスキーマを手動で登録します。

# Original topic name = pageviews
# Replicated topic name = pageviews.replica
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schema\": $(curl -s http://localhost:8081/subjects/pageviews-value/versions/latest | jq '.schema')}" http://localhost:8081/subjects/pageviews.replica-value/versions

ksqlDB のサーバーログを確認する

それでも問題がある場合は ksqlDB のサーバーログでエラーを確認します。

confluent log ksql-server

デフォルトのディレクトリ /usr/local/logs または ksqlDB CLI 起動時に割り当てた LOG_DIR にあるログを確認します。詳細については、 ksqlDB CLI の起動 を参照してください。