Kafka Connect Quick Start
This quick start guide provides a hands-on look at how you can move data into and out of Kafka without writing a single line of code. It is helpful to review the concepts for Kafka Connect in tandem with running the steps in this guide to gain a deeper understanding. At the end of this quick start you will be able to:
- Start a Connect worker in standalone mode
- Read data from a file and publish to a Kafka topic
- Read data from a Kafka topic and publish to file
- Integrate the Schema Registry with a connector
What we will do
To demonstrate the basic functionality of Kafka Connect and its integration with the Confluent Schema Registry, we will run a few local standalone Kafka Connect processes with connectors. You can insert data written to a file into Kafka and write data from a Kafka topic to the console. If you are using JSON as the Connect data format, see the instructions here for a tutorial that does not include the Schema Registry.
Start ZooKeeper, Kafka, Schema Registry
First, start up a ZooKeeper server. In this guide, we are assuming services will run on
# Start ZooKeeper. Run this command in its own terminal.
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
Next, start up a Kafka broker that will serve as our single-node Kafka cluster.
# Start Kafka. Run this command in its own terminal.
$ ./bin/kafka-server-start ./etc/kafka/server.properties
Now start up the Schema Registry. We will use the Schema Registry to make it easier to work with Avro data.
# Start Schema Registry. Run this command in its own terminal.
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
For complete details on getting these services up and running see the Confluent Platform quick start.
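Before moving on, it can help to confirm that all three services are accepting connections. The following is a minimal sketch, not part of Confluent Platform: the helper names `port_open` and `wait_for_services` are hypothetical, the check relies on bash's `/dev/tcp` feature, and the ports are the ones that appear in this guide (2181 for ZooKeeper, 9092 for Kafka, 8081 for the Schema Registry). An open port only shows that a process is listening, not that the service has finished initializing.

```shell
# Return success if a TCP connection to host:port can be opened.
# Assumes bash is installed; /dev/tcp is a bash feature, not a real device.
port_open() {
  bash -c 'exec 3<>"/dev/tcp/$1/$2"' _ "$1" "$2" 2>/dev/null
}

# Poll the three ports used in this guide until each accepts connections.
wait_for_services() {
  for port in 2181 9092 8081; do
    until port_open localhost "$port"; do
      echo "waiting for localhost:$port ..."
      sleep 1
    done
  done
  echo "all services are accepting connections"
}

# Usage (run after starting the services above):
#   wait_for_services
```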
Read File Data with Connect
We require two configuration files to start up a FileStreamSourceConnector to read data from a file and output it to Kafka.
This tutorial will run the Connect worker in standalone mode so it will act as a lone agent. Details on standalone versus
distributed mode for Kafka Connect workers can be found in the concepts section.
Because we will run in standalone mode, we need a worker configuration file and a connector configuration file. The first is the
worker configuration file, which is located at
./etc/schema-registry/connect-avro-standalone.properties. Its contents, with explanatory
comments, are shown below:
# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
# integrates the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets
To run this tutorial without the Schema Registry, you would need to set
value.converter to
org.apache.kafka.connect.json.JsonConverter and remove the Schema Registry references.
Note that the values for
internal.value.converter should be left alone in most cases.
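One way to derive such a configuration is to filter the Avro sample rather than edit it by hand. The sketch below is illustrative only: the function name `to_json_worker_config` and the output path in the usage comment are hypothetical, and it makes the assumption that both the key and the value converter should switch to the JSON converter. The `internal.*` settings pass through untouched because they do not match either pattern.

```shell
# Read a worker config on stdin and write a JSON-converter variant to stdout:
#   1. swap every AvroConverter class name for the JsonConverter class name
#   2. drop the *.schema.registry.url lines, which the JSON converter does not use
to_json_worker_config() {
  sed -e 's/io\.confluent\.connect\.avro\.AvroConverter/org.apache.kafka.connect.json.JsonConverter/' \
      -e '/schema\.registry\.url/d'
}

# Usage (the output file name is a made-up example):
#   to_json_worker_config < ./etc/schema-registry/connect-avro-standalone.properties \
#     > /tmp/connect-json-standalone.properties
```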
The second file is the connector configuration file specific to the
FileStreamSourceConnector. This file is located at
./etc/kafka/connect-file-source.properties and the contents with an explanation are below:
# User defined connector instance name
name=local-file-source
# The class implementing the connector
connector.class=FileStreamSource
# Maximum number of tasks to run for this connector instance
tasks.max=1
# The input file (path relative to worker's working directory)
# This is the only setting specific to the FileStreamSource
file=test.txt
# The output topic in Kafka
topic=connect-test
Now let’s seed the file with some sample data. Note that the connector configuration specifies a relative path for the file, so you should create the file in the same directory that you will run the Kafka Connect worker from.
$ echo -e "log line 1\nlog line 2" > test.txt
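The `-e` flag of `echo` is not handled the same way by every shell, so on some systems the command above may write a literal `-e` or a literal `\n` into the file. A more portable way to seed the file is `printf`, sketched here with a hypothetical helper named `seed_file`:

```shell
# Write the two sample lines to the given path.
# printf interprets \n consistently across POSIX shells, unlike echo -e.
seed_file() {
  printf 'log line 1\nlog line 2\n' > "$1"
}

# Usage, from the directory where the Connect worker will be started:
#   seed_file test.txt
```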
Next, start a Kafka Connect instance in standalone mode running this connector. For standalone mode, we can specify the connector configurations directly on the command line:
$ ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties \
  ./etc/kafka/connect-file-source.properties
Each of the two lines in our log file should be delivered to Kafka, having registered a schema with the Schema Registry. One way to validate that the data is there is to use the console consumer in another console to inspect the contents of the topic:
$ ./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic connect-test --from-beginning
"log line 1"
"log line 2"
Note that we use the
kafka-avro-console-consumer because the data has been stored in Kafka in an Avro format. This
consumer uses the Avro converter that is bundled with the Schema Registry in order to properly look up the schema for the
After you are done experimenting with reading from a file with Connect, hit Ctrl-C to gracefully stop the Connect worker.
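The Schema Registry keeps running after the worker stops, so you can still inspect the schema the source connector registered. The sketch below uses the Schema Registry REST API on the `localhost:8081` address from the worker configuration. The helper names are hypothetical, and the `<topic>-value` subject name assumes the default subject naming, so treat the expected subject `connect-test-value` as an assumption rather than a guarantee.

```shell
# Subject name under which a topic's record values are registered,
# assuming the default "<topic>-value" naming.
value_subject() {
  printf '%s-value' "$1"
}

# List every subject known to the Schema Registry.
list_subjects() {
  curl -s http://localhost:8081/subjects
}

# Fetch the most recent schema registered for a topic's values.
latest_value_schema() {
  curl -s "http://localhost:8081/subjects/$(value_subject "$1")/versions/latest"
}

# Usage (requires the Schema Registry started earlier in this guide):
#   list_subjects
#   latest_value_schema connect-test
```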
Write File Data with Connect
Now that we have written some data to a Kafka topic with Connect, let’s consume that data with a downstream process.
In this section, we will have a sink connector running in the worker in addition to the source that we ran in the last
section. The sink will write messages to a local file. Below is the connector
configuration with an explanation for the
FileStreamSink to write data to a file. This file is located at
# User defined name for the connector instance
name=local-file-sink
# Name of the connector class to be run
connector.class=FileStreamSink
# Max number of tasks to spawn for this connector instance
tasks.max=1
# Output file name relative to worker's current working directory
# This is the only property specific to the FileStreamSink connector
file=test.sink.txt
# Comma separate input topic list
topics=connect-test
Note that the configuration contains similar settings to the file source. A key difference is that multiple input
topics are specified with
topics whereas the file source allows for only one output topic specified with
Now start the Kafka Connect standalone process, but this time specify both connector configurations. They will run in the same process, but each will have its own dedicated thread.
$ ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties \
  ./etc/kafka/connect-file-source.properties ./etc/kafka/connect-file-sink.properties
... [ start up logs clipped ] ...
"log line 1"
"log line 2"
Once the process is up and running, you should see the two log lines written to the file
test.sink.txt as the sink connector
consumes them. Note that the messages were not written again by the source connector because it was able to resume from
the same point in the file where it left off when we shut down the previous process.
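The idea behind this resume behavior can be illustrated with a small stand-alone sketch. This is not how Kafka Connect is implemented; it only mimics the concept of persisting a position (here a byte count in a hypothetical offsets file) and reading from that position on the next run. In the worker used in this guide, the equivalent state lives in the file named by `offset.storage.file.filename` (`/tmp/connect.offsets`).

```shell
# Print only the data appended to input_file since the previous call,
# remembering the byte position reached in offset_file.
read_new_lines() {
  input_file=$1
  offset_file=$2
  offset=0
  if [ -f "$offset_file" ]; then
    offset=$(cat "$offset_file")
  fi
  # tail -c +N starts output at byte N (1-based), so skip the bytes already read
  tail -c +"$((offset + 1))" "$input_file"
  # record the new position: the current size of the input file in bytes
  wc -c < "$input_file" | tr -d ' ' > "$offset_file"
}

# Usage (both paths are examples):
#   read_new_lines test.txt /tmp/demo.offsets   # first call prints every line
#   read_new_lines test.txt /tmp/demo.offsets   # later calls print only new lines
```

Removing the offsets file in this sketch makes the next call start again from the beginning of the input, which is the behavior the persisted position exists to avoid.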
With both connectors running, we can see data flowing end-to-end in real time. Use another terminal to tail the output file and one more to add more lines to the text file:
$ tail -f test.sink.txt
$ echo -e "log line 3\nlog line 4" >> test.txt
You should see the additional lines output to
test.sink.txt. The new data was picked up by the
source connector, written to Kafka, read by the sink connector from Kafka, and finally output to the file.
Both source and sink connectors can track offsets, so you can start and stop the process any number of times and add more data to the input file and both will resume where they previously left off.
The connectors demonstrated in this quick start are intentionally simple so no additional dependencies are necessary. Most connectors will require a bit more configuration to specify how to connect to the source or sink system and what data to copy, and for many you will want to run them on a Kafka Connect cluster for scalability and fault tolerance. To get started, see the user guide for more details on running and managing Kafka Connect, including how to run in distributed mode. The Connectors section includes details on configuring and deploying the connectors that ship with Confluent Platform.
The easiest way to create, configure, and manage connectors is with Confluent Control Center. To learn more about Control Center, see Confluent Control Center.