InfluxDB Source Connector for Confluent Platform
The Kafka Connect InfluxDB Source connector allows you to import data from an InfluxDB host into an Apache Kafka® topic.
Data is loaded by periodically executing an Influx query and creating an output
record for each row in the result set. By default, all measurements in a
database are copied, each to its own output topic. The database is monitored for
new measurements and adapts automatically. When copying data from a measurement,
the connector loads only new records.
The source connector supports copying measurements with a variety of InfluxDB
data types, adding measurements from the database dynamically, whitelists and
blacklists, varying polling intervals, and other settings. However, the most
important features for most users are the settings controlling how data is
incrementally copied from the database.
Kafka Connect tracks the latest record it retrieved from each measurement, so
it can start in the correct location on the next iteration (or in case of a
crash). The source connector uses this functionality to only get updated records
from measurements (or from the output of a custom query) on each iteration.
Several modes are supported, each of which differs in how modified rows are
In this quick start, you copy data from a single measurement from a local
Influx database running on Docker into a Kafka topic.
This example assumes you are running Kafka and Schema Registry locally on the default ports.
It also assumes your have Docker installed and running.
First, bring up the Influx database by running the following Docker command:
docker run -d -p 8086:8086 --name influxdb-local influxdb:1.7.7
This starts the Influx database, and maps it to port 8086 on
By default, the user name and password are blank. The database connection URL is
To create sample data in the Influx database, log in to the Docker container using the following command:
docker exec -it <containerid> bash
To find the container ID use the
docker ps command.
Once you are in the Docker container, log in to InfluxDB shell:
Your output should resemble:
Connected to http://localhost:8086 version 1.7.7
InfluxDB shell version: 1.7.7
Create Influx Database and Load Data
Create an Influx database with this command:
> create database testdb;
Check for the new database by running:
In the InfluxDB command prompt, create a measurement and seed it with some data:
> use testdb;
Using database testdb
> INSERT coin,id=1 value=100
You can run
SELECT * from coin; to verify your measurement data.
Start InfluxDB Source Connector
Start the Confluent Platform using the Confluent CLI command below.
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to
confluent local. For example, the syntax for
confluent start is now
confluent local services start. For more information, see confluent local.
confluent local services start
Next, create a configuration file for the connector. This file is included with
the connector in
and contains the following settings:
Load the InfluxDB source connector with this configuration.
confluent local services connect connector load influx-source-connector --config etc/kafka-connect-influxdb/influxdb-source-connector.properties
To check that it has copied the data that was present when you started Kafka
Connect, start a console consumer, reading from the beginning of the topic:
kafka-console-consumer --bootstrap-server localhost:9092 --topic influx_coin --from-beginning
Your output should resemble:
Note that the default polling interval is five seconds, so it may take a few
seconds to show up. Depending on your expected rate of updates or desired
latency, a smaller poll interval could be used to deliver updates more quickly.
All the features of Kafka Connect, including offset
management and fault tolerance, work with the source connector. You can restart
and kill the processes and they will pick up where they left off, copying only
new data (as defined by the
Each incremental query mode tracks a set of fields for each point, which it uses
to keep track of which records have been processed and which records are new.
mode setting controls this behavior and supports the following options:
- Timestamp: In this mode, a single record containing a timestamp is used
to track the last time data was processed and to query only for records that have been added since that time.
- Custom Query: The source connector supports using custom queries instead of copying whole
measurements. With a custom query, one of the other automatic update modes can be used as long
as the necessary
WHERE clause can be correctly appended to the query. Alternatively, the
specified query may handle filtering to new updates itself;
however, note that no offset tracking will be performed (unlike the automatic modes where
timestamp column values are recorded for each record), so the query
must track offsets itself.
- Bulk: This mode is unfiltered and therefore not incremental at all. It will load all records
from a measurement on each iteration. This can be useful if you want to periodically dump an entire
measurement where entries are eventually deleted and the downstream system can safely handle duplicates.
The source connector gives you quite a bit of flexibility regarding where you
can import data from and how that data is imported. The full set of
configuration options are listed in InfluxDB Source Connector Configuration Properties,
but here are a few template configurations that cover some common usage
Use a whitelist to limit changes to a subset of measurements in an Influx
timestamp fields that are standard on all whitelisted
measurements to detect records that have been created. This mode is the most
robust because it can use timestamps to guarantee modifications are not missed
even if the process dies in the middle of a query.
Use a blacklist to exclude measurements from copying data from an Influx
database. It will monitor all measurements except blacklisted measurements.
Use a custom query instead of loading measurements, which allows you to load data from multiple measurements.