Datagen Source Connector for Confluent Cloud¶
Note
If you are installing the connector locally for Confluent Platform, see Datagen Source Connector for Confluent Platform.
The Confluent Cloud Datagen Source connector is used to generate mock data for development and testing. The connector supports Avro, JSON Schema, Protobuf, and JSON (schemaless) output formats. The mock source data is provided through GitHub from datagen resources. This connector is not suitable for production use.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.
Limitations¶
Be sure to review the following information.
- For connector limitations, see Datagen Source Connector limitations.
- If you plan to use one or more Single Message Transforms (SMTs), see SMT Limitations.
- If you plan to use Confluent Cloud Schema Registry, see Environment Limitations.
Quick Start¶
Use this quick start to get up and running with the Confluent Cloud Datagen source connector. The quick start provides the basics of selecting the connector and configuring it to use for testing and development. This connector is not suitable for production use.
- Prerequisites
- Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud Platform (GCP).
- The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.
- Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON Schema, or Protobuf). See Environment Limitations for additional information.
- Kafka cluster credentials. The following lists the different ways you can provide credentials.
- Enter an existing service account resource ID.
- Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
- Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create (see the sketch after this list) or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.
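For example, the following Confluent CLI commands sketch one way to create a service account and an API key scoped to your Kafka cluster. The service account name, description, and resource IDs shown are placeholders; substitute your own values.
# Create a service account for the connector (name and description are placeholders).
confluent iam service-account create "datagen-connector-sa" --description "Service account for the Datagen Source connector"
# Create an API key and secret owned by that service account, scoped to your Kafka cluster.
confluent api-key create --resource <kafka-cluster-ID> --service-account <service-account-resource-ID>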
Using the Confluent Cloud Console¶
Step 1: Launch your Confluent Cloud cluster.¶
See the Quick Start for Apache Kafka using Confluent Cloud for installation instructions.
Step 2: Add a connector.¶
In the left navigation menu, click Data integration, and then click Connectors. If you already have connectors in your cluster, click + Add connector.
Step 3: Select your connector.¶
Click the Datagen Source connector card.
Step 4: Enter the connector details.¶
Note
- Ensure you have all your prerequisites completed.
- An asterisk ( * ) designates a required entry.
At the Add Datagen Source Connector screen, complete the following:
- Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:
- Global Access: Allows your connector to access everything you have access to. With global access, connector access will be linked to your account. This option is not recommended for production.
- Granular access: Limits the access for your connector. You will be able to manage connector access through a service account. This option is recommended for production.
- Use an existing API key: Allows you to enter an API key and secret that you have stored. You can enter an API key and secret (or generate these in the Cloud Console).
- Click Continue.
Under Choose a template, select one of the quick start schemas the connector uses to generate sample data for the Kafka topic. The source schema specifications are listed on GitHub in datagen resources.
Under Output Kafka record value format, select an output message format (data coming from the connector): AVRO, JSON_SR (JSON Schema), PROTOBUF, or JSON. Schema Registry must be enabled to use a Schema Registry-based format (for example, AVRO, JSON_SR, or PROTOBUF). See Environment Limitations for more information.
Show advanced configurations
Max interval between messages (ms): Sets the maximum interval (in milliseconds) between messages. The default value is 1000.
For information about transforms and predicates, see the Single Message Transforms (SMT) documentation for details. See Unsupported transformations for a list of SMTs that are not supported with this connector.
Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
- To change the number of tasks, use the Range Slider to select the desired number of tasks.
- Click Continue.
Verify the connection details by previewing the running configuration.
Once you’ve validated that the properties are configured to your satisfaction, click Launch.
Tip
For information about previewing your connector output, see Connector Data Previews.
The status for the connector should go from Provisioning to Running.
Step 5: Check the Kafka topic.¶
After the connector is running, verify that messages are populating your Kafka topic.
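One way to spot-check the topic is to consume a few records with the Confluent CLI. The topic name below is a placeholder; add --value-format avro if you selected the AVRO output format.
confluent kafka topic consume -b <topic-name>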
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.
Using the Confluent CLI¶
Complete the following steps to set up and run the connector using the Confluent CLI.
Note
- Make sure you have all your prerequisites completed.
- The example commands use Confluent CLI version 2. For more information, see Confluent CLI v2.
Step 1: List the available connectors.¶
Enter the following command to list available connectors:
confluent connect plugin list
Step 2: Show the required connector configuration properties.¶
Enter the following command to show the required connector properties:
confluent connect plugin describe <connector-catalog-name>
For example:
confluent connect plugin describe DatagenSource
Example output:
Following are the required configs:
connector.class: DatagenSource
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
kafka.topic
output.data.format
quickstart
tasks.max
Step 3: Create the connector configuration file.¶
Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.
{
"name" : "<datagen-connector-name>",
"connector.class": "DatagenSource",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret" : "<my-kafka-api-secret>",
"kafka.topic" : "topic1, topic2",
"output.data.format" : "JSON",
"quickstart" : "PAGEVIEWS",
"tasks.max" : "1"
}
Note the following property definitions:
"name"
: Sets a name for your new connector."connector.class"
: Identifies the connector plugin name.
"kafka.auth.mode"
: Identifies the connector authentication mode you want to use. There are two options:SERVICE_ACCOUNT
orKAFKA_API_KEY
(the default). To use an API key and secret, specify the configuration propertieskafka.api.key
andkafka.api.secret
, as shown in the example configuration (above). To use a service account, specify the Resource ID in the propertykafka.service.account.id=<service-account-resource-ID>
. To list the available service account resource IDs, use the following command:confluent iam service-account list
For example:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"kafka.topic"
: Enter one topic or multiple comma-separated topics."output.data.format"
: Sets the output Kafka record value format (data coming from the connector). Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON (schemaless). You must have Confluent Cloud Schema Registry configured if using a schema-based format (for example, Avro)."quickstart"
: Enter one of the following Quick Start schemas:- CLICKSTREAM
- CLICSTREAM_CODES
- CLICKSTREAM_USERS
- INVENTORY
- ORDERS
- PAGEVIEWS (shown in the example)
- PRODUCT
- RATINGS
- STOCK_TRADES
- USERS
- USERS_ARRAY
To view the sample data and schema specifications, see datagen resources.
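For example, with "quickstart" : "PAGEVIEWS" (as in the example configuration above), each record value follows the pageviews schema from the datagen resources. The values below are illustrative only; the connector generates them randomly.
{
"viewtime": 1,
"userid": "User_9",
"pageid": "Page_12"
}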
Single Message Transforms: See the Single Message Transforms (SMT) documentation for details about adding SMTs using the CLI.
See Configuration Properties for all property values and definitions.
Step 4: Load the configuration file and create the connector.¶
Enter the following command to load the configuration and start the connector:
confluent connect create --config <file-name>.json
For example:
confluent connect create --config datagen-source-config.json
Example output:
Created connector confluent-datagen-source lcc-ix4dl
Step 5: Check the connector status.¶
Enter the following command to check the connector status:
confluent connect list
Example output:
ID | Name | Status | Type
+-----------+--------------------------+---------+-------+
lcc-ix4dl | confluent-datagen-source | RUNNING | source
Step 6: Check the Kafka topic.¶
After the connector is running, verify that messages are populating your Kafka topic.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.
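As a rough sketch, you can also inspect connectors over HTTPS with a Cloud API key and secret. The endpoint path, environment ID, and cluster ID below are assumptions; confirm them against the Confluent Cloud API for Connect reference.
# List connectors in a Kafka cluster (all IDs and credentials are placeholders).
curl -s -u "<cloud-api-key>:<cloud-api-secret>" \
  https://api.confluent.cloud/connect/v1/environments/<environment-ID>/clusters/<kafka-cluster-ID>/connectors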
Example¶
Follow the steps in the Quick Start for Apache Kafka using Confluent Cloud to stream sample data to Kafka using the Datagen Source connector for Confluent Cloud.
Configuration Properties¶
Use the following configuration properties with this connector.
How should we connect to your data?¶
name
Sets a name for your connector.
- Type: string
- Valid Values: A string at most 64 characters long
- Importance: high
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- Type: string
- Default: KAFKA_API_KEY
- Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
- Importance: high
kafka.api.key
- Type: password
- Importance: high
kafka.service.account.id
The service account that will be used to generate the API keys to communicate with the Kafka cluster.
- Type: string
- Importance: high
kafka.api.secret
- Type: password
- Importance: high
Which topic do you want to send data to?¶
kafka.topic
Identifies the topic name to write the data to.
- Type: string
- Importance: high
Output messages¶
output.data.format
Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, or PROTOBUF.
- Type: string
- Importance: high
json.output.decimal.format
Specify the JSON/JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals (see the example after this entry):
- BASE64 to serialize DECIMAL logical types as base64 encoded binary data.
- NUMERIC to serialize Connect DECIMAL logical type values in JSON/JSON_SR as a number representing the decimal value.
- Type: string
- Default: BASE64
- Importance: low
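For example, assuming the schema you selected contains DECIMAL logical type fields and you use a JSON-based output format, you could add the following to the connector configuration to emit decimals as plain numbers instead of base64-encoded strings:
"output.data.format" : "JSON_SR",
"json.output.decimal.format" : "NUMERIC"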
Datagen Details¶
quickstart
Select from built-in quickstart schema specifications. Refer to kafka-connect-datagen on GitHub for additional information.
- Type: string
- Importance: high
max.interval
Sets the maximum interval (in milliseconds) between each message (see the example after this entry).
- Type: int
- Default: 1000
- Importance: high
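For example, to slow generation to roughly one record every five seconds per task, you could add the following line to the connector configuration JSON shown earlier:
"max.interval" : "5000"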
Number of tasks for this connector¶
tasks.max
- Type: int
- Valid Values: [1,…]
- Importance: high
Next Steps¶
See also
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.
Suggested Reading¶
Blog post: Creating a Serverless Environment for Testing Your Apache Kafka Applications