Datagen Source Connector for Confluent Platform

The Kafka Connect Datagen Source connector generates mock source data for development and testing. The data used is pulled from datagen resources. This connector is not suitable for production use.

The following examples show Datagen Source connector version 0.4.0. You should use the latest released version to get the latest features. For the latest released version, refer to the kafka-connect-datagen GitHub repository.

Install the Datagen Source Connector

You can install this connector by using the instructions or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-datagen:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-datagen:0.4.0

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Install the connector into a Connect-based Docker image

A Docker image based on Kafka Connect with the kafka-connect-datagen plugin is available in Dockerhub. If you want to build a local copy of the Docker image with kafka-connect-datagen, this project provides a Dockerfile that you can reference.

You can create a Docker image packaged with the locally built source by running the following command (which shows the 6.0.1 version of Confluent Platform).

make build-docker-from-local CP_VERSION=6.1.0

Running this command builds the connector from source and creates a local image with an aggregate version number. The aggregate version number is the kafka-connect-datagen connector version number (for example 0.4.0) and the Confluent Platform version number (for example, 6.1.0) separated with a hyphen. The local kafka-connect-datagen version number is defined in the pom.xml file. The Confluent Platform version is defined in the Makefile. An example of the aggregated version number is 0.4.0-6.1.0.

Alternatively, you can install the kafka-connect-datagen connector from Confluent Hub into a Docker image by running:

make build-docker-from-released CP_VERSION=6.1.0

The Makefile contains several default variables that affect the version numbers of both the installed kafka-connect-datagen connector, as well as the base Confluent Platform version. The variables are located near the top of the Makefile with the following names and current default values:

CP_VERSION ?= 6.1.0

KAFKA_CONNECT_DATAGEN_VERSION ?= 0.4.0

These values can be overridden with variable declarations before the make command. For example:

KAFKA_CONNECT_DATAGEN_VERSION=0.4.0 make build-docker-from-released

Run the connector in Docker Compose

Here is an example of how to run the kafka-connect-datagen connector with the provided docker-compose.yml file. If you want to use a different Docker image tag, be sure to modify appropriately in the docker-compose.yml file.

docker-compose up -d \
--build curl -X POST -H "Content-Type: application/json" \
--data @config/connector_pageviews.config http://localhost:8083/connectors
docker-compose exec connect kafka-console-consumer \
--topic pageviews --bootstrap-server kafka:29092  \
--property print.key=true --max-messages 5 --from-beginning

Configure the Datagen Source Connector

The following sections provide information about configuring the connector. For a complete list of configuration properties for this connector, see the Configuration Reference for Datagen Source Connector for Confluent Platform page.

Supported data formats

Connect supports Key and Value Converters which can be used to convert record key and value formats when reading from and writing to Kafka. Confluent Platform comes bundled with Avro, JSON Schema, and Protobuf converters. For a list of all supported converters, see Converters.

For an example of using the Protobuf converter with the Datagen connector, see this example configuration. Note the required use of the SetSchemaMetadata Transformation which addresses a compatibility issue between schema names used by the Datagen connector and Protobuf. For additional details, see this compatibility issue.

Using a bundled schema specification

There are a few quick start schema specifications bundled with the Datagen connector. These schemas are listed in this directory. To use one of these bundled schemas, refer to this mapping. In the configuration file, set the property quick start to the associated name, as shown in the following example:

"quickstart": "users",

Defining a new schema specification

You can also define your own schema specification if you want to customize the fields and their values to be more domain specific, or to match what your application is expecting. Under the hood, kafka-connect-datagen uses the Avro Random Generator, so the only constraint in writing your own schema specification is that it is compatible with the generator. To define your own schema:

  1. Create your own schema file (your-schema.avsc) that is compatible with the Avro Random Generator.
  2. In the connector configuration, remove the configuration parameter quickstart and add the parameters schema.filename (which should be the absolute path) and schema.keyfield as shown below:
"schema.filename": "/path/to/your-schema.avsc",
"schema.keyfield": "<field representing the key>",

Note

The custom schema can be used at runtime. It is not necessary to recompile the connector.

Record keys

You can control the keys that the connector publishes with its records using the schema.keyfield configuration property. If this property is set, the connector looks for the field name in the top-level Avro records that it generates. It then uses the field’s value and schema for the key of the message that it publishes to Kafka.

Keys can be any type (string, int, record, etc.) and can also be nullable. If no schema.keyfield is provided, the key will be null with an optional string schema.

Mock Data, Avro, and Schema Registry

To define the set of rules for the mock data, kafka-connect-datagen uses the Avro Random Generator. The configuration parameters quickstart or schema.filename specify the Avro schema (the set of rules) which declares a list of primitives or more complex data types, length of data, and other properties about the generated mock data. Examples of these schema files are listed in this directory.

Important

Do not confuse the use of the terms Avro and schemas when describing mock data generation and data produced to Kafka. The context for these terms is different. The Avro schemas for generating mock data are independent of the format of the data produced to Kafka and the schema in Confluent Schema Registry.

The format of data produced to Kafka may or may not be Avro. To define the format of the data produced to Kafka, you must set the format type in your connector configuration. The connector configuration parameters can be defined for the key or value. For example, to produce messages to Kafka where the message value format is Avro, set the value.converter and value.converter.schema.registry.url configuration parameters:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",

Or, to produce messages to Kafka where the message value format is JSON, set the value.converter configuration parameter:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",

The schema in Schema Registry declares the record fields and their types, and is used by Kafka clients when they are configured to produce or consume Avro data. As an example, consider the following example in a schema specification that generates a field userid:

{"name": "userid", "type": {
    "type": "string",
    "arg.properties": {
        "regex": "User_[1-9]{0,1}"
    }
}},

If you are using Avro format for producing data to Kafka, here is the corresponding field in the registered schema in Schema Registry:

{"name": "userid", "type": ["null", "string"], "default": null},

If you are not using Avro format for producing data to Kafka, there is no schema in Schema Registry.

Utility Headers

The Datagen Source connector captures details about the record’s generation in the headers of the records it produces. The following fields are populated:

Header Key Header Value
task.generation Task generation number (starts at 0, incremented each time the task restarts)
task.id Task ID number (0 up to tasks.max - 1)
current.iteration Record iteration number (starts at 0, incremented each time a record is generated)

Additional Information

For additional information, see the README doc at the kafka-connect-datagen GitHub repository.