Datagen Source Connector for Confluent Platform¶
The Kafka Connect Datagen Source connector generates mock source data for development and testing. The data used is pulled from datagen resources. This connector is not suitable for production use.
The following examples show Datagen Source connector version 0.4.0. You should use the latest released version to get the latest features. For the latest released version, refer to the kafka-connect-datagen GitHub repository.
Install the Datagen Source Connector¶
You can install this connector by using the Confluent Hub client installation instructions, or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-datagen:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-datagen:0.4.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Install the connector into a Connect-based Docker image¶
A Docker image based on Kafka Connect with the kafka-connect-datagen plugin is available on Docker Hub. If you want to build a local copy of the Docker image with kafka-connect-datagen, this project provides a Dockerfile that you can reference.
You can create a Docker image packaged with the locally built source by running the following command (which shows the 6.1.0 version of Confluent Platform).
make build-docker-from-local CP_VERSION=6.1.0
Running this command builds the connector from source and creates a local image with an aggregate version number. The aggregate version number is the kafka-connect-datagen connector version number (for example, 0.4.0) and the Confluent Platform version number (for example, 6.1.0) separated by a hyphen. The local kafka-connect-datagen version number is defined in the pom.xml file. The Confluent Platform version is defined in the Makefile. An example of the aggregated version number is 0.4.0-6.1.0.
Alternatively, you can install the kafka-connect-datagen connector from Confluent Hub into a Docker image by running:
make build-docker-from-released CP_VERSION=6.1.0
The Makefile contains several default variables that affect the versions of both the installed kafka-connect-datagen connector and the base Confluent Platform. The variables are located near the top of the Makefile with the following names and current default values:
CP_VERSION ?= 6.1.0
KAFKA_CONNECT_DATAGEN_VERSION ?= 0.4.0
These values can be overridden with variable declarations before the make command. For example:
KAFKA_CONNECT_DATAGEN_VERSION=0.4.0 make build-docker-from-released
Run the connector in Docker Compose¶
Here is an example of how to run the kafka-connect-datagen connector with the provided docker-compose.yml file. If you want to use a different Docker image tag, be sure to modify the docker-compose.yml file appropriately.
docker-compose up -d --build

curl -X POST -H "Content-Type: application/json" \
     --data @config/connector_pageviews.config http://localhost:8083/connectors

docker-compose exec connect kafka-console-consumer \
--topic pageviews --bootstrap-server kafka:29092 \
--property print.key=true --max-messages 5 --from-beginning
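
For reference, the connector configuration posted by the curl command above has roughly the following shape. This is a minimal sketch that assumes the pageviews quickstart; the actual contents of config/connector_pageviews.config in the repository may differ in field values:

{
  "name": "datagen-pageviews",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "pageviews",
    "quickstart": "pageviews",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "max.interval": 100,
    "tasks.max": "1"
  }
}

Here, kafka.topic names the destination topic, quickstart selects the bundled schema, and max.interval is the maximum delay (in milliseconds) between generated records.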
Configure the Datagen Source Connector¶
The following sections provide information about configuring the connector. For a complete list of configuration properties for this connector, see the Configuration Reference for Datagen Source Connector for Confluent Platform page.
Supported data formats¶
Connect supports Key and Value Converters which can be used to convert record key and value formats when reading from and writing to Kafka. Confluent Platform comes bundled with Avro, JSON Schema, and Protobuf converters. For a list of all supported converters, see Converters.
For an example of using the Protobuf converter with the Datagen connector, see this example configuration. Note the required use of the SetSchemaMetadata Transformation which addresses a compatibility issue between schema names used by the Datagen connector and Protobuf. For additional details, see this compatibility issue.
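
As an illustrative sketch only (not the linked example configuration itself), a value-side Protobuf setup with the SetSchemaMetadata transformation could look like the following; the schema name shown is an assumed placeholder:

{
  "name": "datagen-users-protobuf",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "users",
    "quickstart": "users",
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "transforms": "SetSchemaName",
    "transforms.SetSchemaName.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
    "transforms.SetSchemaName.schema.name": "com.example.Users",
    "tasks.max": "1"
  }
}

The transformation overrides the value schema name, which is how the compatibility issue referenced above is worked around.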
Using a bundled schema specification¶
There are a few quickstart schema specifications bundled with the Datagen connector. These schemas are listed in this directory. To use one of these bundled schemas, refer to this mapping. In the configuration file, set the quickstart property to the associated name, as shown in the following example:
"quickstart": "users",
Defining a new schema specification¶
You can also define your own schema specification if you want to customize the fields and their values to be more domain specific, or to match what your application is expecting. Under the hood, kafka-connect-datagen uses the Avro Random Generator, so the only constraint in writing your own schema specification is that it is compatible with the generator. To define your own schema:
- Create your own schema file (your-schema.avsc) that is compatible with the Avro Random Generator.
- In the connector configuration, remove the configuration parameter quickstart and add the parameters schema.filename (which should be the absolute path) and schema.keyfield as shown below:
"schema.filename": "/path/to/your-schema.avsc",
"schema.keyfield": "<field representing the key>",
Note
The custom schema can be used at runtime. It is not necessary to recompile the connector.
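
To make these steps concrete, here is a minimal sketch of what a custom schema file might contain. The record and field names and the arg.properties values are illustrative assumptions that follow the Avro Random Generator conventions used by the bundled schemas:

{
  "namespace": "example.avro",
  "name": "orders",
  "type": "record",
  "fields": [
    {"name": "orderid", "type": {
      "type": "long",
      "arg.properties": {"iteration": {"start": 1}}
    }},
    {"name": "itemid", "type": {
      "type": "string",
      "arg.properties": {"regex": "Item_[1-9]"}
    }},
    {"name": "orderunits", "type": {
      "type": "double",
      "arg.properties": {"range": {"min": 1.0, "max": 10.0}}
    }}
  ]
}

With a file like this saved as, for example, /path/to/orders.avsc, the connector configuration would set "schema.filename": "/path/to/orders.avsc" and could set "schema.keyfield": "orderid".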
Record keys¶
You can control the keys that the connector publishes with its records using the schema.keyfield configuration property. If this property is set, the connector looks for the field name in the top-level Avro records that it generates. It then uses the field’s value and schema for the key of the message that it publishes to Kafka.
Keys can be any type (string, int, record, etc.) and can also be nullable. If no schema.keyfield is provided, the key will be null with an optional string schema.
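
For example, with the bundled users quickstart (which generates a userid field), a configuration like the following would key each record by its userid value. Choosing userid as the key field here is an illustrative assumption, not a connector default:

"quickstart": "users",
"schema.keyfield": "userid",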
Mock Data, Avro, and Schema Registry¶
To define the set of rules for the mock data, kafka-connect-datagen uses the Avro Random Generator. The configuration parameters quickstart or schema.filename specify the Avro schema (the set of rules), which declares a list of primitives or more complex data types, length of data, and other properties about the generated mock data. Examples of these schema files are listed in this directory.
Important
Do not confuse the use of the terms Avro and schemas when describing mock data generation and data produced to Kafka. The context for these terms is different. The Avro schemas for generating mock data are independent of the format of the data produced to Kafka and the schema in Confluent Schema Registry.
The format of data produced to Kafka may or may not be Avro. To define the format of the data produced to Kafka, you must set the format type in your connector configuration. The connector configuration parameters can be defined for the key or value. For example, to produce messages to Kafka where the message value format is Avro, set the value.converter and value.converter.schema.registry.url configuration parameters:
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
Or, to produce messages to Kafka where the message value format is JSON, set the value.converter configuration parameter:
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
The schema in Schema Registry declares the record fields and their types, and is used by Kafka clients when they are configured to produce or consume Avro data. As an example, consider the following entry in a schema specification that generates a userid field:
{"name": "userid", "type": {
"type": "string",
"arg.properties": {
"regex": "User_[1-9]{0,1}"
}
}},
If you are using Avro format for producing data to Kafka, here is the corresponding field in the registered schema in Schema Registry:
{"name": "userid", "type": ["null", "string"], "default": null},
If you are not using Avro format for producing data to Kafka, there is no schema in Schema Registry.
Utility Headers¶
The Datagen Source connector captures details about the record’s generation in the headers of the records it produces. The following fields are populated:
| Header Key | Header Value |
|---|---|
| task.generation | Task generation number (starts at 0, incremented each time the task restarts) |
| task.id | Task ID number (0 up to tasks.max - 1) |
| current.iteration | Record iteration number (starts at 0, incremented each time a record is generated) |
Additional Information¶
For additional information, see the README doc at the kafka-connect-datagen GitHub repository.