Connecting Kafka Connect to Confluent Cloud

You can configure a local Connect cluster backed by a source Kafka cluster in Confluent Cloud.

This topic shows how to create a Connect cluster that sinks records from a connect-test topic in a Confluent Cloud cluster into a local file.

Prerequisites
  • Access to Confluent Cloud
  • Confluent Cloud CLI installed
  • curl
  • jq
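
If you want a quick sanity check before you start, you can confirm the command-line tools are available from a shell. This is a minimal sketch using the standard command -v check:

    command -v ccloud || echo "ccloud CLI not found"
    command -v curl || echo "curl not found"
    command -v jq || echo "jq not found"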

Create the Topics in the Cloud Cluster

  1. Create a connect-test topic as follows:

    ccloud topic create --partitions 1 connect-test
    

    Important

    You must manually create topics for source connectors to write to.

  2. Produce records using the Confluent Cloud CLI (press Ctrl+D to stop producing).

     ccloud produce --topic connect-test
    {"field1": "hello", "field2": 1}
    {"field1": "hello", "field2": 2}
    {"field1": "hello", "field2": 3}
    {"field1": "hello", "field2": 4}
    {"field1": "hello", "field2": 5}
    {"field1": "hello", "field2": 6}
    ^D
    
  3. Verify that the records can be consumed:

     ccloud consume -b --topic connect-test
    {"field1": "hello", "field2": 1}
    {"field1": "hello", "field2": 2}
    {"field1": "hello", "field2": 3}
    {"field1": "hello", "field2": 4}
    {"field1": "hello", "field2": 5}
    {"field1": "hello", "field2": 6}
    ^D
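
    If the consumer shows no output, it helps to confirm that the topic was actually created before troubleshooting further. Depending on your CLI version, a topic listing command along the following lines should include connect-test (treat the exact subcommands as an assumption for your installation):

     ccloud topic list
     ccloud topic describe connect-test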
    

Set Up a Connect Worker with Confluent Cloud

Download the latest ZIP or TAR distribution of Confluent Platform from https://www.confluent.io/download/ and extract it. Then follow the instructions below, depending on whether you are running a Standalone Cluster or a Distributed Cluster.

Replace <cloud-bootstrap-servers>, <api-key>, and <api-secret> with appropriate values from your Kafka cluster setup.
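
For example, after substitution the connection-related lines in the worker properties would look something like the following. The endpoint and credentials shown here are purely hypothetical; use the bootstrap endpoint and an API key/secret from your own Confluent Cloud cluster:

    bootstrap.servers=pkc-12345.us-west-2.aws.confluent.cloud:9092
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="ABCDEFGHIJKLMNOP" password="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";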

Standalone Cluster

  1. Create a file named my-connect-standalone.properties in the etc directory with contents like the following (note the security configs with the consumer.* and producer.* prefixes).

     cat etc/my-connect-standalone.properties
    bootstrap.servers=<cloud-bootstrap-servers>
    
    # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
    # need to configure these based on the format they want their data in when loaded from or stored into Kafka
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter you want to apply
    # it to
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    
    # The internal converter used for offsets and config data is configurable and must be specified, but most users will
    # always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    
    # Store offsets on local filesystem
    offset.storage.file.filename=/tmp/connect.offsets
    # Flush much faster than normal, which is useful for testing/debugging
    offset.flush.interval.ms=10000
    
    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    request.timeout.ms=20000
    retry.backoff.ms=500
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    security.protocol=SASL_SSL
    
    consumer.ssl.endpoint.identification.algorithm=https
    consumer.sasl.mechanism=PLAIN
    consumer.request.timeout.ms=20000
    consumer.retry.backoff.ms=500
    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    consumer.security.protocol=SASL_SSL
    
    producer.ssl.endpoint.identification.algorithm=https
    producer.sasl.mechanism=PLAIN
    producer.request.timeout.ms=20000
    producer.retry.backoff.ms=500
    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    producer.security.protocol=SASL_SSL
    
  2. Create a file named my-file-sink.properties in the etc directory with the following contents:

     cat ./etc/my-file-sink.properties
    
    name=my-file-sink
    connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
    tasks.max=1
    topics=connect-test
    file=my_file.txt
    
  3. Run the connect-standalone script with the filenames as arguments:

    ./bin/connect-standalone ./etc/my-connect-standalone.properties ./etc/my-file-sink.properties
    

    This should start a Connect worker on your machine that consumes the records produced earlier with the ccloud command and writes them to my_file.txt. If you tail the file, its contents should resemble the following (a REST API check is sketched after the output):

     tail -f my_file.txt
    {"field1": "hello", "field2": 1}
    {"field1": "hello", "field2": 2}
    {"field1": "hello", "field2": 3}
    {"field1": "hello", "field2": 4}
    {"field1": "hello", "field2": 5}
    {"field1": "hello", "field2": 6}
    

Distributed Cluster

  1. Create a file named my-connect-distributed.properties in the etc directory with contents like the following (note the security configs with the consumer.* and producer.* prefixes).

     cat etc/my-connect-distributed.properties
    bootstrap.servers=<cloud-bootstrap-servers>
    
    group.id=connect-cluster
    
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    
    # Connect clusters create three topics to manage offsets, configs, and status
    # information. Note that these contribute towards the total partition limit quota.
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=3
    offset.storage.partitions=3
    
    config.storage.topic=connect-configs
    config.storage.replication.factor=3
    
    status.storage.topic=connect-status
    status.storage.replication.factor=3
    
    offset.flush.interval.ms=10000
    
    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    request.timeout.ms=20000
    retry.backoff.ms=500
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    security.protocol=SASL_SSL
    
    consumer.ssl.endpoint.identification.algorithm=https
    consumer.sasl.mechanism=PLAIN
    consumer.request.timeout.ms=20000
    consumer.retry.backoff.ms=500
    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    consumer.security.protocol=SASL_SSL
    
    producer.ssl.endpoint.identification.algorithm=https
    producer.sasl.mechanism=PLAIN
    producer.request.timeout.ms=20000
    producer.retry.backoff.ms=500
    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    producer.security.protocol=SASL_SSL
    
  2. Run Connect using the following command:

    ./bin/connect-distributed ./etc/my-connect-distributed.properties
    

    To test whether the workers came up correctly, you can set up another file sink. Create a file named my-file-sink.json with the following contents:

     cat my-file-sink.json
    {
      "name": "my-file-sink",
      "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": 3,
        "topics": "connect-test",
        "file": "my_file.txt"
      }
    }
    
  3. Post this connector config to the worker using the curl command:

    curl -s -H "Content-Type: application/json" -X POST -d @my-file-sink.json http://localhost:8083/connectors/ | jq .
    

    This should give the following response:

    {
      "name": "my-file-sink",
      "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "connect-test",
        "file": "my_file",
        "name": "my-file-sink"
      },
      "tasks": [],
      "type": null
    }
    
  4. Produce some records using the ccloud command and tail my_file.txt to verify that the connector was created successfully and is writing the records; a status check and cleanup sketch follows below.
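
    As with the standalone worker, you can confirm the state of the connector and its tasks through the Connect REST API, and delete the connector when you are finished testing. This is a sketch assuming the default REST port 8083:

     # Connector and task state; everything should report RUNNING
     curl -s http://localhost:8083/connectors/my-file-sink/status | jq .

     # Remove the connector from the Connect cluster
     curl -s -X DELETE http://localhost:8083/connectors/my-file-sink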