Kafka Connect と Kafka Streams によるパイプライン処理¶
概要¶
このサンプルは、Apache Kafka® でパイプラインをビルドする方法を示します。

Apache Kafka® トピックにデータを生成するさまざまな方法( Kafka Connect を使用する場合と使用しない場合)、また Kafka Streams API および ksqlDB 向けにサンプルをシリアル化する各種方法も紹介します。
サンプル | Kafka のトピックへの生成 | キー | Value | ストリーム処理 |
---|---|---|---|---|
Confluent CLI プロデューサーと文字列 | CLI | String | String | Kafka Streams |
JDBC Source Connector と JSON | JDBC、SMT でキーを追加 | Long | Json | Kafka Streams |
JDBC Source Connector と SpecificAvro | JDBC、SMT で名前空間を設定 | null | SpecificAvro | Kafka Streams |
JDBC Source Connector と GenericAvro | JDBC | null | GenericAvro | Kafka Streams |
Java プロデューサーと SpecificAvro | プロデューサー | Long | SpecificAvro | Kafka Streams |
JDBC Source Connector と Avro | JDBC | Long | Avro | ksqlDB |
ホワイトペーパー『Kafka Serialization and Deserialization (SerDes) Examples』およびブログの投稿「Building a Real-Time Streaming ETL Pipeline in 20 Minutes」に、このサンプルの詳細説明があります。
Confluent Cloud¶
このサンプルで説明されている概念は、Confluent Cloud にも適用できます。Confluent Cloud では、セルフマネージド型ではなく、フルマネージド型のコネクターを使用することもでき、100% クラウドで実行することができます。試してみるには、Confluent Cloud インスタンスを作成し(新しい環境を作成する簡単な方法については、「Confluent Cloud 向け ccloud-stack ユーティリティ」を参照してください)、コネクターをデプロイして、アプリケーションから Confluent Cloud を参照します。
Confluent Cloud Console の Billing & payment セクションでプロモーションコード C50INTEG
を入力すると、Confluent Cloud で $50 相当を無料で使用できます(詳細)。このプロモーションコードで、この Confluent Cloud サンプルの 1 日分の実行費用が補填されます。これを超えてサービスを利用すると、このサンプルで作成した Confluent Cloud リソースを破棄するまで、時間単位で課金されることがあります。
データの説明¶
元のデータは、次に示すような 地名のテーブル です。
id|name|sale
1|Raleigh|300
2|Dusseldorf|100
1|Raleigh|600
3|Moscow|800
4|Sydney|200
2|Dusseldorf|400
5|Chennai|400
3|Moscow|100
3|Moscow|200
1|Raleigh|700
これにより、Kafka のトピックにレコードが生成されます。

実際のクライアントアプリケーションは、メソッド count
および sum
を使用してこのデータを処理し、都市ごとにグループ化します。
count
の出力は次のとおりです。
1|Raleigh|3
2|Dusseldorf|2
3|Moscow|3
4|Sydney|1
5|Chennai|1

sum
の出力は次のとおりです。
1|Raleigh|1600
2|Dusseldorf|500
3|Moscow|1100
4|Sydney|200
5|Chennai|400

前提条件¶
- Confluent Platform のダウンロード
- Java コードをコンパイルするための Maven コマンド
mvn
timeout
: bash スクリプトにより、一定の期間の後にコンシューマープロセスを終了するために使用されます。timeout
は、ほとんどの Linux ディストリビューションで使用できますが、macOS では使用できません。macOS ユーザーの方は、「macOS の場合のインストール手順」を参照してください。
サンプルの実行¶
サンプル GitHub リポジトリ のクローンを作成し、
7.1.1-post
ブランチをチェックアウトします。git clone https://github.com/confluentinc/examples cd examples git checkout 7.1.1-post
connect-streams-pipeline のサンプルのディレクトリに変更します。
cd connect-streams-pipeline
サンプルをエンドツーエンドで実行します。
./start.sh
Confluent Platform を実行している場合は、ブラウザーを開き、http://localhost:9021/management/connect で Confluent Control Center web interface Management -> Connect タブに移動して、Kafka のトピック内のデータとデプロイ済みコネクターを表示します。
サンプル 1: Kafka コンソールプロデューサー -> キー:String および値:String¶
- コマンドライン
confluent local services kafka produce
で、String
キーとString
値を Kafka のトピックに生成します。 - クライアントアプリケーション は、キーと値の両方に
Serdes.String()
を使用して Kafka のトピックから読み取ります。

