Kafka Connect と Kafka Streams によるパイプライン処理

概要

このサンプルは、Apache Kafka® でパイプラインをビルドする方法を示します。

../../../../_images/pipeline.jpg

Apache Kafka® トピックにデータを生成するさまざまな方法( Kafka Connect を使用する場合と使用しない場合)、また Kafka Streams API および ksqlDB 向けにサンプルをシリアル化する各種方法も紹介します。

サンプル Kafka のトピックへの生成 キー Value ストリーム処理
Confluent CLI プロデューサーと文字列 CLI String String Kafka Streams
JDBC Source Connector と JSON JDBC、SMT でキーを追加 Long Json Kafka Streams
JDBC Source Connector と SpecificAvro JDBC、SMT で名前空間を設定 null SpecificAvro Kafka Streams
JDBC Source Connector と GenericAvro JDBC null GenericAvro Kafka Streams
Java プロデューサーと SpecificAvro プロデューサー Long SpecificAvro Kafka Streams
JDBC Source Connector と Avro JDBC Long Avro ksqlDB

ホワイトペーパー『Kafka Serialization and Deserialization (SerDes) Examples』およびブログの投稿「Building a Real-Time Streaming ETL Pipeline in 20 Minutes」に、このサンプルの詳細説明があります。

Confluent Cloud

このサンプルで説明されている概念は、Confluent Cloud にも適用できます。Confluent Cloud では、セルフマネージド型ではなく、フルマネージド型のコネクターを使用することもでき、100% クラウドで実行することができます。試してみるには、Confluent Cloud インスタンスを作成し(新しい環境を作成する簡単な方法については、「Confluent Cloud 向け ccloud-stack ユーティリティ」を参照してください)、コネクターをデプロイして、アプリケーションから Confluent Cloud を参照します。

Confluent Cloud Console の Billing & payment セクションでプロモーションコード C50INTEG を入力すると、Confluent Cloud で $50 相当を無料で使用できます(詳細)。このプロモーションコードで、この Confluent Cloud サンプルの 1 日分の実行費用が補填されます。これを超えてサービスを利用すると、このサンプルで作成した Confluent Cloud リソースを破棄するまで、時間単位で課金されることがあります。

データの説明

元のデータは、次に示すような 地名のテーブル です。

id|name|sale

1|Raleigh|300
2|Dusseldorf|100
1|Raleigh|600
3|Moscow|800
4|Sydney|200
2|Dusseldorf|400
5|Chennai|400
3|Moscow|100
3|Moscow|200
1|Raleigh|700

これにより、Kafka のトピックにレコードが生成されます。

../../../../_images/blog_stream.jpg

実際のクライアントアプリケーションは、メソッド count および sum を使用してこのデータを処理し、都市ごとにグループ化します。

count の出力は次のとおりです。

1|Raleigh|3
2|Dusseldorf|2
3|Moscow|3
4|Sydney|1
5|Chennai|1
../../../../_images/blog_count.jpg

sum の出力は次のとおりです。

1|Raleigh|1600
2|Dusseldorf|500
3|Moscow|1100
4|Sydney|200
5|Chennai|400
../../../../_images/blog_sum.jpg

前提条件

  • Confluent Platform のダウンロード
  • Java コードをコンパイルするための Maven コマンド mvn
  • timeout: bash スクリプトにより、一定の期間の後にコンシューマープロセスを終了するために使用されます。timeout は、ほとんどの Linux ディストリビューションで使用できますが、macOS では使用できません。macOS ユーザーの方は、「macOS の場合のインストール手順」を参照してください。

サンプルの実行

  1. サンプル GitHub リポジトリ のクローンを作成し、6.2.4-post ブランチをチェックアウトします。

    git clone https://github.com/confluentinc/examples
    cd examples
    git checkout 6.2.4-post
    
  2. connect-streams-pipeline のサンプルのディレクトリに変更します。

    cd connect-streams-pipeline
    
  3. サンプルをエンドツーエンドで実行します。

    ./start.sh
    
  4. Confluent Platform を実行している場合は、ブラウザーを開き、http://localhost:9021/management/connect で Confluent Control Center web interface Management -> Connect タブに移動して、Kafka のトピック内のデータとデプロイ済みコネクターを表示します。

サンプル 1: Kafka コンソールプロデューサー -> キー:String および値:String

  • コマンドライン confluent local services kafka produce で、String キーと String 値を Kafka のトピックに生成します。
  • クライアントアプリケーション は、キーと値の両方に Serdes.String() を使用して Kafka のトピックから読み取ります。
../../../../_images/example_1.jpg

