.. _connect_quickstart:

Tutorial: Moving Data In and Out of |ak|
=========================================

This tutorial provides a hands-on look at how you can move data into and out of |ak-tm| without writing a single line of code. It is helpful to review the :ref:`concepts ` for |kconnect-long| in tandem with running the steps in this guide to gain a deeper understanding.

At the end of this tutorial you will be able to:

* Use Confluent CLI to manage Confluent services, including starting a single connect worker in distributed mode and loading and unloading connectors.
* Read data from a file and publish to a |ak| topic.
* Read data from a |ak| topic and publish to file.
* Integrate |sr| with a connector.

To demonstrate the basic functionality of |kconnect-long| and its integration with the |sr-long|, a few local standalone |kconnect-long| processes with connectors are run. You can insert data written to a file into |ak| and write data from a |ak| topic to the console. If you are using JSON as the |kconnect| data format, see the instructions `here `_ for a tutorial that does not include |sr|.

.. include:: ../includes/installation-types-cli.rst

Start the services
------------------

This guide assumes that the services run on ``localhost`` with default properties.

.. tip:: If not already in your PATH, add Confluent's ``bin`` directory by running: ``export PATH=/bin:$PATH``

Now that Confluent's ``bin`` directory is included in your ``PATH`` variable, you can start all the services with the Confluent CLI:

.. codewithvars:: bash

    |confluent_start|

.. include:: ../includes/cli.rst
    :start-after: cli_limitations_start
    :end-before: cli_limitations_end

Every service will start in order, printing a message with its status:

.. include:: ../includes/cli.rst
    :start-after: CE_CLI_startup_output
    :end-before: COS_CP_CLI_startup_output

You may choose to open |kconnect|'s log to make sure the service has started successfully:

.. codewithvars:: bash

    |confluent_log| connect

If an error occurred while starting services with Confluent CLI, you can access the logs of each service in one place by navigating to the directory where these logs are stored. For example:

.. codewithvars:: bash

    # Show the log directory
    |confluent_current|
    /tmp/confluent.w1CpYsaI

    # Navigate to the log directory
    cd /tmp/confluent.w1CpYsaI

    # View the log
    less connect/connect.stderr

For complete details on getting these services up and running see the |cp| :ref:`installation documentation `.

Read File Data with |kconnect|
------------------------------

To start a FileStreamSourceConnector that reads structured data from a file and exports the data into |ak|, using |sr| to inform |kconnect| of the data's structure, we will use one of the supported connector configurations that come pre-defined with Confluent CLI. To get the list of all the pre-defined connector configurations, run:

.. codewithvars:: bash

    |confluent_list| connectors
    Bundled Predefined Connectors (edit configuration under etc/):
      elasticsearch-sink
      file-source
      file-sink
      jdbc-source
      jdbc-sink
      hdfs-sink
      s3-sink

The pre-configured connector we will use first is called ``file-source`` and its configuration file is located at ``./etc/kafka/connect-file-source.properties``. Below is an explanation of the contents:

.. codewithvars:: bash

    # User defined connector instance name.
    name=file-source

    # The class implementing the connector
    connector.class=FileStreamSource

    # Maximum number of tasks to run for this connector instance
    tasks.max=1

    # The input file (path relative to worker's working directory)
    # This is the only setting specific to the FileStreamSource
    file=test.txt

    # The output topic in Kafka
    topic=connect-test

If you choose to follow this tutorial without |sr|, you must also set the ``key.converter`` and ``value.converter`` properties to ``org.apache.kafka.connect.json.JsonConverter``. This overrides the converter settings for this connector only.
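As a rough sketch of the converter override described above, the following lines could be appended to the connector's properties file. The two ``schemas.enable`` settings are an optional assumption that this tutorial does not otherwise use; they tell the JSON converter to leave out the schema envelope it would otherwise wrap around each message.

.. codewithvars:: bash

    # Sketch only: assumed additions to etc/kafka/connect-file-source.properties
    # Use the JSON converter for this connector's keys and values
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter

    # Optional assumption, not part of this tutorial: write plain JSON
    # without the schema/payload envelope
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

With an override like this the topic would hold JSON rather than Avro, so the plain ``kafka-console-consumer`` would likely be the right tool for inspecting it.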
We are now ready to load the connector, but before we do that, let's seed the file with some sample data. Note that the connector configuration specifies a relative path for the file, so you should create the file in the same directory that you will run the |kconnect-long| worker from.

.. codewithvars:: bash

    for i in {1..3}; do echo "log line $i"; done > test.txt

Next, start an instance of the FileStreamSourceConnector using the configuration file described above. You can do this from the command line using the Confluent CLI as follows:

.. codewithvars:: bash

    |confluent_load| file-source
    {
      "name": "file-source",
      "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": "1",
        "file": "test.txt",
        "topic": "connect-test",
        "name": "file-source"
      },
      "tasks": []
    }

Upon success it will print a snapshot of the connector's configuration. To confirm which connectors are loaded at any time, run:

.. codewithvars:: bash

    |confluent_status| connectors
    [
      "file-source"
    ]

You will get a list of all the loaded connectors in this worker. The same command supplied with the connector name will give you the status of this connector, including an indication of whether the connector has started successfully or has encountered a failure. For instance, running this command on the connector we just loaded would give us:

.. codewithvars:: bash

    |confluent_status| file-source
    {
      "name": "file-source",
      "connector": {
        "state": "RUNNING",
        "worker_id": "192.168.10.1:8083"
      },
      "tasks": [
        {
          "state": "RUNNING",
          "id": 0,
          "worker_id": "192.168.10.1:8083"
        }
      ]
    }

Soon after the connector starts, each of the three lines in our log file should be delivered to |ak|, with a schema registered in |sr|. One way to validate that the data is there is to use the console consumer in another console to inspect the contents of the topic:

.. codewithvars:: bash

    kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
    "log line 1"
    "log line 2"
    "log line 3"

Note that we use the ``kafka-avro-console-consumer`` because the data has been stored in |ak| using Avro format. This consumer uses the Avro converter that is bundled with |sr| in order to properly look up the schema for the Avro data.

Write File Data with |kconnect|
-------------------------------

Now that we have written some data to a |ak| topic with |kconnect|, let's consume that data with a downstream process. In this section, we will load a sink connector to the worker in addition to the source that we started in the last section. The sink will write messages to a local file. This connector is also pre-defined in Confluent CLI under the name ``file-sink``. Below is the connector's configuration as it is stored in ``etc/kafka/connect-file-sink.properties``:

.. codewithvars:: bash

    # User defined name for the connector instance
    name=file-sink

    # Name of the connector class to be run
    connector.class=FileStreamSink

    # Max number of tasks to spawn for this connector instance
    tasks.max=1

    # Output file name relative to worker's current working directory
    # This is the only property specific to the FileStreamSink connector
    file=test.sink.txt

    # Comma-separated list of input topics
    topics=connect-test

Note that the configuration contains similar settings to the file source. A key difference is that multiple input topics are specified with ``topics`` whereas the file source allows for only one output topic specified with ``topic``.

Now start the FileStreamSinkConnector. The sink connector will run within the same worker as the source connector, but each connector task will have its own dedicated thread.

.. codewithvars:: bash

    |confluent_load| file-sink
    {
      "name": "file-sink",
      "config": {
        "connector.class": "FileStreamSink",
        "tasks.max": "1",
        "file": "test.sink.txt",
        "topics": "connect-test",
        "name": "file-sink"
      },
      "tasks": []
    }

To make sure the sink connector is up and running, use Confluent CLI to get the state of this specific connector:

.. codewithvars:: bash

    |confluent_status| file-sink
    {
      "name": "file-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": "192.168.10.1:8083"
      },
      "tasks": [
        {
          "state": "RUNNING",
          "id": 0,
          "worker_id": "192.168.10.1:8083"
        }
      ]
    }

as well as the list of all loaded connectors:

.. codewithvars:: bash

    |confluent_status| connectors
    [
      "file-source",
      "file-sink"
    ]

.. tip:: Because of the rebalancing that happens between worker tasks every time a connector is loaded, a call to :litwithvars:`|confluent_status| ` might not succeed immediately after a new connector is loaded. Once rebalancing completes, such calls will be able to return the actual status of a connector.

By opening the file ``test.sink.txt`` you should see the three log lines written to it by the sink connector.

Now, with both connectors running, we can see data flowing end-to-end in real time. To check this out, use another terminal to tail the output file:

.. codewithvars:: bash

    tail -f test.sink.txt

and in a different terminal start appending additional lines to the text file:

.. codewithvars:: bash

    for i in {4..1000}; do echo "log line $i"; done >> test.txt

You should see the lines being added to ``test.sink.txt``. The new data was picked up by the source connector, written to |ak|, read by the sink connector from |ak|, and finally appended to the file.

.. codewithvars:: bash

    "log line 1"
    "log line 2"
    "log line 3"
    "log line 4"
    "log line 5"
    ...

After you are done experimenting with reading from and writing to a file with |kconnect|, you have a few options with respect to shutting down the connectors:

* Unload the connectors but leave the |kconnect| worker running.

  .. codewithvars:: bash

      |confluent_unload| file-source
      |confluent_unload| file-sink

* Stop the |kconnect| worker altogether.

  .. codewithvars:: bash

      |confluent_stop| connect
      Stopping connect
      connect is [DOWN]

* Stop the |kconnect| worker as well as all the rest of the Confluent services.

  .. codewithvars:: bash

      |confluent_stop|

  Your output should resemble:

  .. include:: ../includes/cli.rst
      :start-after: ce_cli_stop_output_start
      :end-before: ce_cli_stop_output_stop

* Stop all the services and wipe out any data of this particular run of Confluent services.

  .. codewithvars:: bash

      |confluent_destroy|

  Your output should resemble:

  .. include:: ../includes/cli.rst
      :start-after: ce_cli_stop_destroy_output_start
      :end-before: ce_cli_stop_destroy_output_stop

Both source and sink connectors can track offsets, so you can start and stop the process any number of times and add more data to the input file, and both will resume where they previously left off.

The connectors demonstrated in this tutorial are intentionally simple so no additional dependencies are necessary. Most connectors will require a bit more configuration to specify how to connect to the source or sink system and what data to copy, and for many you will want to execute on a |kconnect-long| cluster for scalability and fault tolerance. To get started with |kconnect-long| you'll want to see the :ref:`user guide ` for more details on running and managing |kconnect-long|, including how to run in distributed mode. The :ref:`Connectors ` section includes details on configuring and deploying the connectors that ship with |cp|.

.. tip:: The easiest way to create, configure, and manage connectors is with |c3|. To learn more about Control Center, see :ref:`control_center`.