重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
Kafka Connect を Confluent Cloud に接続する¶
Confluent Cloud でまだ提供されていないコネクターを実行するために、セルフマネージド型 Kafka Connect クラスターでコネクターを独自に実行することができます。このページでは、Confluent Cloud のソース Apache Kafka® クラスターを基盤にしたローカル Connect クラスターを構成する方法について説明します。
- 前提条件
- Confluent Cloud へのアクセス。
- Confluent CLI。
- curl
- jq
ちなみに
簡単に始める方法
- Confluent Cloud (https://confluent.cloud)で環境とクラスターを選択し、Tools and client configuration > CLI Tools に進み、そのまま利用できるクラスター構成ファイルとガイド付きのワークフローを取得し、Kafka コマンドを使用してローカルのクライアントとアプリケーションを Confluent Cloud に接続することができます。
- この UI には、
kafka-console-producer
とkafka-console-consumer
のコマンドラインツールを使用してトピックのメッセージの送信と読み取りを行うことで構成をローカルでテストする方法が表示されます。 - 最初から Connect クラスターをセットアップする方法を示す例も表示されます。
Cloud Cluster でトピックを作成する¶
ソースコネクターの書き込み先のトピックを手動で作成する必要があります。
以下のようにして
page_visits
トピックを作成します。confluent kafka topic create --partitions 1 page_visits
Confluent Platform のインストールを使用してローカルに Connect ワーカーをセットアップする¶
https://www.confluent.io/download/ から ZIP または TAR 形式の Confluent Platform の最新のディストリビューションをダウンロードします。スタンドアロンクラスター と 分散クラスター のどちらを使用するかに応じて以下の手順を実行します。
<cloud-bootstrap-servers>
、<api-key>
、<api-secret>
は、使用している Kafka クラスターのセットアップの適切な値に置き換えてください。
重要
作業を開始する前に、「Kafka Connect の使用方法」を確認してください。ワーカー構成プロパティとコネクター構成プロパティを構成する方法について理解しておいてください。
スタンドアロンクラスター¶
config ディレクトリに、次のような内容の
my-connect-standalone.properties
を作成します(consumer.*
プレフィックスとproducer.*
プレフィックスが付いているセキュリティ構成に注目してください)。cat etc/my-connect-standalone.properties bootstrap.servers=<cloud-bootstrap-servers> # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will # need to configure these based on the format they want their data in when loaded from or stored into Kafka key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter you want to apply # it to key.converter.schemas.enable=false value.converter.schemas.enable=false # The internal converter used for offsets and config data is configurable and must be specified, but most users will # always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format. internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false # Store offsets on local filesystem offset.storage.file.filename=/tmp/connect.offsets # Flush much faster than normal, which is useful for testing/debugging offset.flush.interval.ms=10000 ssl.endpoint.identification.algorithm=https sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="<api-key>" password="<api-secret>"; security.protocol=SASL_SSL consumer.ssl.endpoint.identification.algorithm=https consumer.sasl.mechanism=PLAIN consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="<api-key>" password="<api-secret>"; consumer.security.protocol=SASL_SSL producer.ssl.endpoint.identification.algorithm=https producer.sasl.mechanism=PLAIN producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="<api-key>" password="<api-secret>"; producer.security.protocol=SASL_SSL # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins # (connectors, converters, transformations). plugin.path=/usr/share/java,/Users/<username>/confluent-6.2.1/share/confluent-hub-components
(オプション)GitHub の ccloud/examples/template_delta_configs にある connect-ccloud.delta の例に沿って、Confluent Cloud スキーマレジストリ に接続するための構成を
my-connect-standalone.properties
に追加します。# Confluent Schema Registry for Kafka Connect value.converter=io.confluent.connect.avro.AvroConverter value.converter.basic.auth.credentials.source=USER_INFO value.converter.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET> value.converter.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>
config ディレクトリに、次のような内容の
my-file-sink.properties
を作成します(consumer.*
プレフィックスが付いているセキュリティ構成に注目してください)。cat ./etc/my-file-sink.properties name=my-file-sink connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector tasks.max=1 topics=page_visits file=my_file.txt
重要
エンタープライズライセンスが必要なセルフマネージド型コネクターを使用する場合は、コネクター構成に以下のプロパティを指定する必要があります。
confluent.topic.bootstrap.servers=<cloud-bootstrap-servers> confluent.topic.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \ required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>"; confluent.topic.security.protocol=SASL_SSL confluent.topic.sasl.mechanism=PLAIN
重要
Reporter を使用して応答を Kafka に書き込むセルフマネージド型コネクター(Azure Functions Sink Connector for Confluent Platform や Google Cloud Functions Sink Connector for Confluent Platform など)を使用する場合は、コネクター構成に以下のプロパティを指定する必要があります。
reporter.admin.bootstrap.servers=<cloud-bootstrap-servers> reporter.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \ required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>"; reporter.admin.security.protocol=SASL_SSL reporter.admin.sasl.mechanism=PLAIN reporter.producer.bootstrap.servers=<cloud-bootstrap-servers> reporter.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \ required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>"; reporter.producer.security.protocol=SASL_SSL reporter.producer.sasl.mechanism=PLAIN
重要
Debezium CDC コネクターを使用する場合は、コネクター構成に以下のプロパティを指定する必要があります。
database.history.kafka.bootstrap.servers=<cloud-bootstrap-servers> database.history.consumer.security.protocol=SASL_SSL database.history.consumer.ssl.endpoint.identification.algorithm=https database.history.consumer.sasl.mechanism=PLAIN database.history.consumer.sasl.jaas.config= org.apache.kafka.common.security.plain.PlainLoginModule required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>"; database.history.producer.security.protocol=SASL_SSL database.history.producer.ssl.endpoint.identification.algorithm=https database.history.producer.sasl.mechanism=PLAIN database.history.producer.sasl.jaas.config= org.apache.kafka.common.security.plain.PlainLoginModule required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
ファイル名を引数として指定して
connect-standalone
スクリプトを実行します。./bin/connect-standalone ./etc/my-connect-standalone.properties ./etc/my-file-sink.properties
これによって、Connect ワーカーがマシンで起動し、
ccloud
コマンドで事前に生成されていたレコードを消費します。tail コマンドでmy_file.txt
の内容を表示すると、以下のようになります。tail -f my_file.txt {"field1": "hello", "field2": 1} {"field1": "hello", "field2": 2} {"field1": "hello", "field2": 3} {"field1": "hello", "field2": 4} {"field1": "hello", "field2": 5} {"field1": "hello", "field2": 6}
分散クラスター¶
config ディレクトリに
my-connect-distributed.properties
という名前の分散プロパティファイルを作成します。この分散プロパティファイルの内容は以下の例のようになります。consumer.*
プレフィックスとproducer.*
プレフィックスが付いたセキュリティプロパティに注目してください。bootstrap.servers=<cloud-bootstrap-servers> group.id=connect-cluster key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false # Connect clusters create three topics to manage offsets, configs, and status # information. Note that these contribute towards the total partition limit quota. offset.storage.topic=connect-offsets offset.storage.replication.factor=3 offset.storage.partitions=3 config.storage.topic=connect-configs config.storage.replication.factor=3 status.storage.topic=connect-status status.storage.replication.factor=3 offset.flush.interval.ms=10000 ssl.endpoint.identification.algorithm=https sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="<api-key>" password="<api-secret>"; security.protocol=SASL_SSL consumer.ssl.endpoint.identification.algorithm=https consumer.sasl.mechanism=PLAIN consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="<api-key>" password="<api-secret>"; consumer.security.protocol=SASL_SSL producer.ssl.endpoint.identification.algorithm=https producer.sasl.mechanism=PLAIN producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="<api-key>" password="<api-secret>"; producer.security.protocol=SASL_SSL # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins # (connectors, converters, transformations). plugin.path=/usr/share/java,/Users/<username>/confluent-6.2.1/share/confluent-hub-components
(オプション)以下の構成プロパティを
my-connect-distributed.properties
ファイルに追加します。これによって、Confluent Cloud スキーマレジストリ への接続が可能になります。例については、ccloud/examples/template_delta_configs にある connect-ccloud.delta を参照してください。# Confluent Schema Registry for Kafka Connect value.converter=io.confluent.connect.avro.AvroConverter value.converter.basic.auth.credentials.source=USER_INFO value.converter.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET> value.converter.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>
以下のコマンドを使用して Connect を実行します。
./bin/connect-distributed ./etc/my-connect-distributed.properties
ワーカーが正常に起動したかどうかを確認するために、以下のようにして別のファイルシンクをセットアップします。以下のような内容の
my-file-sink.json
ファイルを作成してください。cat my-file-sink.json { "name": "my-file-sink", "config": { "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max": 3, "topics": "page_visits", "file": "my_file.txt" } }
重要
エンタープライズライセンスが必要なセルフマネージド型コネクターを使用する場合は、コネクター構成に以下のプロパティを指定する必要があります。
"confluent.topic.bootstrap.servers":"<cloud-bootstrap-servers>", "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_KEY>\";", "confluent.topic.security.protocol":"SASL_SSL", "confluent.topic.sasl.mechanism":"PLAIN"
重要
Reporter を使用して応答を Kafka に書き込むセルフマネージド型コネクター(Azure Functions Sink Connector for Confluent Platform や Google Cloud Functions Sink Connector for Confluent Platform など)を使用する場合は、以下の構成プロパティを指定する必要があります。
"reporter.admin.bootstrap.servers":"<cloud-bootstrap-servers>", "reporter.admin.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_KEY>\";", "reporter.admin.security.protocol":"SASL_SSL", "reporter.admin.sasl.mechanism":"PLAIN", "reporter.producer.bootstrap.servers":"<cloud-bootstrap-servers>", "reporter.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_KEY>\";", "reporter.producer.security.protocol":"SASL_SSL", "reporter.producer.sasl.mechanism":"PLAIN"
重要
Debezium CDC コネクターを使用する場合は、コネクター構成に以下のプロパティを指定する必要があります。
"database.history.kafka.bootstrap.servers": "<cloud-bootstrap-servers>", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.ssl.endpoint.identification.algorithm": "https", "database.history.consumer.sasl.mechanism": "PLAIN", "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_KEY>\";", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.ssl.endpoint.identification.algorithm": "https", "database.history.producer.sasl.mechanism": "PLAIN", "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_KEY>\";"
curl コマンドを使用してこのコネクター構成をワーカーにポストします。
curl -s -H "Content-Type: application/json" -X POST -d @my-file-sink.json http://localhost:8083/connectors/ | jq .
以下の応答が返されます。
{ "name": "my-file-sink", "config": { "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max": "1", "topics": "page_visits", "file": "my_file", "name": "my-file-sink" }, "tasks": [], "type": null }
Confluent Cloud を使用してレコードをいくつか生成し、tail コマンドでこのファイルの内容を表示して、コネクターが正常に作成されたことを確認します。
Confluent Cloud スキーマレジストリ への接続¶
(オプション)GitHub の ccloud/examples/template_delta_configs にある connect-ccloud.delta の例に沿って、Confluent Cloud スキーマレジストリ に接続するための構成を追加します。
# Confluent Schema Registry for Kafka Connect
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
value.converter.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>
Docker を使用してローカルに Connect ワーカーをセットアップする¶
Docker で、Confluent Cloud のフルマネージドサービスと自己管理型コンポーネントを組み合わせて実行することができます。Connect などの Confluent Platform コンポーネントを Confluent Cloud に接続する Docker 環境については、「cp-all-in-one-cloud」を参照してください。ベースの Docker Connect イメージに用意されていない独自のコネクターを実行するには、そのコネクターの jar ファイルを Confluent Hub からインストールするように Docker イメージを変更する必要があります。手順については、「コネクターまたはソフトウェアの追加」を参照してください。
参考
デモやテストのために、Confluent Cloud ユーティリティを活用してください。完全な ccloud-stack
を作成したり、Confluent Platform コンポーネントを Confluent Cloud に接続するために必要な構成を生成したりするユーティリティがあります。詳細については、「Confluent Cloud デモ」を参照してください。
その他のリソース¶
- その他の Confluent Cloud デモについては、「Confluent Cloud デモ」を参照してください。
- Confluent Cloud のクイックスタートについては、「Quick Start for Confluent Cloud」を参照してください。
- Confluent Cloud CLI コマンドの詳細については、「Confluent CLI コマンドリファレンス」を参照してください。