重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Kafka Connect を Confluent Cloud に接続する

Confluent Cloud でまだ提供されていないコネクターを実行するために、セルフマネージド型 Kafka Connect クラスターでコネクターを独自に実行することができます。このページでは、Confluent Cloud のソース Apache Kafka® クラスターを基盤にしたローカル Connect クラスターを構成する方法について説明します。

前提条件

ちなみに

簡単に始める方法

  • Confluent Cloud (https://confluent.cloud)で環境とクラスターを選択し、Tools and client configuration > CLI Tools に進み、そのまま利用できるクラスター構成ファイルとガイド付きのワークフローを取得し、Kafka コマンドを使用してローカルのクライアントとアプリケーションを Confluent Cloud に接続することができます。
  • この UI には、kafka-console-producerkafka-console-consumer のコマンドラインツールを使用してトピックのメッセージの送信と読み取りを行うことで構成をローカルでテストする方法が表示されます。
  • 最初から Connect クラスターをセットアップする方法を示す例も表示されます。

Cloud Cluster でトピックを作成する

ソースコネクターの書き込み先のトピックを手動で作成する必要があります。

  1. 以下のようにして page_visits トピックを作成します。

    confluent kafka topic create --partitions 1 page_visits
    

Confluent Platform のインストールを使用してローカルに Connect ワーカーをセットアップする

https://www.confluent.io/download/ から ZIP または TAR 形式の Confluent Platform の最新のディストリビューションをダウンロードします。スタンドアロンクラスター分散クラスター のどちらを使用するかに応じて以下の手順を実行します。

<cloud-bootstrap-servers><api-key><api-secret> は、使用している Kafka クラスターのセットアップの適切な値に置き換えてください。

重要

作業を開始する前に、「Kafka Connect の使用方法」を確認してください。ワーカー構成プロパティとコネクター構成プロパティを構成する方法について理解しておいてください。

スタンドアロンクラスター

  1. config ディレクトリに、次のような内容の my-connect-standalone.properties を作成します(consumer.* プレフィックスと producer.* プレフィックスが付いているセキュリティ構成に注目してください)。

    cat etc/my-connect-standalone.properties
    bootstrap.servers=<cloud-bootstrap-servers>
    
    # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
    # need to configure these based on the format they want their data in when loaded from or stored into Kafka
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter you want to apply
    # it to
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    
    # The internal converter used for offsets and config data is configurable and must be specified, but most users will
    # always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    
    # Store offsets on local filesystem
    offset.storage.file.filename=/tmp/connect.offsets
    # Flush much faster than normal, which is useful for testing/debugging
    offset.flush.interval.ms=10000
    
    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    security.protocol=SASL_SSL
    
    consumer.ssl.endpoint.identification.algorithm=https
    consumer.sasl.mechanism=PLAIN
    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    consumer.security.protocol=SASL_SSL
    
    producer.ssl.endpoint.identification.algorithm=https
    producer.sasl.mechanism=PLAIN
    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    producer.security.protocol=SASL_SSL
    
    # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
    # (connectors, converters, transformations).
    plugin.path=/usr/share/java,/Users/<username>/confluent-6.2.1/share/confluent-hub-components
    
  2. (オプション)GitHub の ccloud/examples/template_delta_configs にある connect-ccloud.delta の例に沿って、Confluent Cloud スキーマレジストリ に接続するための構成を my-connect-standalone.properties に追加します。

    # Confluent Schema Registry for Kafka Connect
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.basic.auth.credentials.source=USER_INFO
    value.converter.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
    value.converter.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>
    
  3. config ディレクトリに、次のような内容の my-file-sink.properties を作成します(consumer.* プレフィックスが付いているセキュリティ構成に注目してください)。

    cat ./etc/my-file-sink.properties
    name=my-file-sink
    connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
    tasks.max=1
    topics=page_visits
    file=my_file.txt
    

    重要

    エンタープライズライセンスが必要なセルフマネージド型コネクターを使用する場合は、コネクター構成に以下のプロパティを指定する必要があります。

    confluent.topic.bootstrap.servers=<cloud-bootstrap-servers>
    confluent.topic.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
    required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
    confluent.topic.security.protocol=SASL_SSL
    confluent.topic.sasl.mechanism=PLAIN
    

    重要

    Reporter を使用して応答を Kafka に書き込むセルフマネージド型コネクター(Azure Functions Sink Connector for Confluent PlatformGoogle Cloud Functions Sink Connector for Confluent Platform など)を使用する場合は、コネクター構成に以下のプロパティを指定する必要があります。

    reporter.admin.bootstrap.servers=<cloud-bootstrap-servers>
    reporter.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
    required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
    reporter.admin.security.protocol=SASL_SSL
    reporter.admin.sasl.mechanism=PLAIN
    
    reporter.producer.bootstrap.servers=<cloud-bootstrap-servers>
    reporter.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
    required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
    reporter.producer.security.protocol=SASL_SSL
    reporter.producer.sasl.mechanism=PLAIN
    

    重要

    Debezium CDC コネクターを使用する場合は、コネクター構成に以下のプロパティを指定する必要があります。

    database.history.kafka.bootstrap.servers=<cloud-bootstrap-servers>
    
    database.history.consumer.security.protocol=SASL_SSL
    database.history.consumer.ssl.endpoint.identification.algorithm=https
    database.history.consumer.sasl.mechanism=PLAIN
    database.history.consumer.sasl.jaas.config=
    org.apache.kafka.common.security.plain.PlainLoginModule
    required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
    
    database.history.producer.security.protocol=SASL_SSL
    database.history.producer.ssl.endpoint.identification.algorithm=https
    database.history.producer.sasl.mechanism=PLAIN
    database.history.producer.sasl.jaas.config=
    org.apache.kafka.common.security.plain.PlainLoginModule
    required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
    
  4. ファイル名を引数として指定して connect-standalone スクリプトを実行します。

    ./bin/connect-standalone  ./etc/my-connect-standalone.properties ./etc/my-file-sink.properties
    

    これによって、Connect ワーカーがマシンで起動し、ccloud コマンドで事前に生成されていたレコードを消費します。tail コマンドで my_file.txt の内容を表示すると、以下のようになります。

    tail -f my_file.txt
    {"field1": "hello", "field2": 1}
    {"field1": "hello", "field2": 2}
    {"field1": "hello", "field2": 3}
    {"field1": "hello", "field2": 4}
    {"field1": "hello", "field2": 5}
    {"field1": "hello", "field2": 6}
    

分散クラスター

  1. config ディレクトリに my-connect-distributed.properties という名前の分散プロパティファイルを作成します。この分散プロパティファイルの内容は以下の例のようになります。consumer.* プレフィックスと producer.* プレフィックスが付いたセキュリティプロパティに注目してください。

    bootstrap.servers=<cloud-bootstrap-servers>
    
    group.id=connect-cluster
    
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    
    # Connect clusters create three topics to manage offsets, configs, and status
    # information. Note that these contribute towards the total partition limit quota.
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=3
    offset.storage.partitions=3
    
    config.storage.topic=connect-configs
    config.storage.replication.factor=3
    
    status.storage.topic=connect-status
    status.storage.replication.factor=3
    
    offset.flush.interval.ms=10000
    
    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    security.protocol=SASL_SSL
    
    consumer.ssl.endpoint.identification.algorithm=https
    consumer.sasl.mechanism=PLAIN
    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    consumer.security.protocol=SASL_SSL
    
    producer.ssl.endpoint.identification.algorithm=https
    producer.sasl.mechanism=PLAIN
    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    producer.security.protocol=SASL_SSL
    
    # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
    # (connectors, converters, transformations).
    plugin.path=/usr/share/java,/Users/<username>/confluent-6.2.1/share/confluent-hub-components
    
  2. (オプション)以下の構成プロパティを my-connect-distributed.properties ファイルに追加します。これによって、Confluent Cloud スキーマレジストリ への接続が可能になります。例については、ccloud/examples/template_delta_configs にある connect-ccloud.delta を参照してください。

    # Confluent Schema Registry for Kafka Connect
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.basic.auth.credentials.source=USER_INFO
    value.converter.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
    value.converter.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>
    
  3. 以下のコマンドを使用して Connect を実行します。

    ./bin/connect-distributed ./etc/my-connect-distributed.properties
    

    ワーカーが正常に起動したかどうかを確認するために、以下のようにして別のファイルシンクをセットアップします。以下のような内容の my-file-sink.json ファイルを作成してください。

    cat my-file-sink.json
    {
      "name": "my-file-sink",
      "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": 3,
        "topics": "page_visits",
        "file": "my_file.txt"
      }
    }
    

    重要

    エンタープライズライセンスが必要なセルフマネージド型コネクターを使用する場合は、コネクター構成に以下のプロパティを指定する必要があります。

    "confluent.topic.bootstrap.servers":"<cloud-bootstrap-servers>",
    "confluent.topic.sasl.jaas.config":
    "org.apache.kafka.common.security.plain.PlainLoginModule
    required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_KEY>\";",
    "confluent.topic.security.protocol":"SASL_SSL",
    "confluent.topic.sasl.mechanism":"PLAIN"
    

    重要

    Reporter を使用して応答を Kafka に書き込むセルフマネージド型コネクター(Azure Functions Sink Connector for Confluent PlatformGoogle Cloud Functions Sink Connector for Confluent Platform など)を使用する場合は、以下の構成プロパティを指定する必要があります。

    "reporter.admin.bootstrap.servers":"<cloud-bootstrap-servers>",
    "reporter.admin.sasl.jaas.config":
    "org.apache.kafka.common.security.plain.PlainLoginModule
    required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_KEY>\";",
    "reporter.admin.security.protocol":"SASL_SSL",
    "reporter.admin.sasl.mechanism":"PLAIN",
    
    "reporter.producer.bootstrap.servers":"<cloud-bootstrap-servers>",
    "reporter.producer.sasl.jaas.config":
    "org.apache.kafka.common.security.plain.PlainLoginModule
    required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_KEY>\";",
    "reporter.producer.security.protocol":"SASL_SSL",
    "reporter.producer.sasl.mechanism":"PLAIN"
    

    重要

    Debezium CDC コネクターを使用する場合は、コネクター構成に以下のプロパティを指定する必要があります。

    "database.history.kafka.bootstrap.servers": "<cloud-bootstrap-servers>",
    
    "database.history.consumer.security.protocol": "SASL_SSL",
    "database.history.consumer.ssl.endpoint.identification.algorithm": "https",
    "database.history.consumer.sasl.mechanism": "PLAIN",
    "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule
    required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_KEY>\";",
    
    "database.history.producer.security.protocol": "SASL_SSL",
    "database.history.producer.ssl.endpoint.identification.algorithm": "https",
    "database.history.producer.sasl.mechanism": "PLAIN",
    "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule
    required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_KEY>\";"
    
  4. curl コマンドを使用してこのコネクター構成をワーカーにポストします。

    curl -s -H "Content-Type: application/json" -X POST -d @my-file-sink.json http://localhost:8083/connectors/ | jq .
    

    以下の応答が返されます。

    {
      "name": "my-file-sink",
      "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "page_visits",
        "file": "my_file",
        "name": "my-file-sink"
      },
      "tasks": [],
      "type": null
    }
    
  5. Confluent Cloud を使用してレコードをいくつか生成し、tail コマンドでこのファイルの内容を表示して、コネクターが正常に作成されたことを確認します。

Confluent Cloud スキーマレジストリ への接続

(オプション)GitHub の ccloud/examples/template_delta_configs にある connect-ccloud.delta の例に沿って、Confluent Cloud スキーマレジストリ に接続するための構成を追加します。

# Confluent Schema Registry for Kafka Connect
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
value.converter.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>

Docker を使用してローカルに Connect ワーカーをセットアップする

Docker で、Confluent Cloud のフルマネージドサービスと自己管理型コンポーネントを組み合わせて実行することができます。Connect などの Confluent Platform コンポーネントを Confluent Cloud に接続する Docker 環境については、「cp-all-in-one-cloud」を参照してください。ベースの Docker Connect イメージに用意されていない独自のコネクターを実行するには、そのコネクターの jar ファイルを Confluent Hub からインストールするように Docker イメージを変更する必要があります。手順については、「コネクターまたはソフトウェアの追加」を参照してください。

参考

デモやテストのために、Confluent Cloud ユーティリティを活用してください。完全な ccloud-stack を作成したり、Confluent Platform コンポーネントを Confluent Cloud に接続するために必要な構成を生成したりするユーティリティがあります。詳細については、「Confluent Cloud デモ」を参照してください。

その他のリソース