クイックスタート: Kafka Connect を使用した Kafka との間のデータ移動

このチュートリアルでは、1 行もコードを記述せずに、Apache Kafka® との間でデータを移動させる方法について具体的に説明します。このガイドに記載された手順を実行するとともに、Kafka Connect の 概念 について確認することで、より深い理解が得やすくなります。このチュートリアルを終了すると、次のことができるようになります。

  • Confluent CLI を使用して Confluent サービスを管理する(分散モードでの単一の Connect ワーカーの起動、およびコネクターのロードとアンロードなど)。
  • ファイルからデータを読み取り、Kafka トピックにパブリッシュする。
  • Kafka トピックからデータを読み取り、ファイルにパブリッシュする。
  • Schema Registry とコネクターを連携させる。

Kafka Connect の基本的な機能および Confluent Schema Registry との連携を実際に試してみるため、少数のローカルのスタンドアロンモードの Kafka Connect プロセスおよびコネクターが実行されます。ファイルに書き込まれたデータを Kafka に挿入すること、Kafka トピックのデータをコンソールに書き出すことができます。Connect データフォーマットとして JSON を使用している場合、Schema Registry を含まないチュートリアルについては、こちら を参照してください。

ちなみに

これらの手順では、Confluent CLI を使用して Confluent Platform をインストールしていることが前提となっています。詳細については「オンプレミスのデプロイ」を参照してください。

サービスの起動

前提条件

このガイドでは、サービスが、デフォルトのプロパティを使用して localhost で実行されることを前提としています。

ちなみに

Confluent の bin ディレクトリがまだ PATH に含まれていない場合は、export PATH=<path-to-confluent>/bin:$PATH を実行して追加します。

Confluent の bin ディレクトリが PATH 変数に含まれているため、Confluent CLI confluent local コマンドを使用してすべてのサービスを開始できます。

confluent local services start

重要

confluent local コマンドは、単一ノードの開発環境向けであり、本稼働環境には適していません。生成されるデータは一過性で、一時的なものです。本稼働環境対応のワークフローについては、「Confluent Platform のインストールおよびアップグレード」を参照してください。

各サービスが順番に開始され、状態についてのメッセージが出力されます。

Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]

Connect のログを開くと、サービスが正常に開始されたことを確認できます。

confluent local services connect log

Confluent CLI confluent local コマンドでサービスを開始する際にエラーが発生した場合、各サービスのログは 1 か所にまとまっているため、これらのログが保存されているディレクトリに移動することで、アクセスできます。以下に例を示します。

# Show the log directory
confluent local current

/tmp/confluent.w1CpYsaI
# Navigate to the log directory
cd /tmp/confluent.w1CpYsaI
# View the log
less connect/connect.stderr

これらのサービスのセットアップと実行の詳細については、Confluent Platform の インストールに関するドキュメント を参照してください。

Connect でのファイルのデータの読み取り

FileStream Source Connector を起動して、ファイルから構造化データを読み取り、Kafka にデータをエクスポートし、その際、Schema Registry を使用して Connect にデータの構造を伝えるには、Confluent CLI confluent local コマンドを使用して事前定義されている、サポートされているコネクターの構成のいずれかを使用します。定義済みのコネクターの構成の一覧を確認するには、次のように実行します。

confluent local services connect connector list

Bundled Predefined Connectors (edit configuration under etc/):
   file-source
   file-sink
   replicator

注釈

既にコネクターをインストールしている場合、これ以外のコネクターが一覧に表示されることがあります。

ここで最初に使用する構成済みのコネクターの名前は file-source です。このコネクターの構成ファイルは、./etc/kafka/connect-file-source.properties にあります。以下は、内容の説明です。

# User defined connector instance name.
name=file-source
# The class implementing the connector
connector.class=FileStreamSource
# Maximum number of tasks to run for this connector instance
tasks.max=1
# The input file (path relative to worker's working directory)
# This is the only setting specific to the FileStreamSource
file=test.txt
# The output topic in Kafka
topic=connect-test

Schema Registry を使用せずにこのチュートリアルを実行する場合は、key.converter および value.converter プロパティで org.apache.kafka.connect.json.JsonConverter を使用するように指定することも必要です。これにより、このコネクターについてのみ、コンバーターの設定がオーバーライドされます。

これで、コネクターを読み込む準備ができましたが、その前に、ファイルにサンプルデータを入力しましょう。コネクターの構成ではファイルの相対パスを指定していることに注意してください。そのため、Kafka Connect ワーカーを実行するのと同じディレクトリにファイルを作成する必要があります。

for i in {1..3}; do echo "log line $i"; done > test.txt

次に、先ほど定義した構成ファイルを使用して、FileStreamSourceConnector のインスタンスを起動します。これは、コマンドラインから簡単に実行できます。次のように、Confluent CLI confluent local コマンドを使用します。

  confluent local services connect connector load file-source

{
  "name": "file-source",
  "config": {
    "connector.class": "FileStreamSource",
    "tasks.max": "1",
    "file": "test.txt",
    "topics": "connect-test",
    "name": "file-source"
  },
  "tasks": []
}

成功すると、コネクターの構成のスナップショットが出力されます。どのコネクターが読み込まれているかを確認するには、次のように実行します。

  confluent local services connect connector status

[
  "file-source"
]

ちなみに

Confluent Platform が実行され、CLI がインストールされている場合、Kafka Connect ログファイルへのパスは、$(confluent local current)/connect/connect.stdout となります。たとえば、このファイルでエラーを検索するには、次のように実行します : cat $(confluent local current)/connect/connect.stdout | grep ERROR

