InfluxDB Source Connector

The InfluxDB source connector allows you to import data from an InfluxDB host into Kafka topics.

Data is loaded by periodically executing an Influx query and creating an output record for each row in the result set. By default, all measurements in a database are copied, each to its own output topic. The database is monitored for new measurements and adapts automatically. When copying data from a measurement, the connector loads only new records.

Features

The source connector supports copying measurements with a variety of InfluxDB data types, adding measurements from the database dynamically, whitelists and blacklists, varying polling intervals, and other settings. However, the most important features for most users are the settings controlling how data is incrementally copied from the database.

Kafka Connect tracks the latest record it retrieved from each measurement, so it can start in the correct location on the next iteration (or in case of a crash). The source connector uses this functionality to only get updated records from measurements (or from the output of a custom query) on each iteration. Several modes are supported, each of which differs in how modified rows are detected.

Quick Start

In this quick start, you copy data from a single measurement from a local Influx database running on Docker into a Kafka topic.

This example assumes you are running Kafka and Schema Registry locally on the default ports. It also assumes your have Docker installed and running.

First, bring up the Influx database by running the following Docker command:

docker run -d -p 8086:8086 --name influxdb-local influxdb:1.7.7

This starts the Influx database, and maps it to port 8086 on localhost. By default, the user name and password are blank. The database connection URL is http://localhost:8086.

To create sample data in the Influx database, log in to the Docker container using the following command:

docker exec -it <containerid> bash

Tip

To find the container ID use the docker ps command.

Once you are in the Docker container, log in to InfluxDB shell:

influx

Your output should resemble:

Connected to http://localhost:8086 version 1.7.7
InfluxDB shell version: 1.7.7

Create Influx Database and Load Data

  1. Create an Influx database with this command:

    > create database testdb;
    

    Check for the new database by running:

    > show databases;
    
  2. In the InfluxDB command prompt, create a measurement and seed it with some data:

    > use testdb;
             Using database testdb
    > INSERT coin,id=1 value=100
    

    Tip

    You can run SELECT * from coin; to verify your measurement data.

Start InfluxDB Source Connector

Start the Confluent Platform using the Confluent CLI command below.

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.

confluent local start

Next, create a configuration file for the connector. This file is included with the connector in ./etc/kafka-connect-influxdb/influxdb-source-connector.properties and contains the following settings:

name=InfluxDBSourceConnector
connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector
tasks.max=1
topic.prefix=influx_
influxdb.url=http://localhost:8086
influxdb.db=testdb
mode=timestamp
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

Load the InfluxDB source connector with this configuration.

confluent local load influx-source-connector -- -d etc/kafka-connect-influxdb/influxdb-source-connector.properties

To check that it has copied the data that was present when you started Kafka Connect, start a console consumer, reading from the beginning of the topic:

kafka-console-consumer --bootstrap-server localhost:9092 --topic influx_coin --from-beginning

Your output should resemble:

{"measurement":"coin","tags":{"id":"1"},"time":"2019-07-24T10:14:04.979737851Z","value":100.0}

Note that the default polling interval is five seconds, so it may take a few seconds to show up. Depending on your expected rate of updates or desired latency, a smaller poll interval could be used to deliver updates more quickly.

All the features of Kafka Connect, including offset management and fault tolerance, work with the source connector. You can restart and kill the processes and they will pick up where they left off, copying only new data (as defined by the mode setting).

Query Modes

Each incremental query mode tracks a set of fields for each point, which it uses to keep track of which records have been processed and which records are new. The mode setting controls this behavior and supports the following options:

  • Timestamp: In this mode, a single record containing a timestamp is used to track the last time data was processed and to query only for records that have been added since that time.
  • Custom Query: The source connector supports using custom queries instead of copying whole measurements. With a custom query, one of the other automatic update modes can be used as long as the necessary WHERE clause can be correctly appended to the query. Alternatively, the specified query may handle filtering to new updates itself; however, note that no offset tracking will be performed (unlike the automatic modes where timestamp column values are recorded for each record), so the query must track offsets itself.
  • Bulk: This mode is unfiltered and therefore not incremental at all. It will load all records from a measurement on each iteration. This can be useful if you want to periodically dump an entire measurement where entries are eventually deleted and the downstream system can safely handle duplicates.

Configuration

The source connector gives you quite a bit of flexibility regarding where you can import data from and how that data is imported. The full set of configuration options are listed in Configuration Properties, but here are a few template configurations that cover some common usage scenarios.

Use a whitelist to limit changes to a subset of measurements in an Influx database, using timestamp fields that are standard on all whitelisted measurements to detect records that have been created. This mode is the most robust because it can use timestamps to guarantee modifications are not missed even if the process dies in the middle of a query.

name=influx-whitelist-timestamp-source
connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector
tasks.max=10

influxdb.url=http://localhost:8086
influxdb.db=testdb
influxdb.measurement.whitelist=users,products,transactions
mode=timestamp
topic.prefix=influx_

Use a blacklist to exclude measurements from copying data from an Influx database. It will monitor all measurements except blacklisted measurements.

name=influx-blacklist-timestamp-source
connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector
tasks.max=10

influxdb.url=http://localhost:8086
influxdb.db=testdb
influxdb.measurement.blacklist=users,products
mode=timestamp
topic.prefix=influx_

Use a custom query instead of loading measurements, which allows you to load data from multiple measurements.

name=influx-whitelist-timestamp-source
connector.class=io.confluent.influxdb.source.InfluxdbSourceConnector
tasks.max=10

influxdb.url=http://localhost:8086
influxdb.db=testdb
topic.prefix=influx_
query=Your-Custom-query

Additional Documentation