Kafka Connect Quick Start

Goal

This quick start guide provides a hands-on look at how you can move data into and out of Kafka without writing a single line of code. Reviewing the Kafka Connect concepts while you run the steps in this guide will give you a deeper understanding. At the end of this quick start you will be able to:

  • Start a Connect worker in standalone mode
  • Read data from a file and publish to a Kafka topic
  • Read data from a Kafka topic and publish to file
  • Integrate the Schema Registry with a connector

What we will do

To demonstrate the basic functionality of Kafka Connect and its integration with the Confluent Schema Registry, we run a few local standalone Kafka Connect processes with connectors. You will insert data written to a file into Kafka and write data from a Kafka topic to a file. If you are using JSON as the Connect data format, see the instructions here for a tutorial that does not include the Schema Registry.

Start ZooKeeper, Kafka, Schema Registry

First, start up a ZooKeeper server. This guide assumes that all services run on localhost.

# Start ZooKeeper.  Run this command in its own terminal.
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

Next, start up a Kafka broker that will serve as our single-node Kafka cluster.

# Start Kafka.  Run this command in its own terminal.
$ ./bin/kafka-server-start ./etc/kafka/server.properties

Now start up the Schema Registry, which makes it easier to work with Avro data.

# Start Schema Registry. Run this command in its own terminal.
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

For complete details on getting these services up and running, see the Confluent Platform quick start.

Read File Data with Connect

To start a FileStreamSourceConnector that reads data from a file and writes it to Kafka, we need two configuration files: a worker configuration file and a connector configuration file. This tutorial runs the Connect worker in standalone mode, so it acts as a lone agent. Details on standalone versus distributed mode for Kafka Connect workers can be found in the concepts section. The first file is the worker configuration, located at ./etc/schema-registry/connect-avro-standalone.properties. Its contents, with explanatory comments, are shown below:

# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets

To run this tutorial without the Schema Registry, set key.converter and value.converter to org.apache.kafka.connect.json.JsonConverter and remove the two schema.registry.url settings. Note that the values for internal.key.converter and internal.value.converter should be left alone in most cases.
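The converter change described above can be scripted. The following is a minimal sketch, not something shipped with Confluent Platform: it uses sed to derive a JSON-only worker configuration from the Avro one. The function name make_json_worker_config and the output file name in the usage comment are hypothetical.

```shell
# Derive a JSON-only worker config from the Avro worker config.
# Hypothetical helper; it only rewrites the four converter lines.
make_json_worker_config() {
  src="$1"   # path to the Avro worker config
  dst="$2"   # path for the derived JSON worker config
  # The ^ anchors keep the internal.*.converter lines untouched.
  sed -e '/^key\.converter\.schema\.registry\.url=/d' \
      -e '/^value\.converter\.schema\.registry\.url=/d' \
      -e 's/^key\.converter=.*/key.converter=org.apache.kafka.connect.json.JsonConverter/' \
      -e 's/^value\.converter=.*/value.converter=org.apache.kafka.connect.json.JsonConverter/' \
      "$src" > "$dst"
}

# Usage (the output path is an arbitrary choice):
#   make_json_worker_config ./etc/schema-registry/connect-avro-standalone.properties \
#       ./connect-json-standalone.properties
```

The derived file could then be passed to connect-standalone in place of the Avro worker configuration.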

The second file is the connector configuration file specific to the FileStreamSourceConnector. It is located at ./etc/kafka/connect-file-source.properties, and its contents, with explanatory comments, are shown below:

# User defined connector instance name
name=local-file-source
# The class implementing the connector
connector.class=FileStreamSource
# Maximum number of tasks to run for this connector instance
tasks.max=1
# The input file (path relative to worker's working directory)
# This is the only setting specific to the FileStreamSource
file=test.txt
# The output topic in Kafka
topic=connect-test

Now let’s seed the file with some sample data. Note that the connector configuration specifies a relative path for the file, so you should create the file in the same directory that you will run the Kafka Connect worker from.

$ echo -e "log line 1\nlog line 2" > test.txt

Next, start a Kafka Connect instance in standalone mode running this connector. For standalone mode, we can specify the connector configurations directly on the command line:

$ ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties \
      ./etc/kafka/connect-file-source.properties

Each of the two lines in our log file should be delivered to Kafka, having registered a schema with the Schema Registry. One way to validate that the data is there is to use the console consumer in another console to inspect the contents of the topic:

$ ./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic connect-test --from-beginning
  "log line 1"
  "log line 2"

Note that we use the kafka-avro-console-consumer because the data has been stored in Kafka in Avro format. This consumer uses the Avro converter that is bundled with the Schema Registry to look up the schema for the Avro data.
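To see the schema that the connector registered, you can query the Schema Registry REST API directly. The sketch below assumes the Schema Registry is listening on http://localhost:8081 as in the worker configuration, that curl is installed, and that the default subject naming applies, in which values for a topic are registered under the subject <topic>-value. The function names are hypothetical.

```shell
# Base URL of the Schema Registry; the default matches the worker config.
SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_URL:-http://localhost:8081}"

# Build the REST URL for the latest schema registered for a topic's values.
latest_value_schema_url() {
  topic="$1"
  echo "${SCHEMA_REGISTRY_URL}/subjects/${topic}-value/versions/latest"
}

# Fetch that schema as JSON. Requires curl and a running Schema Registry.
show_value_schema() {
  curl -s "$(latest_value_schema_url "$1")"
}

# Usage (with the services from this guide running):
#   show_value_schema connect-test
```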

After you are done experimenting with reading from a file with Connect, hit Ctrl-C to gracefully stop the Connect worker.

Write File Data with Connect

Now that we have written some data to a Kafka topic with Connect, let’s consume that data with a downstream process. In this section, we will have a sink connector running in the worker in addition to the source that we ran in the last section. The sink will write messages to a local file. Below is the connector configuration with an explanation for the FileStreamSink to write data to a file. This file is located at ./etc/kafka/connect-file-sink.properties.

# User defined name for the connector instance
name=local-file-sink
# Name of the connector class to be run
connector.class=FileStreamSink
# Max number of tasks to spawn for this connector instance
tasks.max=1
# Output file name relative to worker's current working directory
# This is the only property specific to the FileStreamSink connector
file=test.sink.txt
# Comma-separated list of input topics
topics=connect-test

Note that the configuration contains similar settings to the file source. A key difference is that multiple input topics are specified with topics whereas the file source allows for only one output topic specified with topic.
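As an illustration of the topics setting, the sketch below writes a sink configuration that reads from more than one topic. The helper name write_sink_config and the second topic name in the usage comment (connect-test-2) are hypothetical; only connect-test exists in this quick start.

```shell
# Write a FileStreamSink config that consumes from one or more topics.
write_sink_config() {
  out="$1"      # destination properties file
  topics="$2"   # comma-separated topic list, no spaces
  cat > "$out" <<EOF
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=$topics
EOF
}

# Usage (connect-test-2 is a made-up topic name):
#   write_sink_config ./connect-file-sink-multi.properties "connect-test,connect-test-2"
```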

Now start the Kafka Connect standalone process, but this time specify both connector configurations. They will run in the same process, but each will have its own dedicated thread.

$ ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties \
      ./etc/kafka/connect-file-source.properties ./etc/kafka/connect-file-sink.properties
  ... [ start up logs clipped ] ...
  "log line 1"
  "log line 2"

Once the process is up and running, you should see the two log lines written to the file test.sink.txt as the sink connector consumes them. Note that the messages were not written again by the source connector because it was able to resume from the same point in the file where it left off when we shut down the previous process.

With both connectors running, we can see data flowing end-to-end in real time. Use another terminal to tail the output file and one more to add more lines to the text file:

$ tail -f test.sink.txt
$ echo -e "log line 3\nlog line 4" >> test.txt

You should see the additional lines output to test.sink.txt. The new data was picked up by the source connector, written to Kafka, read by the sink connector from Kafka, and finally output to the file.
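One way to confirm the end-to-end flow without watching the terminal is to compare the two files. This is a rough sketch under the assumption that the connect-test topic contains only the lines that came from test.txt, so the sink file should eventually match the source file exactly. The function names and the polling limit are hypothetical.

```shell
# Succeed when the sink file matches the source file byte for byte.
files_in_sync() {
  src="${1:-test.txt}"
  sink="${2:-test.sink.txt}"
  cmp -s "$src" "$sink"
}

# Poll once per second, up to the given number of tries, until both files match.
wait_for_sync() {
  tries="${1:-10}"
  while [ "$tries" -gt 0 ]; do
    if files_in_sync "${2:-}" "${3:-}"; then
      echo "in sync"
      return 0
    fi
    tries=$((tries - 1))
    sleep 1
  done
  echo "not in sync"
  return 1
}

# Usage, from the directory where the worker was started:
#   wait_for_sync 30
```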

Both source and sink connectors can track offsets, so you can start and stop the process any number of times and add more data to the input file and both will resume where they previously left off.
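In standalone mode, the source connector's position is kept in the file named by offset.storage.file.filename, which is /tmp/connect.offsets in the worker configuration above. If you want the source connector to re-read test.txt from the beginning while experimenting, one option is to remove that file while the worker is stopped. The sketch below is a hypothetical helper for that. Re-reading publishes the same lines to the topic again, so expect duplicates downstream.

```shell
# Path from offset.storage.file.filename in the worker config.
OFFSET_FILE="${OFFSET_FILE:-/tmp/connect.offsets}"

# Remove the standalone offset file so the source connector starts over.
# Run this only while the Connect worker is stopped.
reset_source_offsets() {
  if [ -f "$OFFSET_FILE" ]; then
    rm -f "$OFFSET_FILE"
    echo "removed $OFFSET_FILE"
  else
    echo "no offset file at $OFFSET_FILE"
  fi
}

# Usage:
#   reset_source_offsets
```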

The connectors demonstrated in this quick start are intentionally simple so that no additional dependencies are necessary. Most connectors require a bit more configuration to specify how to connect to the source or sink system and what data to copy, and many are best run on a Kafka Connect cluster for scalability and fault tolerance. To get started, see the user guide for more details on running and managing Kafka Connect, including how to run in distributed mode. The Connectors section includes details on configuring and deploying the connectors that ship with Confluent Platform.

Tip

The easiest way to create, configure, and manage connectors is with Confluent Control Center. To learn more about Control Center, see Confluent Control Center.