kcat (旧 kafkacat)ユーティリティ¶
kcat (旧 kafkacat)は、Apache Kafka® デプロイのテストおよびデバッグに使用できるコマンドラインユーティリティです。kcat を使用すると、Kafka のトピックおよびパーティション情報の生成、消費、一覧表示ができます。" Kafka 用の netcat" とも言われ、Kafka のデータの検査や作成に役立ち、多機能ナイフのように便利なツールです。
Kafka コンソールプロデューサー(kafka-console-producer
)や Kafka コンソールコンシューマー(kafka-console-consumer
)と似ていますが、それ以上の機能を備えています。
重要
kcat はオープンソースのユーティリティで、https://github.com/edenhill/kafkacat から入手できます。Confluent ではサポートを行っておらず、Confluent Platform には含まれていません。
コンシューマーモード¶
コンシューマーモードでは、kcat はトピックおよびパーティションからメッセージを読み取り、標準出力(stdout)に出力します。Kafka ブローカー(-b
)とトピック(-t
)を指定する必要があります。オプションとして、区切り記号の指定(-D
)もできます。デフォルトの区切り記号は改行です。
kcat でブローカー(-b
)とトピック(-t
)を指定して内容を確認するには、次のようにします。
kafkacat -b localhost:9092 -t mysql_users
% Auto-selecting Consumer mode (use -P or -C to override)
{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
[...]
kcat では、ターミナルまたはパイプの種類に応じて自動的にモードが選択されます。
- パイプを使用してデータを kcat に渡すと、自動的にプロデューサー(
-P
)モードが選択されます。 - パイプを使用して kcat からのデータを(ターミナルの標準出力などに)渡すと、自動的にコンシューマー(
-C
)モードが選択されます。
コンシューマーフラグ(-C
)またはプロデューサーフラグ(-P
)を使用すると、明示的にモードを指定できます。小文字のモード識別子と数字を使用して(-c<数字>
など)、表示するメッセージ数を指定することもできます。たとえば、1 つのメッセージを消費するには、次のようにします。
kafkacat -b localhost:9092 -t mysql_users -C -c1
{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
メッセージキーを表示するには、-K
を使用し、引数として区切り記号を指定します。たとえば、タブを区切り記号としてメッセージキーを表示するには、次のようにします。
kafkacat -b localhost:9092 -t mysql_users -C -c1 -K\t
1 {"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
-f
フラグは、出力のフォーマットと含めるフィールドの両方を指定する引数を取ります。各メッセージのキーと値のペアを見やすく出力するコマンドの簡単な例を以下に示します。
kafkacat -b localhost:9092 -t mysql_users -C -c1 -f 'Key: %k\nValue: %s\n'
Key: 1
Value: {"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
ここでは、-K:
が置き換えられていることに注意してください。キーパラメーターは -f
のフォーマット文字列で指定されているためです。
-f
のより高度な使い方として、さらに多くのメタデータ(オフセット、タイムスタンプ、データ長)を表示する方法を示します。
kafkacat -b localhost:9092 -t mysql_users -C -c2 -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\nTimestamp: %T\tPartition: %p\tOffset: %o\n--\n'
Key (1 bytes): 1
Value (79 bytes): {"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
Timestamp: 1520618381093 Partition: 0 Offset: 0
--
Key (1 bytes): 2
Value (79 bytes): {"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
Timestamp: 1520618381093 Partition: 0 Offset: 1
--
プロデューサーモード¶
プロデューサーモードでは、kcat は標準入力(stdin)からメッセージを読み取ります。Kafka ブローカー(-b
)とトピック(-t
)を指定する必要があります。オプションとして、区切り記号の指定(-D
)もできます。デフォルトの区切り記号は改行です。
kcat を使用すると、簡単にデータをトピックに送信できます。-P
フラグを指定してコマンドを実行し、必要なデータを入力します。終了するには、Ctrl-D
キーを押します。
kafkacat -b localhost:9092 -t new_topic -P
test
確認するには、-P
を -C
に置き換えて再生します。
kafkacat -b localhost:9092 -t new_topic -C
test
ファイルからデータを追加して、kcat にデータを送信できます。次の例では、-l
フラグを使用して、ファイル /tmp/msgs
の各行を個々のメッセージとして扱います。-l
フラグを指定しないと、ファイル全体が 1 つのメッセージとして扱われます。これは、バイナリデータの送信に役立ちます。この例では、-T
フラグも使用して、入力を stdout
にも表示(エコー)しています。
kafkacat -b localhost:9092 -t <my_topic> -T -P -l /tmp/msgs
These are
three messages
sent through kafkacat
先ほどのコンシューマーの例と同じように、-K
フラグと区切り文字を使用して、メッセージのキーを指定できます。
kafkacat -b localhost:9092 -t keyed_topic -P -K:
1:foo
2:bar
kafkacat -b localhost:9092 -t keyed_topic -C -f 'Key: %k\nValue: %s\n'
Key: 1
Value: foo
Key: 2
Value: bar
次のようにすると、パーティションを設定できます。
kafkacat -b localhost:9092 -t partitioned_topic -P -K: -p 1
1:foo
kafkacat -b localhost:9092 -t partitioned_topic -P -K: -p 2
2:bar
kafkacat -b localhost:9092 -t partitioned_topic -P -K: -p 3
3:wibble
再生するには、前述のようにフォーマットと -f
フラグを使用します。
kafkacat -b localhost:9092 -t partitioned_topic -C -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\nTimestamp: %T\tPartition: %p\tOffset: %o\n--\n'
% Reached end of topic partitioned_topic [0] at offset 0
Key (1 bytes): 1
Value (3 bytes): foo
Timestamp: 1520620113485 Partition: 1 Offset: 0
--
Key (1 bytes): 2
Value (3 bytes): bar
Timestamp: 1520620121165 Partition: 2 Offset: 0
--
Key (1 bytes): 3
Value (6 bytes): wibble
Timestamp: 1520620129112 Partition: 3 Offset: 0
--
メタデータの一覧表示モード¶
メタデータの一覧表示モード(-L
)では、kcat により Kafka クラスターの現在のステート、およびクラスターのトピック、パーティション、レプリカ、同期レプリカ(ISR)が表示されます。
kafkacat -b localhost:9092 -L
JSON(-J
)オプションを追加すると、JSON 形式で出力されます。これは、このデータを他のアプリケーションに渡してさらに処理を行う場合に役立ちます。
kafkacat -b mybroker -L -J
詳細情報や例については、kafkacat GitHub リポジトリ を参照してください。
Confluent Cloud に対する認証用の kcat の構成¶
Confluent Cloud と通信できるように kcat を構成するには、セキュリティ プロトコルの詳細とともに Confluent Cloud の API キーとシークレットを指定します。その例を次に示します。
kafkacat -b localhost:9092 \
-X security.protocol=sasl_ssl -X sasl.mechanisms=PLAIN \
-X sasl.username=<api-key-key> -X sasl.password=<api-key-secret> \
-L
Confluent Cloud ドキュメントの「Confluent Cloud クライアントの構成」も参照してください。
kcat コードの例¶
kcat の基本的なコード例については、「kafkacat: Apache Kafka® のサンプルコマンド」を参照してください。これらのコード例にはすべて、オンプレミスまたは Confluent Cloud で実行される Kafka クラスターに接続できるプロデューサーとコンシューマーが含まれています。