このワーカーで読み込まれたすべてのコネクターの一覧が表示されます。同じコマンドでコネクター名を指定すると、コネクターが正常に起動されたか、障害が発生したかなど、そのコネクターの状態が表示されます。たとえば、先ほど読み込んだコネクターについてこのコマンドを実行すると、次のようになります。

  confluent local services connect connector status file-source

{
  "name": "file-source",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.10.1:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "192.168.10.1:8083"
    }
  ]
}

コネクターが起動されるとすぐに、ログファイルの 3 行がそれぞれ Kafka に配信され、スキーマが Schema Registry に登録されます。データが存在することを確認する方法の 1 つは、別のコンソールでコンソールコンシューマーを使用して、トピックのコンテンツを調べる方法です。

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
"log line 1"
"log line 2"
"log line 3"

データは Avro フォーマットを使用して Kafka に保存されているため、ここでは kafka-avro-console-consumer を使用することに注意してください。このコンシューマーは、Avro データのスキーマを適切に検索するために、Schema Registry にバンドルされている Avro コンバーターを使用します。

Connect でのファイルのデータの書き込み

Connect を使用して Kafka トピックにデータを書き込むことができたので、そのデータをダウンストリームプロセスで消費しましょう。このセクションでは、先ほどのセクションで起動したソースに加え、シンクコネクターをワーカーに読み込みます。シンクは、メッセージをローカルファイルに書き込みます。このコネクターも Confluent CLI confluent local コマンドで事前定義されており、file-sink という名前です。以下は、コネクターの構成です。この構成は etc/kafka/connect-file-sink.properties に保存されています。

# User defined name for the connector instance
name=file-sink
# Name of the connector class to be run
connector.class=FileStreamSink
# Max number of tasks to spawn for this connector instance
tasks.max=1
# Output file name relative to worker's current working directory
# This is the only property specific to the FileStreamSink connector
file=test.sink.txt
# Comma separate input topic list
topics=connect-test

この構成には、file-source と同様の設定が含まれていることに注目してください。主な違いは、file-source では、topic で出力トピックを 1 つだけ指定できるのに対して、topics では複数の入力トピックを指定できることです。

ここで、FileStreamSinkConnector を起動します。シンクコネクターはソースコネクターと同じワーカー内で実行されますが、コネクターのタスクごとに専用のスレッドが使用されます。

  confluent local services connect connector load file-sink

{
  "name": "file-sink",
  "config": {
    "connector.class": "FileStreamSink",
    "tasks.max": "1",
    "file": "test.sink.txt",
    "topics": "connect-test",
    "name": "file-sink"
  },
  "tasks": []
}

シンクコネクターの実行状態を確認するには、Confluent CLI confluent local services connect connector status コマンドを使用して、対象のコネクターのステートを取得します。

  confluent local services connect connector status file-sink

{
  "name": "file-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.10.1:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "192.168.10.1:8083"
    }
  ]
}

また、読み込まれているすべてのコネクターの一覧も確認できます。

  confluent local services connect connector status connectors

[
  "file-source",
  "file-sink"
]

ちなみに

コネクターが読み込まれるたびにワーカータスクのバランス調整が行われるため、新しいコネクターが読み込まれた直後は、 confluent local services connect connector status <connector-name> の呼び出しが成功しないことがあります。バランス調整が完了すると、このような呼び出しでコネクターの実際の状態が返されるようになります。

test.sink.txt ファイルを開くと、シンクコネクターによって 2 行のログが書き込まれていることを確認できます。

両方のコネクターが実行されているため、エンドツーエンドでデータが流れる様子をリアルタイムで確認できます。確認するには、もう 1 つのターミナルで tail コマンドを使用して、出力ファイルの最新部分を表示します。

tail -f test.sink.txt

別のターミナルで、テキストファイルの末尾への行の追加を開始します。

for i in {4..1000}; do echo "log line $i"; done >> test.txt

test.sink.txt に行が追加されることを確認できます。新しいデータは、ソースコネクターによって取得され、Kafka に書き込まれ、シンクコネクターによって Kafka から読み取られ、最終的に、ファイルに追加されます。

"log line 1"
"log line 2"
"log line 3"
"log line 4"
"log line 5"
 ...

Connect でデータをファイルから読み取り、ファイルに書き込む実験が終わったら、コネクターをシャットダウンしますが、それにはいくつかの方法があります。

  • コネクターをアンロードし、Connect ワーカーは実行中のままにします。
confluent local services connect connector unload file-source
confluent local services connect connector unload file-sink
  • Connect ワーカーをまとめて停止します。
  confluent local services connect stop
Stopping Connect
Connect is [DOWN]
  • Connect ワーカーおよびその他すべての Confluent サービスを停止します。
confluent local services stop

出力は以下のようになります。

Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
  • すべてのサービスを停止し、今回実行した Confluent サービスのデータをすべて消去します。
confluent local destroy

出力は以下のようになります。

Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE

ソースコネクターとシンクコネクターのどちらもオフセットの追跡ができるため、プロセスの開始と終了を何度も行って、入力ファイルにさらにデータを追加しても、どちらのコネクターも前回終了時点から再開できます。

The connectors demonstrated in this tutorial are intentionally simple so no additional dependencies are necessary. Most connectors will require a bit more configuration to specify how to connect to the source or sink system and what data to copy, and for many you will want to execute on a Kafka Connect cluster for scalability and fault tolerance. To get started with Kafka Connect you'll want to see the user guide for more details on running and managing Kafka Connect, including how to run in distributed mode. The Connectors section includes details on configuring and deploying the connectors that ship with Confluent Platform.

ちなみに

コネクターの作成、構成、管理を行う最も簡単な方法は、Confluent Control Center を使用することです。Control Center の詳細については、「Confluent Control Center」を参照してください。