InfluxDB Sink Connector for Confluent Platform¶
The Kafka Connect InfluxDB Sink connector writes data from an Apache Kafka® topic to an InfluxDB host. When more than one record in a batch has the same measurement, time, and tags, the records are combined and written to InfluxDB as a single point.
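For example (illustrative records only, not part of the quick start below), the following two records share the same measurement and tags; if they arrive in the same batch with the same timestamp, the connector writes them as one InfluxDB point containing both cpu1 and cpu2:

{"measurement": "cpu", "tags": {"hostname": "test"}, "cpu1": 10}
{"measurement": "cpu", "tags": {"hostname": "test"}, "cpu2": 5}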
Features¶
The InfluxDB Sink connector includes the following features:
At least once delivery¶
This connector guarantees that records are delivered at least once from the Kafka topic.
Dead Letter Queue¶
The connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
Multiple tasks¶
The InfluxDB Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can lead to significant performance gains when the connector consumes records from many topic partitions.
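For example, a hypothetical configuration fragment that requests three tasks (the quick start examples below use a single task):

tasks.max=3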
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration. Note that license details are the same for both the InfluxDB Source and Sink connectors.
Configuration properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for InfluxDB Sink Connector for Confluent Platform. For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.
Install the InfluxDB Sink connector¶
The InfluxDB connector is compatible with Confluent Platform version 4.1 and later. You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
Install the connector using the Confluent CLI¶
To install the latest
connector version using Confluent Hub Client, navigate to your Confluent Platform installation directory
and run the following command:
confluent connect plugin install confluentinc/kafka-connect-influxdb:latest
You can install a specific version by replacing latest
with a version number
as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-influxdb:1.2.7
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Quick start¶
In this quick start, you copy data from a single Kafka topic to a measurement on a local Influx database running on Docker.
This example assumes you are running Kafka and Schema Registry locally on the default ports. It also assumes you have Docker installed and running. Note that InfluxDB Docker can be replaced with any installed InfluxDB server.
To get started, complete the following steps:
Start the Influx database by running the following Docker command:
docker run -d -p 8086:8086 --name influxdb-local influxdb:1.7.7
This starts the Influx database and maps it to port 8086 on localhost. By default, the username and password are blank. The database connection URL is http://localhost:8086.
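To confirm the database is reachable before continuing, you can hit the InfluxDB 1.x ping endpoint, which returns an HTTP 204 response when the server is up (an optional check, not part of the original steps):

curl -i http://localhost:8086/ping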
Start the Confluent Platform using the following Confluent CLI command:
confluent local services start
Property-based example¶
In this section, you will complete the steps in a property-based example.
Create a configuration file for the connector. This configuration is typically used with standalone workers. Note that this file is included with the connector in
./etc/kafka-connect-influxdb/influxdb-sink-connector.properties
and contains the following settings:

name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=orders
influxdb.url=http://localhost:8086
influxdb.db=influxTestDB
measurement.name.format=${topic}
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

The first few settings are common settings you specify for all connectors, except for topics, which is specific to sink connectors like this one. influxdb.url specifies the connection URL of the InfluxDB server, and influxdb.db specifies the database name. influxdb.username and influxdb.password specify the username and password of the InfluxDB server, respectively. Because the username and password are blank for the InfluxDB server started above, they are not added to the configuration.
Run the connector with the following configuration:
confluent local services connect connector load InfluxDBSinkConnector --config etc/kafka-connect-influxdb/influxdb-sink-connector.properties
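To confirm the connector started, you can check its status with the Confluent CLI (an optional step, assuming the same local setup):

confluent local services connect connector status InfluxDBSinkConnector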
REST-based example¶
In this section, you will complete the steps in a REST-based example.
Copy the following JSON object to
influxdb-sink-connector.json
and configure all of the required values. This configuration is typically used along with distributed workers.

{
  "name" : "InfluxDBSinkConnector",
  "config" : {
    "connector.class" : "io.confluent.influxdb.InfluxDBSinkConnector",
    "tasks.max" : "1",
    "topics" : "orders",
    "influxdb.url" : "http://localhost:8086",
    "influxdb.db" : "influxTestDB",
    "measurement.name.format" : "${topic}",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}
Use the following
curl
command to post the configuration to one of the Kafka Connect workers while changing http://localhost:8083/
to the endpoint of one of your Kafka Connect workers (for more information, see the Kafka Connect REST API):

curl -X POST -d @influxdb-sink-connector.json http://localhost:8083/connectors -H "Content-Type: application/json"
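To verify that the connector and its task are running, you can query the Kafka Connect REST API status endpoint (an optional check, assuming the same worker endpoint as above):

curl http://localhost:8083/connectors/InfluxDBSinkConnector/status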
Create a record in the
orders
topic:

bin/kafka-avro-console-producer \
  --broker-list localhost:9092 --topic orders \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price", "type": "float"}]}'
The console producer waits for input.
Copy and paste the following record into the terminal:
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
Log in to the Docker container using the following command:
docker exec -it <containerid> bash
To find the container ID, use the
docker ps
command.

Once you are in the Docker container, log in to the InfluxDB shell:
influx
Your output should resemble:
Connected to http://localhost:8086 version 1.7.7
InfluxDB shell version: 1.7.7
Run the following query to verify the records:
> USE influxTestDB;
Using database influxTestDB

> SELECT * FROM orders;
name: orders
time                id  price product quantity
----                --  ----- ------- --------
1567164248415000000 999 50    foo     100
Schemaless JSON tags example¶
In this section, you will complete the steps in a schemaless JSON tags example.
Configure your connector configuration with the values shown in the following example:
name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=test
influxdb.url=http://localhost:8086
influxdb.db=influxTestDB
measurement.name.format=${topic}
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
Create Schemaless JSON tags for a topic named
test
using the following producer command:

kafka-console-producer \
  --broker-list localhost:9092 \
  --topic test

The console producer waits for input.
Copy and paste the following record into the terminal:
{"name":"influx","age":23,"tags":{"id":"5"}}
The following query shows id as a tag in the result. This is based on "tags":{"id":"5"} in the record you produced.

> select * from test;
name: test
time                age id name
----                --- -- ----
1579307684366000000 23  5  influx

> show tag keys from test;
name: test
tagKey
------
id
Note that if a record from the Kafka topic contains fields which are not present in the existing InfluxDB measurement, then those fields will be created in the measurement. Additionally, if a record from the Kafka topic does not contain fields which are already present in the existing InfluxDB measurement, then those field values will be empty.
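For example (an illustrative record, not part of the steps above), producing the following to the test topic would add a new field named score to the measurement, while the age value for this point would be empty because the record omits it:

{"name":"grafana","score":10,"tags":{"id":"6"}}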
JSON tags example¶
In this section, you will complete the steps in a JSON tags example.
Configure your connector configuration with the values shown in the following example:
name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=test
influxdb.url=http://localhost:8086
influxdb.db=influxTestDB
measurement.name.format=${topic}
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
Create JSON tags for a topic named
test
using the following producer command:

kafka-console-producer \
  --broker-list localhost:9092 \
  --topic test

The console producer waits for input. Copy and paste the following record, a JSON envelope that contains both the schema and the payload, into the terminal:

{"schema":{"type":"struct","fields":[{"type":"map","keys":{"type":"string","optional":false},"values":{"type":"string","optional":false},"optional":false,"field":"tags"},{"type":"string","optional":false,"field":"time"},{"type":"double","optional":true,"field":"value"}],"optional":false,"version":1},"payload":{"tags":{"id":"5"},"time":"2019-07-24T11:43:19.201040841Z","value":500.0}}
The following query shows id as a tag in the result. This is based on "tags":{"id":"5"} in the payload of the record you produced.

> select * from test;
name: test
time                id value
----                -- -----
1579307684366000000 5  500
1579307701088000000 5  500

> show tag keys from test;
name: test
tagKey
------
id
Avro tags example¶
In this section, you will complete the steps in an Avro tags example.
Configure your connector configuration with the values shown in the following example:
name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=products
influxdb.url=http://localhost:8086
influxdb.db=influxTestDB
measurement.name.format=${topic}
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Create Avro tags for a topic named
products
using the following producer command:

kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic products \
  --property value.schema='{"name": "myrecord","type": "record","fields": [{"name":"id","type":"int"}, {"name": "product","type": "string"}, {"name": "quantity","type": "int"},{"name": "price","type": "float"}, {"name": "tags","type": {"name": "tags","type": "record","fields": [{"name": "DEVICE","type": "string"},{"name": "location","type": "string"}]}}]}'
The console producer waits for input.
Copy and paste the following records into the terminal:
{"id": 1, "product": "pencil", "quantity": 100, "price": 50, "tags" : {"DEVICE": "living", "location": "home"}} {"id": 2, "product": "pen", "quantity": 200, "price": 60, "tags" : {"DEVICE": "living", "location": "home"}}
Verify the data is in InfluxDB.
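One way to verify, assuming the same local InfluxDB container used earlier, is to open the influx shell again and run queries such as the following (output omitted here):

USE influxTestDB
SELECT * FROM products
SHOW TAG KEYS FROM products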
Topic to database example¶
If measurement.name.format
is not present in the configuration, the
connector uses the Kafka topic name as the database name and takes the
measurement name from a field in the message.
Configure your connector configuration with the values shown in the following example:
name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=products
influxdb.url=http://localhost:8086
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Create an Avro record for a topic named
products
using the following producer command:

kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic products \
  --property value.schema='{"name": "myrecord","type": "record","fields": [{"name":"id","type":"int"}, {"name": "measurement","type":"string"}]}'
The console producer waits for input.
Copy and paste the following records into the terminal:
{"id": 1, "measurement": "test"} {"id": 2, "measurement": "test2"}
The following query shows the measurements and points written to InfluxDB.
> use products;
> show measurements;
name: measurements
name
----
test
test2

> select * from test;
name: test
time          id
----          --
1601464614638 1
Custom timestamp example¶
In this section, you will complete the steps in a custom timestamp example.
Configure your connector configuration with the values shown in the following example:
name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=products
influxdb.url=http://localhost:8086
influxdb.db=influxTestDB
measurement.name.format=${topic}
event.time.fieldname=time
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Create an Avro record for a topic named
products
using the following producer command:

kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic products \
  --property value.schema='{"name": "myrecord","type": "record","fields": [{"name":"id","type":"int"}, {"name": "time","type":"long"}]}'
The console producer waits for input. Note that the timestamp needs to be in milliseconds since the Unix Epoch (Unix time).
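If you would rather produce a record carrying the current time than the sample value below, GNU date can print the epoch time in milliseconds (an optional helper, not part of the original steps):

date +%s%3N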
Copy and paste the following record into the terminal:
{"id": 1, "time": 123412341234}
The following shows the custom timestamp written to InfluxDB.
> precision ms
> select * from products;
name: products
time          id
----          --
123412341234  1
Record structure¶
Each InfluxDB record consists of the following:
- measurement: A required field that must be of type String. However, if the connector's measurement.name.format and influxdb.db are specified, then measurement is optional; that is, it is not required in the record.
- tags: An optional field that must be of type map (or called a record in Avro).
- Value fields you define: All other fields are considered value fields and can be of type Float, Integer, String, or Boolean. At least one value field is required in the record.
- Timestamp
The following is an example InfluxDB record:
{
"measurement": "cpu",
"tags": {
"hostname": "test",
"ip": "10.2.3.4"
},
"cpu1": 10,
"cpu2": 5,
"cpu3": 15
}
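For reference, the point this record produces corresponds roughly to the following InfluxDB line protocol (an illustrative sketch; the exact field types and the timestamp assigned by the connector may differ):

cpu,hostname=test,ip=10.2.3.4 cpu1=10,cpu2=5,cpu3=15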
To learn more, see the InfluxDB documentation.