Connecting Kafka Connect to Confluent Cloud
You can configure a local Connect cluster backed by a source Kafka cluster in Confluent Cloud. This topic shows how to create a Connect cluster that sinks records from a connect-test topic in a Confluent Cloud cluster into a local file.
Prerequisites
- Access to Confluent Cloud
- Confluent Cloud CLI installed
- curl
- jq
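Before starting, you can confirm that the required tools are available on your PATH. This quick sanity check is an optional addition to the walkthrough:

$ which ccloud
$ curl --version
$ jq --version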
Create the Topics in the Cloud Cluster
Create a connect-test topic as follows:

$ ccloud topic create --partitions 1 connect-test
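Optionally, confirm that the topic exists. The subcommands below assume the same ccloud CLI used above supports listing and describing topics; if your CLI version differs, consult its help output for the equivalent commands:

$ ccloud topic list
$ ccloud topic describe connect-test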
Produce records using Confluent Cloud (press Ctrl+D to stop producing):

$ ccloud produce --topic connect-test
{"field1": "hello", "field2": 1}
{"field1": "hello", "field2": 2}
{"field1": "hello", "field2": 3}
{"field1": "hello", "field2": 4}
{"field1": "hello", "field2": 5}
{"field1": "hello", "field2": 6}
^D
Verify that they can be consumed:

$ ccloud consume -b --topic connect-test
{"field1": "hello", "field2": 1}
{"field1": "hello", "field2": 2}
{"field1": "hello", "field2": 3}
{"field1": "hello", "field2": 4}
{"field1": "hello", "field2": 5}
{"field1": "hello", "field2": 6}
^D
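If you prefer not to type records interactively, you should be able to pipe them from a file, since ccloud produce reads records from standard input in the example above. The records.json file here is hypothetical and stands for any file of newline-delimited JSON records:

$ cat records.json | ccloud produce --topic connect-test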
Set Up a Connect Worker with Confluent Cloud
Download the latest ZIP or TAR distribution of Confluent Platform from https://www.confluent.io/download/. Follow the instructions based on whether you are using a Standalone Cluster or Distributed Cluster.
Replace <cloud-bootstrap-servers>, <api-key>, and <api-secret> with appropriate values from your Kafka cluster setup.
Standalone Cluster
Create my-connect-standalone.properties in the config directory, whose contents look like the following (note the security configs with consumer.* and producer.* prefixes):

$ cat etc/my-connect-standalone.properties
bootstrap.servers=<cloud-bootstrap-servers>

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter you want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Store offsets on local filesystem
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" password="<api-secret>";
security.protocol=SASL_SSL

consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" password="<api-secret>";
consumer.security.protocol=SASL_SSL

producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" password="<api-secret>";
producer.security.protocol=SASL_SSL
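To avoid editing the file by hand, you can substitute the placeholders with sed. This is a minimal sketch, not part of the original instructions; the CLOUD_BOOTSTRAP_SERVERS, CLOUD_KEY, and CLOUD_SECRET variables are illustrative and must be set to your actual values first:

$ export CLOUD_BOOTSTRAP_SERVERS="<cloud-bootstrap-servers>"   # replace with your actual bootstrap servers
$ export CLOUD_KEY="<api-key>"                                 # replace with your API key
$ export CLOUD_SECRET="<api-secret>"                           # replace with your API secret
# GNU sed shown; on macOS use: sed -i ''
$ sed -i "s|<cloud-bootstrap-servers>|$CLOUD_BOOTSTRAP_SERVERS|g; s|<api-key>|$CLOUD_KEY|g; s|<api-secret>|$CLOUD_SECRET|g" etc/my-connect-standalone.properties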
Create my-file-sink.properties in the config directory, whose contents look like the following:

$ cat ./etc/my-file-sink.properties
name=my-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=connect-test
file=my_file.txt
Run the connect-standalone script with the two filenames as arguments:

$ ./bin/connect-standalone ./etc/my-connect-standalone.properties ./etc/my-file-sink.properties
This should start a Connect worker on your machine, which will consume the records produced earlier using the ccloud command. If you tail the contents of my_file.txt, it should resemble the following:

$ tail -f my_file.txt
{"field1": "hello", "field2": 1}
{"field1": "hello", "field2": 2}
{"field1": "hello", "field2": 3}
{"field1": "hello", "field2": 4}
{"field1": "hello", "field2": 5}
{"field1": "hello", "field2": 6}
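The standalone worker also serves the Connect REST API (port 8083 by default), so you can confirm that the connector and its task are running. This optional check is an addition to the original walkthrough:

$ curl -s http://localhost:8083/connectors/my-file-sink/status | jq .

Both the connector and its single task should report a RUNNING state.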
Distributed Cluster
Create my-connect-distributed.properties in the config directory, whose contents look like the following (note the security configs with consumer.* and producer.* prefixes):

$ cat etc/my-connect-distributed.properties
bootstrap.servers=<cloud-bootstrap-servers>
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Connect clusters create three topics to manage offsets, configs, and status
# information. Note that these contribute towards the total partition limit quota.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3

offset.flush.interval.ms=10000

ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" password="<api-secret>";
security.protocol=SASL_SSL

consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" password="<api-secret>";
consumer.security.protocol=SASL_SSL

producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" password="<api-secret>";
producer.security.protocol=SASL_SSL
Run Connect using the following command:
$ ./bin/connect-distributed ./etc/my-connect-distributed.properties
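Before creating any connectors, you can verify that the worker's REST API is reachable (the default REST port is 8083). These checks are optional additions to the walkthrough:

$ curl -s http://localhost:8083/ | jq .
$ curl -s http://localhost:8083/connector-plugins | jq .

The second command lists the connector plugins available on the worker; the FileStreamSinkConnector used below should appear in the list.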
To test whether the workers came up correctly, you can set up another file sink as follows. Create a file my-file-sink.json whose contents are as follows:

$ cat my-file-sink.json
{
  "name": "my-file-sink",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": 3,
    "topics": "connect-test",
    "file": "my_file.txt"
  }
}
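Optionally, you can have the worker validate the connector configuration before creating it, using the standard config validation endpoint. This step is an addition to the original walkthrough; note that the endpoint expects the flat config map rather than the wrapped JSON shown above:

$ curl -s -H "Content-Type: application/json" -X PUT \
    -d '{"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max": "3", "topics": "connect-test", "file": "my_file.txt", "name": "my-file-sink"}' \
    http://localhost:8083/connector-plugins/FileStreamSinkConnector/config/validate | jq '.error_count'

An error_count of 0 indicates the configuration is valid.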
Post this connector config to the worker using the curl command:
$ curl -s -H "Content-Type: application/json" -X POST -d @my-file-sink.json http://localhost:8083/connectors/ | jq .
This should give the following response:
{ "name": "my-file-sink", "config": { "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max": "1", "topics": "connect-test", "file": "my_file", "name": "my-file-sink" }, "tasks": [], "type": null }
Produce some records using Confluent Cloud and tail my_file.txt to check whether the connector was created successfully.
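When you are finished testing, you can remove the connector through the REST API. This cleanup step is an addition to the original walkthrough:

$ curl -s -X DELETE http://localhost:8083/connectors/my-file-sink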