Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
InfluxDB Source Connector for Confluent Platform¶
The InfluxDB source connector allows you to import data from an InfluxDB host into Kafka topics.
Data is loaded by periodically executing an Influx query and creating an output record for each row in the result set. By default, all measurements in a database are copied, each to its own output topic. The database is monitored for new measurements and adapts automatically. When copying data from a measurement, the connector loads only new records.
Features¶
The source connector supports copying measurements with a variety of InfluxDB data types, adding measurements from the database dynamically, whitelists and blacklists, varying polling intervals, and other settings. However, the most important features for most users are the settings controlling how data is incrementally copied from the database.
Kafka Connect tracks the latest record it retrieved from each measurement, so it can start in the correct location on the next iteration (or in case of a crash). The source connector uses this functionality to only get updated records from measurements (or from the output of a custom query) on each iteration. Several modes are supported, each of which differs in how modified rows are detected.
Configuration Properties¶
For a complete list of configuration properties for this connector, see InfluxDB Source Connector Configuration Properties.
Quick Start¶
In this quick start, you copy data from a single measurement from a local Influx database running on Docker into a Kafka topic.
This example assumes you are running Kafka and Schema Registry locally on the default ports. It also assumes your have Docker installed and running.
First, bring up the Influx database by running the following Docker command:
docker run -d -p 8086:8086 --name influxdb-local influxdb:1.7.7
This starts the Influx database, and maps it to port 8086 on localhost
.
By default, the user name and password are blank. The database connection URL is http://localhost:8086
.
To create sample data in the Influx database, log in to the Docker container using the following command:
docker exec -it <containerid> bash
Tip
To find the container ID use the docker ps
command.
Once you are in the Docker container, log in to InfluxDB shell:
influx
Your output should resemble:
Connected to http://localhost:8086 version 1.7.7
InfluxDB shell version: 1.7.7
Create Influx Database and Load Data¶
Create an Influx database with this command:
> create database testdb;
Check for the new database by running:
> show databases;
In the InfluxDB command prompt, create a measurement and seed it with some data:
> use testdb; Using database testdb > INSERT coin,id=1 value=100
Tip
You can run
SELECT * from coin;
to verify your measurement data.
Start InfluxDB Source Connector¶
Start the Confluent Platform using the Confluent CLI command below.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to confluent local
. For example, the syntax for confluent start
is now
confluent local start
. For more information, see confluent local.
confluent local start
Next, create a configuration file for the connector. This file is included with
the connector in ./etc/kafka-connect-influxdb/influxdb-source-connector.properties
and contains the following settings:
name=InfluxDBSourceConnector
connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector
tasks.max=1
topic.prefix=influx_
influxdb.url=http://localhost:8086
influxdb.db=testdb
mode=timestamp
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
Load the InfluxDB source connector with this configuration.
confluent local load influx-source-connector -- -d etc/kafka-connect-influxdb/influxdb-source-connector.properties
To check that it has copied the data that was present when you started Kafka Connect, start a console consumer, reading from the beginning of the topic:
kafka-console-consumer --bootstrap-server localhost:9092 --topic influx_coin --from-beginning
Your output should resemble:
{"measurement":"coin","tags":{"id":"1"},"time":"2019-07-24T10:14:04.979737851Z","value":100.0}
Note that the default polling interval is five seconds, so it may take a few seconds to show up. Depending on your expected rate of updates or desired latency, a smaller poll interval could be used to deliver updates more quickly.
All the features of Kafka Connect, including offset
management and fault tolerance, work with the source connector. You can restart
and kill the processes and they will pick up where they left off, copying only
new data (as defined by the mode
setting).
Query Modes¶
Each incremental query mode tracks a set of fields for each point, which it uses
to keep track of which records have been processed and which records are new.
The mode
setting controls this behavior and supports the following options:
- Timestamp: In this mode, a single record containing a timestamp is used to track the last time data was processed and to query only for records that have been added since that time.
- Custom Query: The source connector supports using custom queries instead of copying whole
measurements. With a custom query, one of the other automatic update modes can be used as long
as the necessary
WHERE
clause can be correctly appended to the query. Alternatively, the specified query may handle filtering to new updates itself; however, note that no offset tracking will be performed (unlike the automatic modes wheretimestamp
column values are recorded for each record), so the query must track offsets itself. - Bulk: This mode is unfiltered and therefore not incremental at all. It will load all records from a measurement on each iteration. This can be useful if you want to periodically dump an entire measurement where entries are eventually deleted and the downstream system can safely handle duplicates.
Configuration¶
The source connector gives you quite a bit of flexibility regarding where you can import data from and how that data is imported. The full set of configuration options are listed in InfluxDB Source Connector Configuration Properties, but here are a few template configurations that cover some common usage scenarios.
Use a whitelist to limit changes to a subset of measurements in an Influx
database, using timestamp
fields that are standard on all whitelisted
measurements to detect records that have been created. This mode is the most
robust because it can use timestamps to guarantee modifications are not missed
even if the process dies in the middle of a query.
name=influx-whitelist-timestamp-source
connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector
tasks.max=10
influxdb.url=http://localhost:8086
influxdb.db=testdb
influxdb.measurement.whitelist=users,products,transactions
mode=timestamp
topic.prefix=influx_
Use a blacklist to exclude measurements from copying data from an Influx database. It will monitor all measurements except blacklisted measurements.
name=influx-blacklist-timestamp-source
connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector
tasks.max=10
influxdb.url=http://localhost:8086
influxdb.db=testdb
influxdb.measurement.blacklist=users,products
mode=timestamp
topic.prefix=influx_
Use a custom query instead of loading measurements, which allows you to load data from multiple measurements.
name=influx-whitelist-timestamp-source
connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector
tasks.max=10
influxdb.url=http://localhost:8086
influxdb.db=testdb
topic.prefix=influx_
query=Your-Custom-query