Connecting Kafka Connect to Confluent Cloud
You can configure a local Connect cluster backed by a source Kafka cluster in Confluent Cloud. This walkthrough uses the FileStreamSinkConnector to copy records from a topic in the Confluent Cloud cluster into a local file.
Prerequisites
- Access to Confluent Cloud
- Confluent Cloud CLI installed
- curl
- jq
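As a quick sanity check (a minimal sketch assuming a Unix-like shell), you can confirm that the prerequisite tools are on your PATH before starting:

    # Report any prerequisite tool that is not available on the PATH.
    for tool in ccloud curl jq; do
      command -v "$tool" >/dev/null || echo "missing: $tool"
    done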
Create the Topics in Cloud Cluster
Create a page_visits topic as follows:

    ccloud topic create --partitions 1 page_visits
Important
You must manually create topics for source connectors to write to.
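To confirm that the topic was created, you can list the topics in the cloud cluster; the exact subcommand may vary with your ccloud CLI version:

    # List topics in the Confluent Cloud cluster and check that page_visits is present.
    ccloud topic list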
Produce records to the page_visits topic:

    ccloud produce --topic page_visits
You can type messages in as standard input. By default they are newline separated. Press Ctrl + C to exit:

    foo
    bar
    baz
    ^C

For this walkthrough, produce the following JSON records so that the sink connector has data to read, then exit the producer:

    {"field1": "hello", "field2": 1}
    {"field1": "hello", "field2": 2}
    {"field1": "hello", "field2": 3}
    {"field1": "hello", "field2": 4}
    {"field1": "hello", "field2": 5}
    {"field1": "hello", "field2": 6}
    ^D
Verify that they can be consumed:

    ccloud consume -b --topic page_visits

Your output should resemble:

    {"field1": "hello", "field2": 1}
    {"field1": "hello", "field2": 2}
    {"field1": "hello", "field2": 3}
    {"field1": "hello", "field2": 4}
    {"field1": "hello", "field2": 5}
    {"field1": "hello", "field2": 6}
    ^D
Set up a Connect Worker with Confluent Cloud
Download the latest ZIP or TAR distribution of Confluent Platform from https://www.confluent.io/download/. Follow the instructions based on whether you are using a Standalone Cluster or Distributed Cluster.
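For example, assuming the TAR distribution, extraction looks something like the following (the archive name depends on the version you download):

    # Extract the TAR distribution and change into the resulting directory;
    # the exact archive name depends on the Confluent Platform version.
    tar -xzf confluent-<version>.tar.gz
    cd confluent-<version>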
Replace <cloud-bootstrap-servers>, <api-key>, and <api-secret> with appropriate values from your Kafka cluster setup.
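For illustration only, a filled-in fragment might look like the following; the endpoint and credentials shown here are made up, so substitute the values from your own Confluent Cloud cluster:

    # Hypothetical example values -- replace with your own cluster endpoint and API key/secret.
    bootstrap.servers=pkc-12345.us-west-2.aws.confluent.cloud:9092
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="MYAPIKEY123" password="MYAPISECRET456";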
Standalone Cluster
Create my-connect-standalone.properties in the config directory, whose contents look like the following (note the security configs with the consumer.* and producer.* prefixes):

    cat etc/my-connect-standalone.properties

    bootstrap.servers=<cloud-bootstrap-servers>

    # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
    # need to configure these based on the format they want their data in when loaded from or stored into Kafka
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter

    # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter you want to apply
    # it to
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

    # The internal converter used for offsets and config data is configurable and must be specified, but most users will
    # always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false

    # Store offsets on local filesystem
    offset.storage.file.filename=/tmp/connect.offsets

    # Flush much faster than normal, which is useful for testing/debugging
    offset.flush.interval.ms=10000

    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    request.timeout.ms=20000
    retry.backoff.ms=500
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="<api-key>" password="<api-secret>";
    security.protocol=SASL_SSL

    consumer.ssl.endpoint.identification.algorithm=https
    consumer.sasl.mechanism=PLAIN
    consumer.request.timeout.ms=20000
    consumer.retry.backoff.ms=500
    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="<api-key>" password="<api-secret>";
    consumer.security.protocol=SASL_SSL

    producer.ssl.endpoint.identification.algorithm=https
    producer.sasl.mechanism=PLAIN
    producer.request.timeout.ms=20000
    producer.retry.backoff.ms=500
    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="<api-key>" password="<api-secret>";
    producer.security.protocol=SASL_SSL
Create my-file-sink.properties in the config directory, whose contents look like the following:

    cat ./etc/my-file-sink.properties

    name=my-file-sink
    connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
    tasks.max=1
    topics=page_visits
    file=my_file.txt
Run the connect-standalone script with the two file names as arguments:

    ./bin/connect-standalone ./etc/my-connect-standalone.properties ./etc/my-file-sink.properties
This should start a Connect worker on your machine, which will consume the records produced earlier using the ccloud command. If you tail the contents of my_file.txt, it should resemble the following:

    tail -f my_file.txt

    {"field1": "hello", "field2": 1}
    {"field1": "hello", "field2": 2}
    {"field1": "hello", "field2": 3}
    {"field1": "hello", "field2": 4}
    {"field1": "hello", "field2": 5}
    {"field1": "hello", "field2": 6}
Distributed Cluster
Create my-connect-distributed.properties in the config directory, whose contents look like the following (note the security configs with the consumer.* and producer.* prefixes):

    cat etc/my-connect-distributed.properties

    bootstrap.servers=<cloud-bootstrap-servers>
    group.id=connect-cluster

    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false

    # Connect clusters create three topics to manage offsets, configs, and status
    # information. Note that these contribute towards the total partition limit quota.
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=3
    offset.storage.partitions=3

    config.storage.topic=connect-configs
    config.storage.replication.factor=3

    status.storage.topic=connect-status
    status.storage.replication.factor=3

    offset.flush.interval.ms=10000

    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    request.timeout.ms=20000
    retry.backoff.ms=500
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="<api-key>" password="<api-secret>";
    security.protocol=SASL_SSL

    consumer.ssl.endpoint.identification.algorithm=https
    consumer.sasl.mechanism=PLAIN
    consumer.request.timeout.ms=20000
    consumer.retry.backoff.ms=500
    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="<api-key>" password="<api-secret>";
    consumer.security.protocol=SASL_SSL

    producer.ssl.endpoint.identification.algorithm=https
    producer.sasl.mechanism=PLAIN
    producer.request.timeout.ms=20000
    producer.retry.backoff.ms=500
    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="<api-key>" password="<api-secret>";
    producer.security.protocol=SASL_SSL
Run Connect using the following command:

    ./bin/connect-distributed ./etc/my-connect-distributed.properties
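Before submitting a connector, you can optionally confirm that the worker started correctly. The checks below assume the default Connect REST port 8083 on localhost and the same ccloud CLI used earlier (subcommand names can vary between CLI versions):

    # Query the Connect REST API (default port 8083) to confirm the worker is up.
    curl -s http://localhost:8083/ | jq .

    # List the connector plugins available on this worker; the FileStream
    # connectors should appear in the output.
    curl -s http://localhost:8083/connector-plugins | jq .

    # Confirm that the three internal topics (connect-offsets, connect-configs,
    # connect-status) were created in the Confluent Cloud cluster.
    ccloud topic list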
To test whether the workers came up correctly, you can set up another file sink as follows. Create a file my-file-sink.json whose contents are as follows:

    cat my-file-sink.json

    {
      "name": "my-file-sink",
      "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": 3,
        "topics": "page_visits",
        "file": "my_file.txt"
      }
    }
Post this connector config to the worker using the curl command:

    curl -s -H "Content-Type: application/json" -X POST -d @my-file-sink.json http://localhost:8083/connectors/ | jq .
This should give the following response:

    {
      "name": "my-file-sink",
      "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "page_visits",
        "file": "my_file",
        "name": "my-file-sink"
      },
      "tasks": [],
      "type": null
    }
Produce some more records using the ccloud CLI and tail the sink file to check that the connector was created successfully.
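For example, using the same ccloud CLI and the my_file.txt sink file configured above:

    # Produce a few more records to the topic...
    ccloud produce --topic page_visits

    # ...then, in another terminal, watch new lines arrive in the sink file.
    tail -f my_file.txt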