ksqlDB の運用¶
YouTube の screencast of Taking KSQL to Production をご覧ください。
Confluent CLI によるローカルでの開発およびテスト¶
開発およびテストを目的として、Confluent CLI を使用して 1 台のホスト上でサービスをスピンアップすることができます。詳細については Apache Kafka クイックスタート を参照してください。
重要
confluent local コマンドは、単一ノードの開発環境向けであり、本稼働環境には適していません。生成されるデータは一過性で、一時的なものです。本稼働環境対応のワークフローについては、「Confluent Platform のインストールおよびアップグレード」を参照してください。
ksqlDB のインストールと構成¶
ksqlDB サーバーをセットアップする際には多数のオプションがあります。ksqlDB のインストールおよび構成の詳細については、以下のトピックを参照してください。
ksqlDB クラスターの開始および停止¶
ksqlDB は以下の開始および停止スクリプトを提供します。
- ksql-server-start
- このスクリプトは ksqlDB サーバーを開始します。引数としてサーバー構成ファイルを必要とし Confluent Platform インストール先の
/bin
ディレクトリに配置されています。詳細については、「ksqlDB サーバーの起動」を参照してください。 - ksql-server-stop
- このスクリプトは ksqlDB サーバーを停止します。Confluent Platform インストール先の
/bin
ディレクトリに配置されています。
正常性の確認¶
- ksqlDB REST API は
http://<server> :8088/info
で "server info" リクエストをサポートしhttp://<server> :8088/healthcheck
で基本的なサーバーヘルスチェックエンドポイントをサポートしています。 DESCRIBE EXTENDED <stream or table>
およびEXPLAIN <name of query>
を使用して、接続されている ksqlDB サーバーの実行時統計情報を確認します。
モニタリングとメトリクス¶
ksqlDB には JMX(Java Management Extensions)メトリクスが含まれており、これらによって ksqlDB サーバーの内部で起きていることに関する知見が得られます。これらのメトリクスには、メッセージ数、合計スループット、スループット分布、エラー率などがあります。
JMX メトリクスを有効にするには ksqlDB サーバーを起動する前に JMX_PORT
を以下のように設定します。
export JMX_PORT=1099 && \
$CONFLUENT_HOME/bin/ksql-server-start $CONFLUENT_HOME/etc/ksqldb/ksql-server.properties
Kafka Streams メトリクスの詳細については Kafka Streams アプリケーションのモニタリング を参照してください。
キャパシティプランニング¶
「キャパシティプランニングガイド」では、ksqlDB クラスターの規模を設定する方法について説明しています。
トラブルシューティング¶
SELECT クエリがハングして停止しません。¶
SELECT * FROM myTable EMIT CHANGES
のような非永続的なクエリをはじめとして ksqlDB のクエリは継続的にストリーミングされるクエリです。ストリーミングクエリは、明示的に終了しない限り停止しません。ksqlDB CLI で非永続的なクエリを終了するには Ctrl + C
と入力する必要があります。
SELECT * FROM
テーブルまたはストリームの結果がありません。¶
これは一般的に、クエリが新着データのみを処理するように構成されているにもかかわらず、新規入力レコードが受信されていない場合に発生します。修正するには、以下のいずれかの操作を行います。
SET 'auto.offset.reset' = 'earliest';
を実行します。詳細については、「ksqlDB CLI の構成」および「ksqlDB サーバーを構成する」を参照してください。- 入力トピックに新しいレコードを書き込みます。
ウィンドウ化された集約の出力からストリームを作成できません。¶
ksqlDB は構造化キーをサポートしていないため、ウィンドウ化された集約からストリームを作成できません。
ksqlDB が内部トピックをクリーンアップしません。¶
Apache Kafka® クラスターに対して delete.topic.enable=true
が構成されていることを確認します。詳細については、「deleteTopics」を参照してください。
ksqlDB CLI が ksqlDB サーバーに接続しません。¶
ksqlDB CLI を起動すると以下の警告が表示される場合があります。
**************** WARNING ******************
Remote server address may not be valid:
Error issuing GET to KSQL server
Caused by: java.net.SocketException: Connection reset
Caused by: Connection reset
*******************************************
また、CLI を使用して ksqlDB クエリを作成する際にも同様のエラーが表示される場合があります。
Error issuing POST to KSQL server
Caused by: java.net.SocketException: Connection reset
Caused by: Connection reset
どちらの場合も、CLI が ksqlDB サーバーに接続できないのは、以下の状態のいずれかが原因である可能性があります。
- ksqlDB CLI が ksqlDB サーバーの正しいポートに接続されていない。
- ksqlDB サーバーが実行されていない。
- ksqlDB サーバーは実行されているが、別のポートをリッスンしている。
ksqlDB CLI が使用しているポートを確認する¶
ksqlDB CLI に ksqlDB サーバーのポートが正しく構成されていることを確認します。デフォルトでは、サーバーはポート 8088
をリッスンします。詳細については、 ksqlDB CLI の起動 を参照してください。
ksqlDB サーバーの構成を確認する¶
ksqlDB サーバーの構成ファイルで、リスナーのリストにあるホストアドレスとポートが正しく構成されていることを確認します。listeners
の設定を確認します。
listeners=http://0.0.0.0:8088
または IPv6 経由で実行している場合は以下を確認します。
listeners=http://[::]:8088
詳細については ksqlDB サーバーの起動 を参照してください。
ポートの競合を確認する¶
ksqlDB サーバーがリッスンしているポートで別のプロセスが実行されている場合があります。以下のコマンドを使用して、ksqlDB サーバーに割り当てられたポートで実行されているプロセスを確認します。以下の例では、デフォルトのポートである 8088
を確認します。
netstat -anv | egrep -w .*8088.*LISTEN
出力は以下のようになります。
tcp4 0 0 *.8088 *.* LISTEN 131072 131072 46314 0
この例では 46314
が、ポート 8088
上でリッスンしているプロセスの PID です。次のコマンドを実行してプロセスに関する情報を取得します。
ps -wwwp <pid>
出力は以下のようになります。
io.confluent.ksql.rest.server.KsqlServerMain ./config/ksql-server.properties
KsqlServerMain
プロセスが表示されていなければ、KsqlServerMain
が通常使用するポートを別のプロセスが使用しています。ksqlDB サーバーの構成ファイルで割り当て済みリスナーを確認し、正しいポート設定で ksqlDB CLI を再起動します。
Avro スキーマで複製されたトピックによりエラーが発生します。¶
Confluent Replicator はレプリケーション時にトピックの名前を変更しますが、関連する Avro スキーマがある場合に、それらと名前変更されたトピックとの照合は自動では行われません。
ksqlDB CLI では、複製されたトピックの PRINT
ステートメントは機能します。これは、Avro スキーマ ID が Schema Registry 内に存在し、ksqlDB がその Avro メッセージを逆シリアル化できることを示しています。ただし CREATE STREAM
は正しく動作せず、逆シリアル化エラーが発生します。
CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews.replica', value_format='AVRO');
[2018-06-21 19:12:08,135] WARN task [1_6] Skipping record due to deserialization error. topic=[pageviews.replica] partition=[6] offset=[1663] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.connect.errors.DataException: pageviews.replica
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:48)
at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:27)
これを解決するには、複製されたトピックのサブジェクト名に対してスキーマを手動で登録します。
# Original topic name = pageviews
# Replicated topic name = pageviews.replica
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schema\": $(curl -s http://localhost:8081/subjects/pageviews-value/versions/latest | jq '.schema')}" http://localhost:8081/subjects/pageviews.replica-value/versions
ksqlDB のサーバーログを確認する¶
それでも問題がある場合は ksqlDB のサーバーログでエラーを確認します。
confluent log ksql-server
デフォルトのディレクトリ /usr/local/logs
または ksqlDB CLI 起動時に割り当てた LOG_DIR
にあるログを確認します。詳細については、 ksqlDB CLI の起動 を参照してください。