.. _cloud-connect-config:

Connecting |kconnect-long| to |ccloud|
======================================

You can configure a local Connect cluster backed by a source |ak-tm| cluster in
|ccloud|. This example runs a file sink connector that writes records from the
|ccloud| cluster into a local file.

Prerequisites
-------------

.. include:: includes/installation.rst
   :start-line: 2
   :end-line: 4

- curl
- jq

Create the Topics in Cloud Cluster
----------------------------------

#. Create a ``page_visits`` topic as follows:

   .. code:: bash

      ccloud kafka topic create --partitions 1 page_visits

   .. important::

      You must manually create topics for source connectors to write to.

#. Produce records to the topic named ``page_visits``. Press ``Ctrl+C`` to exit.

   ::

      ccloud kafka topic produce page_visits
      Starting Kafka Producer. ^C to exit
      foo
      bar
      baz
      ^C

   For the rest of this example, produce JSON records that resemble the
   following, and then exit the producer:

   ::

      {"field1": "hello", "field2": 1}
      {"field1": "hello", "field2": 2}
      {"field1": "hello", "field2": 3}
      {"field1": "hello", "field2": 4}
      {"field1": "hello", "field2": 5}
      {"field1": "hello", "field2": 6}
      ^D

#. Verify that the records can be consumed:

   ::

      ccloud kafka topic consume -b page_visits

   Your output should resemble:

   ::

      {"field1": "hello", "field2": 1}
      {"field1": "hello", "field2": 2}
      {"field1": "hello", "field2": 3}
      {"field1": "hello", "field2": 4}
      {"field1": "hello", "field2": 5}
      {"field1": "hello", "field2": 6}
      ^D

Set up a Connect Worker with |ccloud|
-------------------------------------

Download the latest ZIP or TAR distribution of |cp| from
https://www.confluent.io/download/. Follow the instructions based on whether you
are using a :ref:`connect-cloud-standalone` or a :ref:`connect-cloud-distributed`.
Replace ``<cloud-bootstrap-servers>``, ``<api-key>``, and ``<api-secret>`` with
the appropriate values from your |ak| cluster setup.

.. _connect-cloud-standalone:

------------------
Standalone Cluster
------------------

#. Create ``my-connect-standalone.properties`` in the config directory, whose
   contents look like the following (note the security configs with
   ``consumer.*`` and ``producer.*`` prefixes):

   .. code:: bash

      cat etc/my-connect-standalone.properties
      bootstrap.servers=<cloud-bootstrap-servers>

      # The converters specify the format of data in Kafka and how to translate it into Connect data.
      # Every Connect user will need to configure these based on the format they want their data in
      # when loaded from or stored into Kafka.
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter

      # Converter-specific settings can be passed in by prefixing the converter's setting with the
      # converter you want to apply it to.
      key.converter.schemas.enable=false
      value.converter.schemas.enable=false

      # The internal converter used for offsets and config data is configurable and must be specified,
      # but most users will always want to use the built-in default. Offset and config data is never
      # visible outside of Kafka Connect in this format.
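      # In newer Connect releases, the internal.* converter settings below are deprecated and can
      # usually be omitted; when they are omitted, the worker uses the JSON converter for its
      # internal offset and config data.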

      internal.key.converter=org.apache.kafka.connect.json.JsonConverter
      internal.value.converter=org.apache.kafka.connect.json.JsonConverter
      internal.key.converter.schemas.enable=false
      internal.value.converter.schemas.enable=false

      # Store offsets on local filesystem
      offset.storage.file.filename=/tmp/connect.offsets
      # Flush much faster than normal, which is useful for testing/debugging
      offset.flush.interval.ms=10000

      ssl.endpoint.identification.algorithm=https
      sasl.mechanism=PLAIN
      request.timeout.ms=20000
      retry.backoff.ms=500
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="<api-key>" password="<api-secret>";
      security.protocol=SASL_SSL

      consumer.ssl.endpoint.identification.algorithm=https
      consumer.sasl.mechanism=PLAIN
      consumer.request.timeout.ms=20000
      consumer.retry.backoff.ms=500
      consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="<api-key>" password="<api-secret>";
      consumer.security.protocol=SASL_SSL

      producer.ssl.endpoint.identification.algorithm=https
      producer.sasl.mechanism=PLAIN
      producer.request.timeout.ms=20000
      producer.retry.backoff.ms=500
      producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="<api-key>" password="<api-secret>";
      producer.security.protocol=SASL_SSL

#. (Optional) Add the configs to ``my-connect-standalone.properties`` to connect to
   |ccloud| |sr| per the example in
   :devx-examples:`connect-ccloud.delta|ccloud/template_delta_configs/connect-ccloud.delta`
   on GitHub at
   :devx-examples:`ccloud/examples/template_delta_configs|ccloud/template_delta_configs`.

   .. code:: bash

      # Confluent Schema Registry for Kafka Connect
      value.converter=io.confluent.connect.avro.AvroConverter
      value.converter.basic.auth.credentials.source=USER_INFO
      value.converter.schema.registry.basic.auth.user.info=<schema-registry-api-key>:<schema-registry-api-secret>
      value.converter.schema.registry.url=https://<schema-registry-endpoint>

#. Create ``my-file-sink.properties`` in the config directory with the following
   contents:

   .. code:: bash

      cat ./etc/my-file-sink.properties
      name=my-file-sink
      connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
      tasks.max=1
      topics=page_visits
      file=my_file.txt

   .. important::

      You must include the following configuration properties if you are using a
      self-managed connector that requires an enterprise license.

      ::

         confluent.topic.bootstrap.servers=<cloud-bootstrap-servers>
         confluent.topic.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
         required username="<api-key>" password="<api-secret>";
         confluent.topic.security.protocol=SASL_SSL
         confluent.topic.sasl.mechanism=PLAIN

   .. important::

      You must include the following configuration properties if you are using a
      self-managed connector that uses Reporter to write responses back to |ak|
      (for example, the :ref:`Azure Functions Sink` connector or the
      :ref:`Google Cloud Functions Sink` connector).

      ::

         reporter.admin.bootstrap.servers=<cloud-bootstrap-servers>
         reporter.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
         required username="<api-key>" password="<api-secret>";
         reporter.admin.security.protocol=SASL_SSL
         reporter.admin.sasl.mechanism=PLAIN

         reporter.producer.bootstrap.servers=<cloud-bootstrap-servers>
         reporter.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
         required username="<api-key>" password="<api-secret>";
         reporter.producer.security.protocol=SASL_SSL
         reporter.producer.sasl.mechanism=PLAIN

   .. important::

      You must include the following configuration properties if you are using a
      Debezium CDC connector.

      ::

         database.history.kafka.bootstrap.servers=<cloud-bootstrap-servers>

         database.history.consumer.security.protocol=SASL_SSL
         database.history.consumer.ssl.endpoint.identification.algorithm=https
         database.history.consumer.sasl.mechanism=PLAIN
         database.history.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
         required username="<api-key>" password="<api-secret>";

         database.history.producer.security.protocol=SASL_SSL
         database.history.producer.ssl.endpoint.identification.algorithm=https
         database.history.producer.sasl.mechanism=PLAIN
         database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
         required username="<api-key>" password="<api-secret>";

#. Run the ``connect-standalone`` script with the filenames as arguments:

   .. code:: bash

      ./bin/connect-standalone ./etc/my-connect-standalone.properties ./etc/my-file-sink.properties

   This starts a Connect worker on your machine that consumes the records you
   produced earlier with the ``ccloud`` command. If you tail the contents of
   ``my_file.txt``, it should resemble the following:

   ::

      tail -f my_file.txt
      {"field1": "hello", "field2": 1}
      {"field1": "hello", "field2": 2}
      {"field1": "hello", "field2": 3}
      {"field1": "hello", "field2": 4}
      {"field1": "hello", "field2": 5}
      {"field1": "hello", "field2": 6}

   You can also check the worker through its REST API, as shown in the sketch
   after this procedure.
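
To confirm that the standalone worker and the file sink connector are running, you can
also query the worker's REST API. The commands below assume the default REST port of
8083; adjust them if you changed the worker's REST settings.

.. code:: bash

   # List the connectors registered with the local worker; "my-file-sink" should appear
   curl -s http://localhost:8083/connectors | jq .

   # List the connector plugins available on this worker
   curl -s http://localhost:8083/connector-plugins | jq .
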

.. _connect-cloud-distributed:

-------------------
Distributed Cluster
-------------------

#. Create ``my-connect-distributed.properties`` in the config directory, whose
   contents look like the following (note the security configs with
   ``consumer.*`` and ``producer.*`` prefixes):

   .. code:: bash

      bootstrap.servers=<cloud-bootstrap-servers>
      group.id=connect-cluster

      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=false
      value.converter.schemas.enable=false

      internal.key.converter=org.apache.kafka.connect.json.JsonConverter
      internal.value.converter=org.apache.kafka.connect.json.JsonConverter
      internal.key.converter.schemas.enable=false
      internal.value.converter.schemas.enable=false

      # Connect clusters create three topics to manage offsets, configs, and status
      # information. Note that these contribute towards the total partition limit quota.
      offset.storage.topic=connect-offsets
      offset.storage.replication.factor=3
      offset.storage.partitions=3

      config.storage.topic=connect-configs
      config.storage.replication.factor=3

      status.storage.topic=connect-status
      status.storage.replication.factor=3

      offset.flush.interval.ms=10000

      ssl.endpoint.identification.algorithm=https
      sasl.mechanism=PLAIN
      request.timeout.ms=20000
      retry.backoff.ms=500
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="<api-key>" password="<api-secret>";
      security.protocol=SASL_SSL

      consumer.ssl.endpoint.identification.algorithm=https
      consumer.sasl.mechanism=PLAIN
      consumer.request.timeout.ms=20000
      consumer.retry.backoff.ms=500
      consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="<api-key>" password="<api-secret>";
      consumer.security.protocol=SASL_SSL

      producer.ssl.endpoint.identification.algorithm=https
      producer.sasl.mechanism=PLAIN
      producer.request.timeout.ms=20000
      producer.retry.backoff.ms=500
      producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="<api-key>" password="<api-secret>";
      producer.security.protocol=SASL_SSL
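      # The distributed worker also serves its REST API on port 8083 by default. Uncomment and
      # adjust the following line if you need to bind a different listener address or port.
      #listeners=http://localhost:8083
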
#. (Optional) Add the configs to ``my-connect-distributed.properties`` to connect to
   |ccloud| |sr| per the example in
   :devx-examples:`connect-ccloud.delta|ccloud/template_delta_configs/connect-ccloud.delta`
   on GitHub at
   :devx-examples:`ccloud/examples/template_delta_configs|ccloud/template_delta_configs`.

   .. code:: bash

      # Confluent Schema Registry for Kafka Connect
      value.converter=io.confluent.connect.avro.AvroConverter
      value.converter.basic.auth.credentials.source=USER_INFO
      value.converter.schema.registry.basic.auth.user.info=<schema-registry-api-key>:<schema-registry-api-secret>
      value.converter.schema.registry.url=https://<schema-registry-endpoint>

#. Run Connect using the following command:

   .. highlight:: none

   ::

      ./bin/connect-distributed ./etc/my-connect-distributed.properties

#. To test whether the workers came up correctly, set up another file sink.
   Create a file ``my-file-sink.json`` whose contents are as follows:

   ::

      cat my-file-sink.json
      {
          "name": "my-file-sink",
          "config": {
              "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
              "tasks.max": 3,
              "topics": "page_visits",
              "file": "my_file.txt"
          }
      }

   .. important::

      You must include the following configuration properties if you are using a
      self-managed connector that requires an enterprise license.

      ::

         "confluent.topic.bootstrap.servers":"<cloud-bootstrap-servers>",
         "confluent.topic.sasl.jaas.config":
         "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";",
         "confluent.topic.security.protocol":"SASL_SSL",
         "confluent.topic.sasl.mechanism":"PLAIN"

   .. important::

      You must include the following configuration properties if you are using a
      self-managed connector that uses Reporter to write responses back to |ak|
      (for example, the :ref:`Azure Functions Sink` connector or the
      :ref:`Google Cloud Functions Sink` connector).

      ::

         "reporter.admin.bootstrap.servers":"<cloud-bootstrap-servers>",
         "reporter.admin.sasl.jaas.config":
         "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";",
         "reporter.admin.security.protocol":"SASL_SSL",
         "reporter.admin.sasl.mechanism":"PLAIN",

         "reporter.producer.bootstrap.servers":"<cloud-bootstrap-servers>",
         "reporter.producer.sasl.jaas.config":
         "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";",
         "reporter.producer.security.protocol":"SASL_SSL",
         "reporter.producer.sasl.mechanism":"PLAIN"

   .. important::

      You must include the following configuration properties if you are using a
      Debezium CDC connector.

      ::

         "database.history.kafka.bootstrap.servers": "<cloud-bootstrap-servers>",

         "database.history.consumer.security.protocol": "SASL_SSL",
         "database.history.consumer.ssl.endpoint.identification.algorithm": "https",
         "database.history.consumer.sasl.mechanism": "PLAIN",
         "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";",

         "database.history.producer.security.protocol": "SASL_SSL",
         "database.history.producer.ssl.endpoint.identification.algorithm": "https",
         "database.history.producer.sasl.mechanism": "PLAIN",
         "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";"

#. Post this connector config to the worker using the following ``curl`` command:

   ::

      curl -s -H "Content-Type: application/json" -X POST -d @my-file-sink.json http://localhost:8083/connectors/ | jq .

   This should give the following response:

   ::

      {
        "name": "my-file-sink",
        "config": {
          "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
          "tasks.max": "3",
          "topics": "page_visits",
          "file": "my_file.txt",
          "name": "my-file-sink"
        },
        "tasks": [],
        "type": null
      }

#. Produce some records using |ccloud| and tail ``my_file.txt`` to check that the
   connector was successfully created. You can also confirm the connector state
   through the worker's REST API, as shown in the sketch after this procedure.
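
To check on the connector after it is created, or to remove it and start over, you can
use the same worker REST API that the ``curl`` command above posted to:

.. code:: bash

   # Show the state of the file sink connector and its tasks
   curl -s http://localhost:8083/connectors/my-file-sink/status | jq .

   # Delete the connector so it can be re-created with a different configuration
   curl -s -X DELETE http://localhost:8083/connectors/my-file-sink
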

------------------------
Connect to |ccloud| |sr|
------------------------

(Optional) To connect to |ccloud| |sr|, add the configs per the example in
:devx-examples:`connect-ccloud.delta|ccloud/template_delta_configs/connect-ccloud.delta`
on GitHub at
:devx-examples:`ccloud/examples/template_delta_configs|ccloud/template_delta_configs`.

::

   # Confluent Schema Registry for Kafka Connect
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.basic.auth.credentials.source=USER_INFO
   value.converter.schema.registry.basic.auth.user.info=<schema-registry-api-key>:<schema-registry-api-secret>
   value.converter.schema.registry.url=https://<schema-registry-endpoint>
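
To verify the |sr| endpoint and credentials before restarting the worker, you can query
the |ccloud| |sr| REST API directly. The placeholders below are the same ones used in
the delta config above; substitute your own values.

.. code:: bash

   # List the subjects registered in Confluent Cloud Schema Registry
   curl -s -u <schema-registry-api-key>:<schema-registry-api-secret> \
     https://<schema-registry-endpoint>/subjects | jq .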