InfluxDB Source Connector¶
The InfluxDB source connector allows you to import data from an InfluxDB host into Kafka topics.
Data is loaded by periodically executing an Influx query and creating an output record for each row in the result set. By default, all measurements in a database are copied, each to its own output topic. The database is monitored for new measurements and adapts automatically. When copying data from a measurement, the connector loads only new records.
The source connector supports copying measurements with a variety of InfluxDB data types, adding measurements from the database dynamically, whitelists and blacklists, varying polling intervals, and other settings. However, the most important features for most users are the settings controlling how data is incrementally copied from the database.
Kafka Connect tracks the latest record it retrieved from each measurement, so it can start in the correct location on the next iteration (or in case of a crash). The source connector uses this functionality to only get updated records from measurements (or from the output of a custom query) on each iteration. Several modes are supported, each of which differs in how modified rows are detected.
In this quick start, you copy data from a single measurement from a local Influx database running on Docker into a Kafka topic.
This example assumes you are running Kafka and Schema Registry locally on the default ports. It also assumes your have Docker installed and running.
First, bring up the Influx database by running the following Docker command:
docker run -d -p 8086:8086 --name influxdb-local influxdb:1.7.7
This starts the Influx database, and maps it to port 8086 on
By default, the user name and password are blank. The database connection URL is
To create sample data in the Influx database, log in to the Docker container using the following command:
docker exec -it <containerid> bash
To find the container ID use the
docker ps command.
Once you are in the Docker container, log in to InfluxDB shell:
Your output should resemble:
Connected to http://localhost:8086 version 1.7.7 InfluxDB shell version: 1.7.7
Create Influx Database and Load Data¶
Create an Influx database with this command:
> create database testdb;
Check for the new database by running:
> show databases;
In the InfluxDB command prompt, create a measurement and seed it with some data:
> use testdb; Using database testdb > INSERT coin,id=1 value=100
You can run
SELECT * from coin;to verify your measurement data.
Start InfluxDB Source Connector¶
Start the Confluent Platform using the Confluent CLI command below.
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to
confluent local. For example, the syntax for
confluent start is now
confluent local start. For more information, see confluent local.
confluent local start
Next, create a configuration file for the connector. This file is included with
the connector in
and contains the following settings:
name=InfluxDBSourceConnector connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector tasks.max=1 topic.prefix=influx_ influxdb.url=http://localhost:8086 influxdb.db=testdb mode=timestamp value.converter=org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable=false
Load the InfluxDB source connector with this configuration.
confluent local load influx-source-connector -- -d etc/kafka-connect-influxdb/influxdb-source-connector.properties
To check that it has copied the data that was present when you started Kafka Connect, start a console consumer, reading from the beginning of the topic:
kafka-console-consumer --bootstrap-server localhost:9092 --topic influx_coin --from-beginning
Your output should resemble:
Note that the default polling interval is five seconds, so it may take a few seconds to show up. Depending on your expected rate of updates or desired latency, a smaller poll interval could be used to deliver updates more quickly.
All the features of Kafka Connect, including offset
management and fault tolerance, work with the source connector. You can restart
and kill the processes and they will pick up where they left off, copying only
new data (as defined by the
Each incremental query mode tracks a set of fields for each point, which it uses
to keep track of which records have been processed and which records are new.
mode setting controls this behavior and supports the following options:
- Timestamp: In this mode, a single record containing a timestamp is used to track the last time data was processed and to query only for records that have been added since that time.
- Custom Query: The source connector supports using custom queries instead of copying whole
measurements. With a custom query, one of the other automatic update modes can be used as long
as the necessary
WHEREclause can be correctly appended to the query. Alternatively, the specified query may handle filtering to new updates itself; however, note that no offset tracking will be performed (unlike the automatic modes where
timestampcolumn values are recorded for each record), so the query must track offsets itself.
- Bulk: This mode is unfiltered and therefore not incremental at all. It will load all records from a measurement on each iteration. This can be useful if you want to periodically dump an entire measurement where entries are eventually deleted and the downstream system can safely handle duplicates.
The source connector gives you quite a bit of flexibility regarding where you can import data from and how that data is imported. The full set of configuration options are listed in Configuration Properties, but here are a few template configurations that cover some common usage scenarios.
Use a whitelist to limit changes to a subset of measurements in an Influx
timestamp fields that are standard on all whitelisted
measurements to detect records that have been created. This mode is the most
robust because it can use timestamps to guarantee modifications are not missed
even if the process dies in the middle of a query.
name=influx-whitelist-timestamp-source connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector tasks.max=10 influxdb.url=http://localhost:8086 influxdb.db=testdb influxdb.measurement.whitelist=users,products,transactions mode=timestamp topic.prefix=influx_
Use a blacklist to exclude measurements from copying data from an Influx database. It will monitor all measurements except blacklisted measurements.
name=influx-blacklist-timestamp-source connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector tasks.max=10 influxdb.url=http://localhost:8086 influxdb.db=testdb influxdb.measurement.blacklist=users,products mode=timestamp topic.prefix=influx_
Use a custom query instead of loading measurements, which allows you to load data from multiple measurements.
name=influx-whitelist-timestamp-source connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector tasks.max=10 influxdb.url=http://localhost:8086 influxdb.db=testdb topic.prefix=influx_ query=Your-Custom-query