サンプル 2: JDBC Source Connector と Single Message Transformation -> キー:Long および値:JSON¶
- Kafka Connect JDBC Source Connector は JSON の値を生成し、Single Message Transformation(
SMT
)を使用してキーを挿入します。デフォルトでは JDBC Source Connector はキーを挿入しないため、これが便利です。 - このサンプルは、キーを
int64
にキャストするものなど、いくつかの SMT を使用します。キーは、KAFKA-6913 で提供されているorg.apache.kafka.connect.converters.LongConverter
を使用します。
##
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement-jdbcjson
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
mode=incrementing
incrementing.column.name=id
topic.prefix=jdbcjson-
table.whitelist=locations
transforms=InsertKey, ExtractId, CastLong
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=id
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=id
transforms.CastLong.type=org.apache.kafka.connect.transforms.Cast$Key
transforms.CastLong.spec=int64
key.converter=org.apache.kafka.connect.converters.LongConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
- クライアントアプリケーション は、
Serdes.Long()
をキーに、カスタム JSON Serde を値に使用して Kafka のトピックから読み取ります。

サンプル 3: JDBC Source Connector と SpecificAvro -> キー:String(null) および値:SpecificAvro¶
- Kafka Connect JDBC Source Connector は、Avro 値および null
String
キーを Kafka のトピックに生成します。 - このサンプルでは、
SetSchemaMetadata
という Single Message Transformation(SMT)を、KAFKA-5164 向けの修正が含まれるコードで使用します。これにより、コネクターがスキーマに名前空間を設定できます。KAFKA-5164 向けの修正がない場合は、SpecificAvro
の代わりにGenericAvro
を使用するサンプル 4 を参照してください。
##
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement-jdbcspecificavro
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
mode=incrementing
incrementing.column.name=id
topic.prefix=jdbcspecificavro-
table.whitelist=locations
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schemas.enable=true
transforms=SetValueSchema
transforms.SetValueSchema.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.SetValueSchema.schema.name=io.confluent.examples.connectandstreams.avro.Location
- クライアントアプリケーション は、
SpecificAvroSerde
を値として使用して Kafka のトピックから読み取り、次にmap
関数を使用して、キーがLong
、値がカスタムクラスになるようにメッセージのストリームを変換します。

サンプル 4: JDBC Source Connector と GenericAvro -> キー:String(null) および値:GenericAvro¶
- Kafka Connect JDBC Source Connector は、Avro 値および null
String
キーを Kafka のトピックに生成します。
##
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement-jdbcgenericavro
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
mode=incrementing
incrementing.column.name=id
topic.prefix=jdbcgenericavro-
table.whitelist=locations
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schemas.enable=true
- クライアントアプリケーション は、
GenericAvroSerde
を値として使用して Kafka のトピックから読み取り、次にmap
関数を使用して、キーがLong
、値がカスタムクラスになるようにメッセージのストリームを変換します。 - このサンプルでは現在、特定の理由により、
SpecificAvroSerde
ではなくGenericAvroSerde
を使用しています。JDBC Source Connector は現在、Kafka に生成しているデータのスキーマ名を生成するときに名前空間を設定しません。SpecificAvroSerde
の場合、名前空間がないことは、読み取りスキーマと書き込みスキーマを照合しているときに問題となります。これは、Avro では、書き込みスキーマの名前と名前空間を使用してクラス名を作成し、このクラスを読み込もうとしますが、名前空間がないとクラスが見つからないためです。

サンプル 5: Java クライアントプロデューサーと SpecificAvro -> キー:Long および値:SpecificAvro¶
- Java クライアント は、
Long
キーおよびSpecificAvro
値を Kafka のトピックに生成します。 - クライアントアプリケーション は、
Serdes.Long()
をキーに、SpecificAvroSerde
を値に使用して Kafka のトピックから読み取ります。

サンプル 6: JDBC Source Connector と Avro から ksqlDB へ -> キー:Long および値:Avro¶
- Kafka Connect JDBC Source Connector は、Avro 値および null キーを Kafka のトピックに生成します。
##
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-sqlite-jdbc-autoincrement-jdbcavroksql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
connection.url=jdbc:sqlite:/usr/local/lib/retail.db
mode=incrementing
incrementing.column.name=id
topic.prefix=jdbcavroksql-
table.whitelist=locations
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schemas.enable=true
- ksqlDB は、Kafka のトピックから読み取り、次に
PARTITION BY
を使用してBIGINT
キーでメッセージのストリームを新規作成します。

技術的注意事項¶
- KAFKA-5245: (1)
StreamsBuilder#stream()
を呼び出すときと (2)KStream#groupByKey()
を呼び出すときに、2 回 Serdes を指定する必要がある - PR-531: Confluent ディストリビューションで、
GenericAvroSerde
およびSpecificAvroSerde
用のパッケージを提供 - KAFKA-2378: Kafka Connect をクライアントアプリケーションに組み込むことができる API を追加
- KAFKA-2526:
confluent local services kafka produce
で--key-serializer
引数を使用してキーをLong
としてシリアル化できない。その結果、このサンプルでは、キーがString
としてシリアル化されます。回避策として、独自の kafka.common.MessageReader(LineMessageReader のデフォルト実装をチェックアウトするなど)を作成した後に、confluent local services kafka produce
に--line-reader
引数を指定できます。 - KAFKA-5164: コネクターがスキーマに名前空間を設定できるようにする