サンプル 2: JDBC Source Connector と Single Message Transformation -> キー:Long および値:JSON

  • Kafka Connect JDBC Source Connector は JSON の値を生成し、Single Message Transformation(SMT)を使用してキーを挿入します。デフォルトでは JDBC Source Connector はキーを挿入しないため、これが便利です。
  • このサンプルは、キーを int64 にキャストするものなど、いくつかの SMT を使用します。キーは、KAFKA-6913 で提供されている org.apache.kafka.connect.converters.LongConverter を使用します。
##
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement-jdbcjson
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
mode=incrementing
incrementing.column.name=id
topic.prefix=jdbcjson-
table.whitelist=locations
transforms=InsertKey, ExtractId, CastLong
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=id
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=id
transforms.CastLong.type=org.apache.kafka.connect.transforms.Cast$Key
transforms.CastLong.spec=int64

key.converter=org.apache.kafka.connect.converters.LongConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
../../../../_images/example_2.jpg

サンプル 3: JDBC Source Connector と SpecificAvro -> キー:String(null) および値:SpecificAvro

  • Kafka Connect JDBC Source Connector は、Avro 値および null String キーを Kafka のトピックに生成します。
  • このサンプルでは、SetSchemaMetadata という Single Message Transformation(SMT)を、KAFKA-5164 向けの修正が含まれるコードで使用します。これにより、コネクターがスキーマに名前空間を設定できます。KAFKA-5164 向けの修正がない場合は、SpecificAvro の代わりに GenericAvro を使用するサンプル 4 を参照してください。
##
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement-jdbcspecificavro
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
mode=incrementing
incrementing.column.name=id
topic.prefix=jdbcspecificavro-
table.whitelist=locations

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schemas.enable=true

transforms=SetValueSchema
transforms.SetValueSchema.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.SetValueSchema.schema.name=io.confluent.examples.connectandstreams.avro.Location
  • クライアントアプリケーション は、SpecificAvroSerde を値として使用して Kafka のトピックから読み取り、次に map 関数を使用して、キーが Long、値がカスタムクラスになるようにメッセージのストリームを変換します。
../../../../_images/example_3.jpg

サンプル 4: JDBC Source Connector と GenericAvro -> キー:String(null) および値:GenericAvro

##
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement-jdbcgenericavro
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
mode=incrementing
incrementing.column.name=id
topic.prefix=jdbcgenericavro-
table.whitelist=locations

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schemas.enable=true
  • クライアントアプリケーション は、GenericAvroSerde を値として使用して Kafka のトピックから読み取り、次に map 関数を使用して、キーが Long、値がカスタムクラスになるようにメッセージのストリームを変換します。
  • このサンプルでは現在、特定の理由により、SpecificAvroSerde ではなく GenericAvroSerde を使用しています。JDBC Source Connector は現在、Kafka に生成しているデータのスキーマ名を生成するときに名前空間を設定しません。SpecificAvroSerde の場合、名前空間がないことは、読み取りスキーマと書き込みスキーマを照合しているときに問題となります。これは、Avro では、書き込みスキーマの名前と名前空間を使用してクラス名を作成し、このクラスを読み込もうとしますが、名前空間がないとクラスが見つからないためです。
../../../../_images/example_3.jpg

サンプル 5: Java クライアントプロデューサーと SpecificAvro -> キー:Long および値:SpecificAvro

../../../../_images/example_5.jpg

サンプル 6: JDBC Source Connector と Avro から ksqlDB へ -> キー:Long および値:Avro

##
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement-jdbcavroksql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
mode=incrementing
incrementing.column.name=id
topic.prefix=jdbcavroksql-
table.whitelist=locations

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schemas.enable=true
  • ksqlDB は、Kafka のトピックから読み取り、次に PARTITION BY を使用して BIGINT キーでメッセージのストリームを新規作成します。
../../../../_images/example_6.jpg

技術的注意事項

  • KAFKA-5245: (1) StreamsBuilder#stream() を呼び出すときと (2) KStream#groupByKey() を呼び出すときに、2 回 Serdes を指定する必要がある
  • PR-531: Confluent ディストリビューションで、GenericAvroSerde および SpecificAvroSerde 用のパッケージを提供
  • KAFKA-2378: Kafka Connect をクライアントアプリケーションに組み込むことができる API を追加
  • KAFKA-2526: confluent local services kafka produce--key-serializer 引数を使用してキーを Long としてシリアル化できない。その結果、このサンプルでは、キーが String としてシリアル化されます。回避策として、独自の kafka.common.MessageReader(LineMessageReader のデフォルト実装をチェックアウトするなど)を作成した後に、confluent local services kafka produce--line-reader 引数を指定できます。
  • KAFKA-5164: コネクターがスキーマに名前空間を設定できるようにする