InfluxDB Sink Connector for Confluent Platform

The Kafka Connect InfluxDB Sink connector writes data from an Apache Kafka® topic to an InfluxDB host. When more than one record in a batch has the same measurement, time and tags, they are combined and written to InfluxDB.
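The combining behavior can be pictured with a small sketch. This is not the connector's code; it is a minimal illustration that assumes each record has already been reduced to a dictionary with measurement, time, tags, and fields keys (a hypothetical shape chosen for this example). How the connector resolves two records that set the same field is not stated here, so the sketch simply lets the later record win.

```python
def merge_points(records):
    """Combine records that share measurement, time, and tags into single points."""
    merged = {}
    for record in records:
        tags = record.get("tags", {})
        # Two records describe the same point when measurement, time, and tags match.
        key = (record["measurement"], record["time"], tuple(sorted(tags.items())))
        point = merged.setdefault(key, {
            "measurement": record["measurement"],
            "time": record["time"],
            "tags": dict(tags),
            "fields": {},
        })
        # Assumption: on a field-name clash, the later record overwrites the earlier one.
        point["fields"].update(record["fields"])
    return list(merged.values())
```

With this sketch, two records for the same measurement, time, and tags that carry cpu1 and cpu2 respectively would become one point with both fields.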

Features

The InfluxDB Sink connector includes the following features:

At least once delivery

This connector guarantees that records are delivered at least once from the Kafka topic.

Dead Letter Queue

The connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
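As a rough illustration, the helper below adds the standard Kafka Connect error-handling properties that route failed records to a DLQ topic. The function name and the topic name dlq-orders are hypothetical; the property keys are the general Kafka Connect sink settings, and the replication factor of 1 is an assumption that only suits a single-broker local setup.

```python
def with_dead_letter_queue(config, dlq_topic, replication_factor=1):
    """Return a copy of a sink connector config with DLQ routing enabled."""
    updated = dict(config)
    updated.update({
        # Keep the task running when a record fails conversion or transformation.
        "errors.tolerance": "all",
        # Topic that receives the failed records.
        "errors.deadletterqueue.topic.name": dlq_topic,
        # Assumption: 1 is only appropriate for a single-broker test cluster.
        "errors.deadletterqueue.topic.replication.factor": str(replication_factor),
        # Attach headers describing why each record failed.
        "errors.deadletterqueue.context.headers.enable": "true",
    })
    return updated
```

For example, with_dead_letter_queue({"topics": "orders"}, "dlq-orders") returns the original settings plus the four error-handling properties.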

Multiple tasks

The InfluxDB Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running more than one task can increase throughput when the topics being consumed have multiple partitions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, contact Confluent Support at support@confluent.io for more information.

For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration. Note that license details are the same for both the InfluxDB Source and Sink connectors.

Configuration properties

For a complete list of configuration properties for this connector, see Configuration Reference for InfluxDB Sink Connector for Confluent Platform. For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Install the InfluxDB Sink connector

The InfluxDB connector is compatible with Confluent Platform version 4.1 and later. You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.
  • If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.

Install the connector using the Confluent CLI

To install the latest connector version using the Confluent CLI, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-influxdb:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-influxdb:1.2.7

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Quick start

In this quick start, you copy data from a single Kafka topic to a measurement on a local Influx database running on Docker.

This example assumes you are running Kafka and Schema Registry locally on the default ports. It also assumes you have Docker installed and running. You can replace the InfluxDB Docker container with any installed InfluxDB server.

To get started, complete the following steps:

  1. Start the Influx database by running the following Docker command:

    docker run -d -p 8086:8086 --name influxdb-local influxdb:1.7.7
    

    This starts the Influx database and maps it to port 8086 on localhost. By default, the username and password are blank. The database connection URL is http://localhost:8086.

  2. Start the Confluent Platform using the following Confluent CLI command:

    confluent local services start
    

Property-based example

In this section, you will complete the steps in a property-based example.

  1. Create a configuration file for the connector. This configuration is typically used with standalone workers. Note that this file is included with the connector in ./etc/kafka-connect-influxdb/influxdb-sink-connector.properties and contains the following settings:

            name=InfluxDBSinkConnector
            connector.class=io.confluent.influxdb.InfluxDBSinkConnector
            tasks.max=1
            topics=orders
            influxdb.url=http://localhost:8086
            influxdb.db=influxTestDB
            measurement.name.format=${topic}
            value.converter=io.confluent.connect.avro.AvroConverter
            value.converter.schema.registry.url=http://localhost:8081
    
    The first few settings are common to all connectors, except for topics, which is specific to sink connectors like this one.

    influxdb.url specifies the connection URL of the InfluxDB server, and influxdb.db specifies the database name. influxdb.username and influxdb.password specify the username and password for the InfluxDB server. Because the username and password are blank by default for the InfluxDB server started above, they are omitted from this configuration.
    
  2. Run the connector with the following configuration:

    confluent local services connect connector load InfluxDBSinkConnector --config etc/kafka-connect-influxdb/influxdb-sink-connector.properties
    
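The properties file above and the JSON payload used with distributed workers carry the same settings in two shapes. The sketch below shows one way to convert between them. The function name is hypothetical, and the parser is deliberately simplified: it assumes plain key=value lines and ignores the escapes and line continuations that full Java properties files allow.

```python
import json


def properties_to_payload(text):
    """Turn simple key=value properties text into a Kafka Connect REST payload."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and comments.
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    # The REST API takes the connector name at the top level, next to "config".
    name = props.pop("name")
    return {"name": name, "config": props}


def payload_to_json(payload):
    """Serialize the payload for use as a request body or a .json file."""
    return json.dumps(payload, indent=2)
```

Reading etc/kafka-connect-influxdb/influxdb-sink-connector.properties and passing its text through properties_to_payload should give a structure equivalent to the JSON object used in the REST-based example.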

REST-based example

In this section, you will complete the steps in a REST-based example.

  1. Copy the following JSON object to influxdb-sink-connector.json and configure all of the required values. This configuration is typically used along with distributed workers.

    {
    "name" : "InfluxDBSinkConnector",
    "config" : {
            "connector.class" : "io.confluent.influxdb.InfluxDBSinkConnector",
            "tasks.max" : "1",
            "topics" : "orders",
            "influxdb.url" : "http://localhost:8086",
            "influxdb.db" : "influxTestDB",
            "measurement.name.format" : "${topic}",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": "http://localhost:8081"
    }
    }
    
  2. Use the following curl command to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers. For more information, see the Kafka Connect REST API.

    curl -X POST -d @influxdb-sink-connector.json http://localhost:8083/connectors -H "Content-Type: application/json"
    
  3. Create a record in the orders topic:

    bin/kafka-avro-console-producer \
    --broker-list localhost:9092 --topic orders \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price", "type": "float"}]}'
    

    The console producer waits for input.

  4. Copy and paste the following record into the terminal:

    {"id": 999, "product": "foo", "quantity": 100, "price": 50}
    
  5. Log in to the Docker container using the following command:

    docker exec -it <containerid> bash
    

    To find the container ID, use the docker ps command.

  6. Once you are in the Docker container, log in to InfluxDB shell:

    influx
    

    Your output should resemble:

    Connected to http://localhost:8086 version 1.7.7
    InfluxDB shell version: 1.7.7
    
  7. Run the following query to verify the records:

    > USE influxTestDB;
    Using database influxTestDB
    
    > SELECT * FROM orders;
    name: orders
    time                id  price product quantity
    ----                --  ----- ------- --------
    1567164248415000000 999 50    foo     100
    
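If the query returns no rows, it can help to confirm that the connector and its tasks are running. The sketch below uses the Kafka Connect REST API status endpoint for that. The helper names are hypothetical, and the worker address http://localhost:8083 is the one assumed in the steps above.

```python
import json
from urllib.request import urlopen


def status_url(connect_url, name):
    """Build the status endpoint URL for a connector on a Connect worker."""
    return f"{connect_url.rstrip('/')}/connectors/{name}/status"


def is_healthy(status):
    """True when the connector and at least one task exist and all are RUNNING."""
    tasks = status.get("tasks", [])
    return (
        bool(tasks)
        and status["connector"]["state"] == "RUNNING"
        and all(task["state"] == "RUNNING" for task in tasks)
    )


def fetch_status(connect_url, name):
    """Fetch and decode the connector status (requires a reachable worker)."""
    with urlopen(status_url(connect_url, name)) as response:
        return json.load(response)
```

Calling is_healthy(fetch_status("http://localhost:8083", "InfluxDBSinkConnector")) against a running worker reports whether the connector is in a state to write records.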

Schemaless JSON tags example

In this section, you will complete the steps in a schemaless JSON tags example.

  1. Create a connector configuration with the values shown in the following example:

    name=InfluxDBSinkConnector
    connector.class=io.confluent.influxdb.InfluxDBSinkConnector
    tasks.max=1
    topics=test
    influxdb.url=http://localhost:8086
    influxdb.db=influxTestDB
    measurement.name.format=${topic}
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    
  2. Create Schemaless JSON tags for a topic named test using the following producer command:

    kafka-console-producer \
    --broker-list localhost:9092 \
    --topic test

    The console producer waits for input.

  3. Copy and paste the following record into the terminal:

    {"name":"influx","age":23,"tags":{"id":"5"}}
    

    The following query shows id as a tag in the result. This is based on "tags":{"id":"5"} in the record produced above.

    > select * from test;
    name: test
    time                age id name
    ----                --- -- ----
    1579307684366000000 23  5  influx
    > show tag keys from test;
    name: test
    tagKey
    ------
    id
    

Note that if a record from the Kafka topic contains fields which are not present in the existing InfluxDB measurement, then those fields will be created in the measurement. Additionally, if a record from the Kafka topic does not contain fields which are already present in the existing InfluxDB measurement, then those field values will be empty.

JSON tags example

In this section, you will complete the steps in a JSON tags example.

  1. Create a connector configuration with the values shown in the following example:

    name=InfluxDBSinkConnector
    connector.class=io.confluent.influxdb.InfluxDBSinkConnector
    tasks.max=1
    topics=test
    influxdb.url=http://localhost:8086
    influxdb.db=influxTestDB
    measurement.name.format=${topic}
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=true
    
  2. Create JSON tags for a topic named test using the following producer command:

    kafka-console-producer \
    --broker-list localhost:9092 \
    --topic test \
    --property value.schema='{"schema":{"type":"struct","fields":[{"type":"map","keys":{"type":"string","optional":false},"values":{"type":"string","optional":false},"optional":false,"field":"tags"},{"type":"string","optional":false,"field":"time"},{"type":"double","optional":true,"field":"value"}],"optional":false,"version":1},"payload":{"tags":{"id":"5"},"time":"2019-07-24T11:43:19.201040841Z","value":500.0}}'
    

    The following query shows id as a tag in the result. This is based on "payload":{"tags":{"id":"5"} in the producer command.

    > select * from test;
    name: test
    time                id value
    ----                -- -----
    1579307684366000000 5  500
    1579307701088000000 5  500
    > show tag keys from test;
    name: test
    tagKey
    ------
    id
    

Avro tags example

In this section, you will complete the steps in an Avro tags example.

  1. Create a connector configuration with the values shown in the following example:

    name=InfluxDBSinkConnector
    connector.class=io.confluent.influxdb.InfluxDBSinkConnector
    tasks.max=1
    topics=products
    influxdb.url=http://localhost:8086
    influxdb.db=influxTestDB
    measurement.name.format=${topic}
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    
  2. Create Avro tags for a topic named products using the following producer command:

    kafka-avro-console-producer \
    --broker-list localhost:9092 \
    --topic products \
    --property value.schema='{"name": "myrecord","type": "record","fields": [{"name":"id","type":"int"}, {"name": "product","type": "string"}, {"name": "quantity","type": "int"},{"name": "price","type": "float"}, {"name": "tags","type": {"name": "tags","type": "record","fields": [{"name": "DEVICE","type": "string"},{"name": "location","type": "string"}]}}]}'
    

    The console producer waits for input.

  3. Copy and paste the following records into the terminal:

    {"id": 1, "product": "pencil", "quantity": 100, "price": 50, "tags" : {"DEVICE": "living", "location": "home"}}
    
    {"id": 2, "product": "pen", "quantity": 200, "price": 60, "tags" : {"DEVICE": "living", "location": "home"}}
    
  4. Verify the data is in InfluxDB.
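One way to verify the data without opening the influx shell is to call the InfluxDB 1.x HTTP /query endpoint. The sketch below is an illustration under that assumption; the helper names are hypothetical, and the database and measurement names come from the configuration above.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def query_url(influx_url, database, query):
    """Build a URL for the InfluxDB 1.x /query endpoint."""
    params = urlencode({"db": database, "q": query})
    return f"{influx_url.rstrip('/')}/query?{params}"


def rows(response):
    """Flatten an InfluxDB 1.x query response into a list of row dictionaries."""
    result = []
    for statement in response.get("results", []):
        # A statement with no matching data has no "series" key.
        for series in statement.get("series", []):
            for values in series["values"]:
                result.append(dict(zip(series["columns"], values)))
    return result


def run_query(influx_url, database, query):
    """Run the query against a reachable InfluxDB server and return its rows."""
    with urlopen(query_url(influx_url, database, query)) as response:
        return rows(json.load(response))
```

For this example, run_query("http://localhost:8086", "influxTestDB", "SELECT * FROM products") should return one row per record produced, with DEVICE and location expected to appear as tags.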

Topic to database example

If measurement.name.format is not present in the configuration, the connector uses the Kafka topic name as the database name and takes the measurement name from a field in the message.

  1. Create a connector configuration with the values shown in the following example:

    name=InfluxDBSinkConnector
    connector.class=io.confluent.influxdb.InfluxDBSinkConnector
    tasks.max=1
    topics=products
    influxdb.url=http://localhost:8086
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    
  2. Create an Avro record for a topic named products using the following producer command:

    kafka-avro-console-producer \
    --broker-list localhost:9092 \
    --topic products \
    --property value.schema='{"name": "myrecord","type": "record","fields": [{"name":"id","type":"int"}, {"name": "measurement","type":"string"}]}'
    

    The console producer waits for input.

  3. Copy and paste the following records into the terminal:

    {"id": 1, "measurement": "test"}
    {"id": 2, "measurement": "test2"}
    

    The following query shows the measurements and points written to InfluxDB.

    > use products;
    > show measurements;
    name: measurements
    name
    ----
    test
    test2
    
    > select * from test;
    name: test
    time                id
    ----                --
    1601464614638       1
    
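The routing rule in this example can be summarized in a few lines. The sketch below is not the connector's implementation; it is a hypothetical function that mirrors the behavior described in this document and only handles the ${topic} placeholder. Treating a configuration that sets just one of the two properties like one that sets neither is an assumption.

```python
def resolve_target(topic, record, config):
    """Return the (database, measurement) pair a record would be written to."""
    name_format = config.get("measurement.name.format")
    database = config.get("influxdb.db")
    if name_format is not None and database is not None:
        # Both settings present: use the configured database and format the name.
        return database, name_format.replace("${topic}", topic)
    # Otherwise the topic names the database and the record names the measurement.
    return topic, record["measurement"]
```

With the quick start configuration, a record on the orders topic resolves to ("influxTestDB", "orders"). With the configuration in this section, the record {"id": 1, "measurement": "test"} on the products topic resolves to ("products", "test").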

Custom timestamp example

In this section, you will complete the steps in a custom timestamp example.

  1. Create a connector configuration with the values shown in the following example:

    name=InfluxDBSinkConnector
    connector.class=io.confluent.influxdb.InfluxDBSinkConnector
    tasks.max=1
    topics=products
    influxdb.url=http://localhost:8086
    influxdb.db=influxTestDB
    measurement.name.format=${topic}
    event.time.fieldname=time
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    
  2. Create an Avro record for a topic named products using the following producer command:

    kafka-avro-console-producer \
    --broker-list localhost:9092 \
    --topic products \
    --property value.schema='{"name": "myrecord","type": "record","fields": [{"name":"id","type":"int"}, {"name": "time","type":"long"}]}'
    

    The console producer waits for input. Note that the timestamp needs to be in milliseconds since the Unix Epoch (Unix time).

  3. Copy and paste the following record into the terminal:

    {"id": 1, "time": 123412341234}
    

    The following shows the custom timestamp written to InfluxDB.

    > precision ms
    > select * from products;
    name: products
    time                id
    ----                --
    123412341234        1
    
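Because the timestamp field must hold milliseconds since the Unix epoch, a producer usually needs a conversion step. The helpers below are a small sketch of that conversion using integer arithmetic to avoid floating-point rounding; the function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to whole milliseconds since the Unix epoch."""
    return (moment - EPOCH) // timedelta(milliseconds=1)


def from_epoch_millis(millis):
    """Convert milliseconds since the Unix epoch to a UTC datetime."""
    return EPOCH + timedelta(milliseconds=millis)
```

The value 123412341234 used in the record above corresponds to 1973-11-29 09:12:21.234 UTC, which from_epoch_millis reproduces.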

Record structure

Each InfluxDB record consists of the following:

  • measurement: A required field of type String. If both measurement.name.format and influxdb.db are set in the connector configuration, measurement is optional in the record.
  • tags: An optional field of type map (record in Avro).
  • Value fields you define: All other fields are treated as value fields and can be of type Float, Integer, String, or Boolean. The record must contain at least one value field.
  • Timestamp: The time of the point. To take it from a field in the record, set event.time.fieldname as shown in the custom timestamp example.

The following is an example InfluxDB record:

{
        "measurement": "cpu",
        "tags": {
        "hostname": "test",
        "ip": "10.2.3.4"
        },
        "cpu1": 10,
        "cpu2": 5,
        "cpu3": 15
}

To learn more, see the InfluxDB documentation.
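To make the record structure concrete, the sketch below renders a record like the example above as a line in InfluxDB 1.x line protocol. It is an illustration only: the function names are hypothetical, and whether the connector stores integer values as InfluxDB integers or floats is not stated in this document, so the integer suffix used here is an assumption.

```python
def _escape(text, chars):
    """Backslash-escape the characters that line protocol treats as separators."""
    for char in chars:
        text = text.replace(char, "\\" + char)
    return text


def _field_value(value):
    """Format a value field; bool is checked first because it is a subclass of int."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"  # Assumption: integers are written as InfluxDB integers.
    if isinstance(value, float):
        return repr(value)
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def to_line_protocol(record, timestamp_ns=None):
    """Render a record with measurement, optional tags, and value fields as one line."""
    fields = {k: v for k, v in record.items() if k not in ("measurement", "tags")}
    if not fields:
        raise ValueError("at least one value field is required")
    head = _escape(record["measurement"], ", ")
    for key, value in sorted(record.get("tags", {}).items()):
        head += "," + _escape(key, ",= ") + "=" + _escape(str(value), ",= ")
    body = ",".join(_escape(k, ",= ") + "=" + _field_value(v) for k, v in fields.items())
    line = f"{head} {body}"
    return line if timestamp_ns is None else f"{line} {timestamp_ns}"
```

For the example record, the sketch produces cpu,hostname=test,ip=10.2.3.4 cpu1=10i,cpu2=5i,cpu3=15i.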