.. _cloud-connect-config:

Connecting |kconnect-long| to |ccloud|
======================================

You can configure a local Connect cluster backed by a source Kafka cluster in |ccloud|. This example runs a file sink connector that copies records from a topic in your |ccloud| cluster into a local file.

Prerequisites
-------------

.. include:: includes/installation.rst
   :start-line: 2
   :end-line: 4

- curl
- jq

Create the Topics in the Cloud Cluster
--------------------------------------

#. Create a ``page_visits`` topic as follows:

   .. code:: bash

      ccloud topic create --partitions 1 page_visits

   .. important:: You must manually create topics for source connectors to write to.

#. .. include:: ../includes/ccloud-produce.rst

   Your output should resemble:

   ::

      {"field1": "hello", "field2": 1}
      {"field1": "hello", "field2": 2}
      {"field1": "hello", "field2": 3}
      {"field1": "hello", "field2": 4}
      {"field1": "hello", "field2": 5}
      {"field1": "hello", "field2": 6}
      ^D

#. Verify that the records can be consumed:

   ::

      ccloud consume -b --topic page_visits

   Your output should resemble:

   ::

      {"field1": "hello", "field2": 1}
      {"field1": "hello", "field2": 2}
      {"field1": "hello", "field2": 3}
      {"field1": "hello", "field2": 4}
      {"field1": "hello", "field2": 5}
      {"field1": "hello", "field2": 6}
      ^D

Set Up a Connect Worker with |ccloud|
-------------------------------------

Download the latest ZIP or TAR distribution of |cp| from https://www.confluent.io/download/. Follow the instructions based on whether you are running a :ref:`connect-cloud-standalone` or a :ref:`connect-cloud-distributed`.

Replace ``<cloud-bootstrap-servers>``, ``<api-key>``, and ``<api-secret>`` with appropriate values from your Kafka cluster setup.

.. _connect-cloud-standalone:

------------------
Standalone Cluster
------------------

#. Create ``my-connect-standalone.properties`` in the config directory, whose contents look like the following (note the security configs with ``consumer.*`` and ``producer.*`` prefixes):

   .. code:: bash

      cat etc/my-connect-standalone.properties
      bootstrap.servers=<cloud-bootstrap-servers>

      # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
      # need to configure these based on the format they want their data in when loaded from or stored into Kafka.
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter

      # Converter-specific settings can be passed in by prefixing the converter's setting with the converter you want to apply
      # it to.
      key.converter.schemas.enable=false
      value.converter.schemas.enable=false

      # The internal converter used for offsets and config data is configurable and must be specified, but most users will
      # always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
      internal.key.converter=org.apache.kafka.connect.json.JsonConverter
      internal.value.converter=org.apache.kafka.connect.json.JsonConverter
      internal.key.converter.schemas.enable=false
      internal.value.converter.schemas.enable=false

      # Store offsets on the local filesystem.
      offset.storage.file.filename=/tmp/connect.offsets
      # Flush much faster than normal, which is useful for testing/debugging.
      offset.flush.interval.ms=10000

      ssl.endpoint.identification.algorithm=https
      sasl.mechanism=PLAIN
      request.timeout.ms=20000
      retry.backoff.ms=500
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="<api-key>" password="<api-secret>";
      security.protocol=SASL_SSL

      consumer.ssl.endpoint.identification.algorithm=https
      consumer.sasl.mechanism=PLAIN
      consumer.request.timeout.ms=20000
      consumer.retry.backoff.ms=500
      consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="<api-key>" password="<api-secret>";
      consumer.security.protocol=SASL_SSL

      producer.ssl.endpoint.identification.algorithm=https
      producer.sasl.mechanism=PLAIN
      producer.request.timeout.ms=20000
      producer.retry.backoff.ms=500
      producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="<api-key>" password="<api-secret>";
      producer.security.protocol=SASL_SSL

#. Create ``my-file-sink.properties`` in the config directory, whose contents look like the following:

   .. code:: bash

      cat ./etc/my-file-sink.properties
      name=my-file-sink
      connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
      tasks.max=1
      topics=page_visits
      file=my_file.txt

#. Run the ``connect-standalone`` script with the filenames as arguments:

   .. code:: bash

      ./bin/connect-standalone ./etc/my-connect-standalone.properties ./etc/my-file-sink.properties

   This starts a Connect worker on your machine that consumes the records you produced earlier with the ``ccloud`` command. If you tail the contents of ``my_file.txt``, it should resemble the following:

   .. code:: bash

      tail -f my_file.txt
      {"field1": "hello", "field2": 1}
      {"field1": "hello", "field2": 2}
      {"field1": "hello", "field2": 3}
      {"field1": "hello", "field2": 4}
      {"field1": "hello", "field2": 5}
      {"field1": "hello", "field2": 6}
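In addition to tailing the output file, you can confirm that the connector and its task are healthy through the worker's REST interface. This is an optional check, a sketch that assumes the standalone worker uses the default REST port of 8083:

.. code:: bash

   # Query the connector's status; the connector and its task should both report RUNNING.
   curl -s http://localhost:8083/connectors/my-file-sink/status | jq .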
.. _connect-cloud-distributed:

-------------------
Distributed Cluster
-------------------

#. Create ``my-connect-distributed.properties`` in the config directory, whose contents look like the following (note the security configs with ``consumer.*`` and ``producer.*`` prefixes):

   .. code:: bash

      cat etc/my-connect-distributed.properties
      bootstrap.servers=<cloud-bootstrap-servers>
      group.id=connect-cluster

      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=false
      value.converter.schemas.enable=false

      internal.key.converter=org.apache.kafka.connect.json.JsonConverter
      internal.value.converter=org.apache.kafka.connect.json.JsonConverter
      internal.key.converter.schemas.enable=false
      internal.value.converter.schemas.enable=false

      # Connect clusters create three topics to manage offsets, configs, and status
      # information. Note that these contribute towards the total partition limit quota.
      offset.storage.topic=connect-offsets
      offset.storage.replication.factor=3
      offset.storage.partitions=3

      config.storage.topic=connect-configs
      config.storage.replication.factor=3

      status.storage.topic=connect-status
      status.storage.replication.factor=3

      offset.flush.interval.ms=10000

      ssl.endpoint.identification.algorithm=https
      sasl.mechanism=PLAIN
      request.timeout.ms=20000
      retry.backoff.ms=500
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="<api-key>" password="<api-secret>";
      security.protocol=SASL_SSL

      consumer.ssl.endpoint.identification.algorithm=https
      consumer.sasl.mechanism=PLAIN
      consumer.request.timeout.ms=20000
      consumer.retry.backoff.ms=500
      consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="<api-key>" password="<api-secret>";
      consumer.security.protocol=SASL_SSL

      producer.ssl.endpoint.identification.algorithm=https
      producer.sasl.mechanism=PLAIN
      producer.request.timeout.ms=20000
      producer.retry.backoff.ms=500
      producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="<api-key>" password="<api-secret>";
      producer.security.protocol=SASL_SSL

#. Run Connect using the following command:

   .. code:: bash

      ./bin/connect-distributed ./etc/my-connect-distributed.properties
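   Before creating a connector, you can optionally confirm that the worker's REST API is reachable and that the file sink plugin is available. This is a quick sanity check, a sketch that assumes the default REST listener on port 8083:

   .. code:: bash

      # The root endpoint returns the worker's version information.
      curl -s http://localhost:8083/ | jq .

      # List the installed connector plugins; FileStreamSinkConnector should appear.
      curl -s http://localhost:8083/connector-plugins | jq .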
   To verify that data flows end to end, you can set up a file sink connector. Create a file ``my-file-sink.json`` with the following contents:

   .. code:: bash

      cat my-file-sink.json
      {
          "name": "my-file-sink",
          "config": {
              "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
              "tasks.max": 3,
              "topics": "page_visits",
              "file": "my_file.txt"
          }
      }

#. Post this connector config to the worker using the ``curl`` command:

   .. code:: bash

      curl -s -H "Content-Type: application/json" -X POST -d @my-file-sink.json http://localhost:8083/connectors/ | jq .

   This should give the following response:

   .. code:: bash

      {
          "name": "my-file-sink",
          "config": {
              "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
              "tasks.max": "3",
              "topics": "page_visits",
              "file": "my_file.txt",
              "name": "my-file-sink"
          },
          "tasks": [],
          "type": null
      }

#. Produce some records to the ``page_visits`` topic in |ccloud|, as you did earlier, and tail ``my_file.txt`` to verify that the connector writes them to the file.
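When you are finished testing, the same REST interface can confirm the connector's state and remove it. The following is a sketch that assumes the distributed worker is still listening on ``localhost:8083``:

.. code:: bash

   # Confirm the connector is registered and running.
   curl -s http://localhost:8083/connectors | jq .
   curl -s http://localhost:8083/connectors/my-file-sink/status | jq .

   # Remove the connector when you no longer need it.
   curl -s -X DELETE http://localhost:8083/connectors/my-file-sink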