チュートリアル : Kafka との間のデータの移動¶
このチュートリアルでは、1 行もコードを記述せずに、Apache Kafka® との間でデータを移動させる方法について具体的に説明します。このガイドに記載された手順を実行するとともに、Kafka Connect の 概念 について確認することで、より深い理解が得やすくなります。このチュートリアルを終了すると、次のことができるようになります。
- Confluent CLI を使用して Confluent サービスを管理する(分散モードでの単一の Connect ワーカーの起動、およびコネクターのロードとアンロードなど)。
- ファイルからデータを読み取り、Kafka トピックにパブリッシュする。
- Kafka トピックからデータを読み取り、ファイルにパブリッシュする。
- スキーマレジストリ とコネクターを連携させる。
Kafka Connect の基本的な機能および Confluent Schema Registry との連携を実際に試してみるため、少数のローカルのスタンドアロンモードの Kafka Connect プロセスおよびコネクターが実行されます。ファイルに書き込まれたデータを Kafka に挿入すること、Kafka トピックのデータをコンソールに書き出すことができます。Connect データフォーマットとして JSON を使用している場合、スキーマレジストリ を含まないチュートリアルについては、こちら を参照してください。
ちなみに
これらの手順では、Confluent CLI を使用して Confluent Platform をインストールしていることが前提となっています。詳細については「 オンプレミスのデプロイ」を参照してください。
サービスの起動¶
- 前提条件
- Confluent Platform
- Confluent CLI (個別のインストールが必要)
このガイドでは、サービスが、デフォルトのプロパティを使用して localhost
で実行されることを前提としています。
ちなみに
Confluent の bin
ディレクトリがまだ PATH に含まれていない場合は、export PATH=<path-to-confluent>/bin:$PATH
を実行して追加します。
Confluent の bin
ディレクトリが PATH
変数に含まれているので、Confluent CLI confluent local コマンドを使用してすべてのサービスを開始できます。
confluent local services start
重要
confluent local コマンドは、単一ノードの開発環境での使用が想定されており、本稼働環境には適していません。生成されるデータは一時的であり、暫定的なものです。本稼働環境対応のワークフローについては、「Confluent Platform のインストールおよびアップグレード」を参照してください。
各サービスが順番に開始され、状態についてのメッセージが出力されます。
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]
Connect のログを開くと、サービスが正常に開始されたことを確認できます。
confluent local services connect log
Confluent CLI confluent local コマンドでサービスを開始する際にエラーが発生した場合、各サービスのログは 1 か所にまとまっているので、これらのログが保存されているディレクトリに移動することで、アクセスできます。以下に例を示します。
# Show the log directory
confluent local current
/tmp/confluent.w1CpYsaI
# Navigate to the log directory
cd /tmp/confluent.w1CpYsaI
# View the log
less connect/connect.stderr
これらのサービスのセットアップと実行の詳細については、Confluent Platform の インストールに関するドキュメント を参照してください。
Connect でのファイルのデータの読み取り¶
FileStream Source Connector を起動して、ファイルから構造化データを読み取り、Kafka にデータをエクスポートし、その際、スキーマレジストリ を使用して Connect にデータの構造を伝えるには、Confluent CLI confluent local コマンドを使用して事前定義されている、サポートされているコネクターの構成のいずれかを使用します。定義済みのコネクターの構成の一覧を確認するには、次のように実行します。
confluent local services connect connector list
Bundled Predefined Connectors (edit configuration under etc/):
file-source
file-sink
replicator
注釈
既にコネクターをインストールしている場合、これ以外のコネクターが一覧に表示されることがあります。
ここで最初に使用する構成済みのコネクターの名前は file-source
です。このコネクターの構成ファイルは、./etc/kafka/connect-file-source.properties
にあります。以下は、内容の説明です。
# User defined connector instance name.
name=file-source
# The class implementing the connector
connector.class=FileStreamSource
# Maximum number of tasks to run for this connector instance
tasks.max=1
# The input file (path relative to worker's working directory)
# This is the only setting specific to the FileStreamSource
file=test.txt
# The output topic in Kafka
topic=connect-test
スキーマレジストリ を使用せずにこのチュートリアルを実行する場合は、key.converter
および value.converter
プロパティで org.apache.kafka.connect.json.JsonConverter
を使用するように指定することも必要です。これにより、このコネクターについてのみ、コンバーターの設定がオーバーライドされます。
これで、コネクターを読み込む準備ができましたが、その前に、ファイルにサンプルデータを入力しましょう。コネクターの構成ではファイルの相対パスを指定していることに注意してください。そのため、Kafka Connect ワーカーを実行するのと同じディレクトリにファイルを作成する必要があります。
for i in {1..3}; do echo "log line $i"; done > test.txt
次に、先ほど定義した構成ファイルを使用して、FileStreamSourceConnector のインスタンスを起動します。これは、コマンドラインから簡単に実行できます。次のように、Confluent CLI confluent local コマンドを使用します。
confluent local services connect connector load file-source
{
"name": "file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": "1",
"file": "test.txt",
"topics": "connect-test",
"name": "file-source"
},
"tasks": []
}
成功すると、コネクターの構成のスナップショットが出力されます。どのコネクターが読み込まれているかを確認するには、次のように実行します。
confluent local services connect connector status
[
"file-source"
]
ちなみに
Confluent Platform が実行され、CLI がインストールされている場合、Kafka Connect ログファイルへのパスは、$(confluent local current)/connect/connect.stdout
となります。たとえば、このファイルでエラーを検索するには、次のように実行します : cat $(confluent local current)/connect/connect.stdout | grep ERROR
このワーカーで読み込まれたすべてのコネクターの一覧が表示されます。同じコマンドでコネクター名を指定すると、コネクターが正常に起動されたか、障害が発生したかなど、そのコネクターの状態が表示されます。たとえば、先ほど読み込んだコネクターについてこのコマンドを実行すると、次のようになります。
confluent local services connect connector status file-source
{
"name": "file-source",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.10.1:8083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "192.168.10.1:8083"
}
]
}
コネクターが起動されるとすぐに、ログファイルの 3 行がそれぞれ Kafka に配信され、スキーマが スキーマレジストリ に登録されます。データが存在することを確認する方法の 1 つは、別のコンソールでコンソールコンシューマーを使用して、トピックのコンテンツを調べる方法です。
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
"log line 1"
"log line 2"
"log line 3"
データは Avro フォーマットを使用して Kafka に保存されているため、ここでは kafka-avro-console-consumer
を使用することに注意してください。このコンシューマーは、Avro データのスキーマを適切に検索するために、スキーマレジストリ にバンドルされている Avro コンバーターを使用します。
Connect でのファイルのデータの書き込み¶
Connect を使用して Kafka トピックにデータを書き込むことができたので、そのデータをダウンストリームプロセスで消費しましょう。このセクションでは、先ほどのセクションで起動したソースに加え、シンクコネクターをワーカーに読み込みます。シンクは、メッセージをローカルファイルに書き込みます。このコネクターも Confluent CLI confluent local コマンドで事前定義されており、file-sink
という名前です。以下は、コネクターの構成です。この構成は etc/kafka/connect-file-sink.properties
に保存されています。
# User defined name for the connector instance
name=file-sink
# Name of the connector class to be run
connector.class=FileStreamSink
# Max number of tasks to spawn for this connector instance
tasks.max=1
# Output file name relative to worker's current working directory
# This is the only property specific to the FileStreamSink connector
file=test.sink.txt
# Comma separate input topic list
topics=connect-test
この構成には、file-source と同様の設定が含まれていることに注目してください。主な違いは、file-source では、topic
で出力トピックを 1 つだけ指定できるのに対して、topics
では複数の入力トピックを指定できることです。
ここで、FileStreamSinkConnector を起動します。シンクコネクターはソースコネクターと同じワーカー内で実行されますが、コネクターのタスクごとに専用のスレッドが使用されます。
confluent local services connect connector load file-sink
{
"name": "file-sink",
"config": {
"connector.class": "FileStreamSink",
"tasks.max": "1",
"file": "test.sink.txt",
"topics": "connect-test",
"name": "file-sink"
},
"tasks": []
}
シンクコネクターの実行状態を確認するには、Confluent CLI confluent local services connect connector status コマンドを使用して、対象のコネクターのステートを取得します。
confluent local services connect connector status file-sink
{
"name": "file-sink",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.10.1:8083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "192.168.10.1:8083"
}
]
}
また、読み込まれているすべてのコネクターの一覧も確認できます。
confluent local services connect connector status connectors
[
"file-source",
"file-sink"
]
ちなみに
コネクターが読み込まれるたびにワーカータスクのバランス調整が行われるため、新しいコネクターが読み込まれた直後は、 confluent local services connect connector status <connector-name>
の呼び出しが成功しないことがあります。バランス調整が完了すると、このような呼び出しでコネクターの実際の状態が返されるようになります。
test.sink.txt
ファイルを開くと、シンクコネクターによって 2 行のログが書き込まれていることを確認できます。
両方のコネクターが実行されているため、エンドツーエンドでデータが流れる様子をリアルタイムで確認できます。確認するには、もう 1 つのターミナルで tail コマンドを使用して、出力ファイルの最新部分を表示します。
tail -f test.sink.txt
別のターミナルで、テキストファイルの末尾への行の追加を開始します。
for i in {4..1000}; do echo "log line $i"; done >> test.txt
test.sink.txt
に行が追加されることを確認できます。新しいデータは、ソースコネクターによって取得され、Kafka に書き込まれ、シンクコネクターによって Kafka から読み取られ、最終的に、ファイルに追加されます。
"log line 1"
"log line 2"
"log line 3"
"log line 4"
"log line 5"
...
Connect でデータをファイルから読み取り、ファイルに書き込む実験が終わったら、コネクターをシャットダウンしますが、それにはいくつかの方法があります。
- コネクターをアンロードし、Connect ワーカーは実行中のままにします。
confluent local services connect connector unload file-source
confluent local services connect connector unload file-sink
- Connect ワーカーをまとめて停止します。
confluent local services connect stop
Stopping Connect
Connect is [DOWN]
- Connect ワーカーおよびその他すべての Confluent サービスを停止します。
confluent local services stop
出力は以下のようになります。
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
- すべてのサービスを停止し、今回実行した Confluent サービスのデータをすべて消去します。
confluent local destroy
出力は以下のようになります。
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE
ソースコネクターとシンクコネクターのどちらもオフセットの追跡ができるため、プロセスの開始と終了を何度も行って、入力ファイルにさらにデータを追加しても、どちらのコネクターも前回終了時点から再開できます。
このチュートリアルで使用したコネクターは意図的にシンプルになっており、これ以上の依存関係は必要ありません。多くの場合、コネクターでは、ソースシステムまたはシンクシステムへの接続方法やコピーするデータを指定するため、より詳細な構成が必要になります。また、拡張性とフォールトトレランスを確保するため、Kafka Connect クラスターで実行することが必要な場合も多いでしょう。Kafka Connect の使用を開始する場合、分散モードでの実行方法など、Kafka Connect の実行と管理の詳細については、ユーザーガイド が参考になります。コネクター のセクションには、Confluent Platform に付属するコネクターの構成とデプロイに関する詳細が記載されています。
ちなみに
コネクターの作成、構成、管理を行う最も簡単な方法は、Confluent Control Center を使用することです。Control Center の詳細については、「 Confluent Control Center」を参照してください。