Generate Custom Test Data by Using the ksql-datagen Tool

You can use the ksql-datagen command-line tool to generate test data that complies with a custom schema that you define.

To generate test data, create an Apache Avro schema and pass it to ksql-datagen. This generates random data according to the schema you provide.

Also, you can generate data from a few simple, predefined schemas.

Prerequisites:

  • Confluent Platform is installed and running. This installation includes an Apache Kafka® broker, KSQL, Control Center, ZooKeeper, Schema Registry, REST Proxy, and Kafka Connect.
  • If you installed Confluent Platform via TAR or ZIP, navigate to the installation directory. The paths and commands used throughout this tutorial assume that you're in this installation directory.
  • Java: Minimum version 1.8. Install Oracle Java JRE or JDK >= 1.8 on your local machine.

The ksql-datagen tool is installed with Confluent Platform by default.

Note

KSQL Server doesn't need to be running for ksql-datagen to generate records to a topic. The ksql-datagen tool isn't just for KSQL. You can use it to produce data to any Kafka topic that you have write access to.

Usage

Use the following command to generate records from an Avro schema:

<path-to-confluent>/bin/ksql-datagen schema=<path-to-avro-file> format=<record format> topic=<kafka topic name> key=<name of key column> [options ...]

Required Arguments

Name | Default | Description
schema=<avro schema file> | (none) | Path to an Avro schema file. Requires the format, topic, and key options.
format=<record format> | json | Format of generated records: one of avro, json, or delimited. Case-insensitive.
topic=<kafka topic name> | (none) | Name of the topic that receives generated records.
key=<name of key column> | (none) | Field to use as the key for generated records.
quickstart=<quickstart preset> | (none) | Generate records from a preset schema: orders, users, or pageviews. Case-insensitive. If topic isn't specified, creates a topic named <preset>_kafka_topic_json, for example, users_kafka_topic_json.

Use the following command to generate records from one of the predefined schemas:

<path-to-confluent>/bin/ksql-datagen quickstart=<quickstart preset> [options ...]
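The argument rules in this section can be sketched as a small helper. The following Python snippet is illustrative only: build_datagen_args is a hypothetical function, not part of ksql-datagen, and it does nothing more than assemble an argument list from option names documented here. It assumes that schema and quickstart are alternative modes and that schema mode needs format, topic, and key.

```python
def build_datagen_args(options, binary="ksql-datagen"):
    """Assemble a ksql-datagen argument list (hypothetical helper).

    Only option names documented in this section are expected. The
    validation is a reading of the argument tables, not the tool's own logic.
    """
    has_schema = "schema" in options
    has_quickstart = "quickstart" in options
    if has_schema == has_quickstart:
        # Assumption: the two modes are mutually exclusive.
        raise ValueError("specify exactly one of schema or quickstart")
    if has_schema:
        missing = [name for name in ("format", "topic", "key") if name not in options]
        if missing:
            raise ValueError("schema requires: " + ", ".join(missing))
    # ksql-datagen takes name=value pairs rather than --flags.
    return [binary] + [f"{name}={value}" for name, value in options.items()]


args = build_datagen_args({"quickstart": "users", "iterations": 100})
print(" ".join(args))  # prints: ksql-datagen quickstart=users iterations=100
```

Passing the options as a dictionary keeps names such as bootstrap-server usable, because they are not valid Python keyword arguments.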

Optional Arguments

The following options apply to both the schema and quickstart options.

Name | Default | Description
bootstrap-server=<kafka-server>:<port> | localhost:9092 | IP address and port for the Kafka server to connect to.
format=<record format> | json | Format of generated records: avro, json, or delimited. Case-insensitive. Required by the schema option.
topic=<kafka topic name> | (none) | Name of the topic that receives generated records. Required by the schema option.
key=<name of key column> | (none) | Field to use as the key for generated records. Required by the schema option.
iterations=<number of records> | 1,000,000 | The maximum number of records to generate.
maxInterval=<max time between records> | 500 | Longest time to wait before generating a new record, in milliseconds.
propertiesFile=<path-to-properties-file> | <path-to-confluent>/etc/ksql/datagen.properties | Path to the ksql-datagen properties file.
schemaRegistryUrl | http://localhost:8081 | URL of Schema Registry when format is avro.

Records are generated at random intervals, with the longest interval specified by the maxInterval option.
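To get a feel for how maxInterval and iterations interact, the pacing can be simulated. The following Python sketch is a rough model, not the tool's implementation: it assumes each wait is drawn uniformly between 0 and maxInterval, which this page doesn't state. The function name simulate_intervals is hypothetical.

```python
import random


def simulate_intervals(iterations, max_interval_ms=500, seed=None):
    """Return one simulated wait, in milliseconds, per generated record.

    Assumption: waits are uniform on [0, max_interval_ms]. The documentation
    only says that maxInterval is the longest possible wait.
    """
    rng = random.Random(seed)
    return [rng.uniform(0, max_interval_ms) for _ in range(iterations)]


waits = simulate_intervals(iterations=1000, max_interval_ms=500, seed=42)
print(f"longest wait: {max(waits):.0f} ms")
print(f"total time:   {sum(waits) / 1000:.0f} s")
```

Under this uniform assumption, the expected run time is iterations × maxInterval / 2, so the defaults (1,000,000 records at up to 500 ms) would take roughly 250,000 seconds, or about 69 hours. Lowering maxInterval or iterations shortens a test run accordingly.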

Tip

For usage information, enter ksql-datagen help.

Generate Records From a Predefined Schema

The ksql-datagen tool provides some simple schemas for generating example orders, users, and pageviews data.

Generate Example Order Records With Structured Data

The orders quickstart option produces records that simulate orders, with itemid, price, and location columns. The location column is a STRUCT with city, state, and zipcode fields.

The following command generates example order records to a Kafka topic named orders_topic:

<path-to-confluent>/bin/ksql-datagen quickstart=orders topic=orders_topic

In the KSQL CLI or in Control Center, register a stream on orders_topic:

CREATE STREAM orders_raw (
    itemid VARCHAR,
    price DOUBLE,
    location STRUCT<
        city VARCHAR,
        state VARCHAR,
        zipcode INT>,
    timestamp VARCHAR)
 WITH (
    KAFKA_TOPIC='orders_topic',
    VALUE_FORMAT='JSON');

Inspect the schema of the orders_raw stream by using the DESCRIBE statement:

DESCRIBE orders_raw;

Your output should resemble:

Name                 : ORDERS_RAW
 Field     | Type
----------------------------------------------------------------------------------
 ROWTIME   | BIGINT           (system)
 ROWKEY    | VARCHAR(STRING)  (system)
 ITEMID    | VARCHAR(STRING)
 PRICE     | DOUBLE
 LOCATION  | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE INTEGER>
 TIMESTAMP | VARCHAR(STRING)
----------------------------------------------------------------------------------

For more information, see Query With Structured Data.
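As a rough picture of how the STRUCT column relates to the JSON on the topic, the following Python sketch parses a made-up record value. Every field value is invented for illustration and is not real ksql-datagen output; only the field names come from the CREATE STREAM statement above.

```python
import json

# Hypothetical record value: all field values are invented for illustration.
raw_value = json.dumps({
    "itemid": "Item_1",
    "price": 9.99,
    "location": {"city": "City_1", "state": "State_1", "zipcode": 12345},
    "timestamp": "2018-06-11 12:00:00",
})

order = json.loads(raw_value)
# The STRUCT column corresponds to a nested JSON object.
print(order["location"]["zipcode"])
```

In KSQL, the equivalent nested access uses the -> operator, for example location->zipcode.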

Generate Example User Records

The users quickstart option produces records that simulate user data, with registertime, gender, regionid, and userid fields. You can join userid values with the page view records generated by the pageviews quickstart option.

The following command generates example user records:

<path-to-confluent>/bin/ksql-datagen quickstart=users

In this example, no topic name is specified, so ksql-datagen creates a topic named users_kafka_topic_json.

In the KSQL CLI or in Control Center, register a table on users_kafka_topic_json:

CREATE TABLE users_original (
    registertime BIGINT,
    gender VARCHAR,
    regionid VARCHAR,
    userid VARCHAR)
WITH (
    kafka_topic='users_kafka_topic_json',
    value_format='JSON',
    key = 'userid');

Inspect the schema of the users_original table by using the DESCRIBE statement:

DESCRIBE users_original;

Your output should resemble:

Name                 : USERS_ORIGINAL
 Field        | Type
------------------------------------------
 ROWTIME      | BIGINT           (system)
 ROWKEY       | VARCHAR(STRING)  (system)
 REGISTERTIME | BIGINT
 GENDER       | VARCHAR(STRING)
 REGIONID     | VARCHAR(STRING)
 USERID       | VARCHAR(STRING)
------------------------------------------

Generate Example User Records With Complex Data

The users_ quickstart option produces records that simulate user data, with registertime, gender, regionid, userid, interests, and contactInfo fields. The interests field is an ARRAY, and the contactInfo field is a MAP.

You can join userid values with the page view records generated by the pageviews quickstart option.

The following command generates example user records that have complex data:

<path-to-confluent>/bin/ksql-datagen quickstart=users_ topic=users_extended

In the KSQL CLI or in Control Center, register a table on users_extended:

CREATE TABLE users_extended (
    registertime BIGINT,
    gender VARCHAR,
    regionid VARCHAR,
    userid VARCHAR,
    interests ARRAY<STRING>,
    contactInfo MAP<STRING, STRING>)
WITH (
    kafka_topic='users_extended',
    value_format='JSON',
    key = 'userid');

Inspect the schema of the users_extended table by using the DESCRIBE statement:

DESCRIBE users_extended;

Your output should resemble:

Name                 : USERS_EXTENDED
 Field        | Type
---------------------------------------------
 ROWTIME      | BIGINT           (system)
 ROWKEY       | VARCHAR(STRING)  (system)
 REGISTERTIME | BIGINT
 GENDER       | VARCHAR(STRING)
 REGIONID     | VARCHAR(STRING)
 USERID       | VARCHAR(STRING)
 INTERESTS    | ARRAY<VARCHAR(STRING)>
 CONTACTINFO  | MAP<STRING, VARCHAR(STRING)>
---------------------------------------------

For more information, see Query With Arrays and Maps.

Generate Example User Page Views

The pageviews quickstart option produces records that simulate page views, with viewtime, userid, and pageid fields. You can join userid values with the user records generated by the users quickstart option.

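The following command generates example pageview records to a Kafka topic named pageviews. Because ksql-datagen writes JSON by default, the command sets format=delimited so that the records match the DELIMITED value format of the stream registered in the next step:

<path-to-confluent>/bin/ksql-datagen quickstart=pageviews format=delimited topic=pageviews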

In the KSQL CLI or in Control Center, register a stream on pageviews:

CREATE STREAM pageviews_original (
    viewtime bigint,
    userid varchar,
    pageid varchar)
WITH (
    kafka_topic='pageviews',
    value_format='DELIMITED');

Inspect the schema of the pageviews_original stream by using the DESCRIBE statement:

DESCRIBE pageviews_original;

Your output should resemble:

Name                 : PAGEVIEWS_ORIGINAL
 Field    | Type
--------------------------------------
 ROWTIME  | BIGINT           (system)
 ROWKEY   | VARCHAR(STRING)  (system)
 VIEWTIME | BIGINT
 USERID   | VARCHAR(STRING)
 PAGEID   | VARCHAR(STRING)
--------------------------------------

Generate Records From an Avro Schema

Define a Custom Schema

In this example, you download a custom Avro schema and generate matching test data. The schema is named impressions.avro, and it represents advertisements delivered to users.

Download impressions.avro to your home directory by running the following command from that directory. The ksql-datagen tool reads this file when you start generating test data.

curl https://raw.githubusercontent.com/apurvam/streams-prototyping/master/src/main/resources/impressions.avro > impressions.avro
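If you want to write your own schema instead, note that an Avro schema is a JSON document. The following Python sketch builds a hypothetical schema and prints it. It is not the contents of impressions.avro: the namespace, record name, and field list are made up, and the "arg.properties" annotation with a "regex" entry is an assumption about how generator hints are attached to a field, so check the downloaded file for the exact conventions.

```python
import json

# Hypothetical schema for illustration; NOT the contents of impressions.avro.
schema = {
    "namespace": "example",
    "name": "impressions_sketch",
    "type": "record",
    "fields": [
        {"name": "impressionid", "type": "string"},
        {"name": "userid", "type": "string"},
        {
            "name": "adid",
            # Assumption: generator hints live under "arg.properties".
            "type": {
                "type": "string",
                "arg.properties": {"regex": "ad_[1-9][0-9]"},
            },
        },
    ],
}

schema_text = json.dumps(schema, indent=2)
print(schema_text)
```

Saving the printed JSON to a file gives you something to pass in the schema argument, with key set to one of the field names.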

Generate Test Data

After you have a custom schema file, you can generate test data that's made up of random values that satisfy the schema requirements. In the impressions schema, advertisement identifiers consist of the prefix ad_ followed by a random two-digit number between 10 and 99, as specified by the regular expression ad_[1-9][0-9].
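To see why the regular expression yields numbers from 10 to 99, the following Python sketch builds matching values by hand and checks them against the pattern. It only illustrates the pattern itself; it says nothing about how ksql-datagen produces values internally, and random_ad_id is a hypothetical name.

```python
import random
import re

AD_PATTERN = re.compile(r"ad_[1-9][0-9]")


def random_ad_id(rng):
    """Build one value that matches ad_[1-9][0-9].

    [1-9] picks the tens digit and [0-9] the ones digit, so the numeric
    part always falls between 10 and 99.
    """
    return f"ad_{rng.randint(1, 9)}{rng.randint(0, 9)}"


rng = random.Random(7)
samples = [random_ad_id(rng) for _ in range(5)]
print(samples)
print(all(AD_PATTERN.fullmatch(s) for s in samples))  # prints: True
```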

Open a new command shell, and start generating test values by using the ksql-datagen command in the <path-to-confluent>/bin directory. In this example, the schema file, impressions.avro, is in your home directory.

<path-to-confluent>/bin/ksql-datagen schema=~/impressions.avro format=delimited topic=impressions key=impressionid

After a few startup messages, your output should resemble:

impression_796 --> ([ 1528756317023 | 'impression_796' | 'user_41' | 'ad_29' ])
impression_341 --> ([ 1528756317446 | 'impression_341' | 'user_34' | 'ad_32' ])
impression_419 --> ([ 1528756317869 | 'impression_419' | 'user_58' | 'ad_74' ])
impression_399 --> ([ 1528756318146 | 'impression_399' | 'user_32' | 'ad_78' ])
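Each output line shows the record key, an arrow, and the record's fields. The following Python sketch splits one of the lines above into those parts. The line layout is inferred only from the sample output shown here, so treat the pattern as an assumption; parse_datagen_line is a hypothetical helper.

```python
import re

# Layout inferred from the sample output: <key> --> ([ f1 | f2 | ... ])
LINE = re.compile(r"^(?P<key>\S+) --> \(\[ (?P<fields>.*) \]\)$")


def parse_datagen_line(line):
    """Split one console line into its key and a list of field strings."""
    match = LINE.match(line.strip())
    if match is None:
        raise ValueError(f"unrecognized line: {line!r}")
    # Fields are separated by " | "; string values are wrapped in single quotes.
    fields = [f.strip().strip("'") for f in match.group("fields").split(" | ")]
    return match.group("key"), fields


key, fields = parse_datagen_line(
    "impression_796 --> ([ 1528756317023 | 'impression_796' | 'user_41' | 'ad_29' ])"
)
print(key)     # prints: impression_796
print(fields)  # prints: ['1528756317023', 'impression_796', 'user_41', 'ad_29']
```

The key matches the second field, which is consistent with passing key=impressionid on the command line.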

Consume the Test Data Stream

In the KSQL CLI or in Control Center, register the impressions stream:

CREATE STREAM impressions (viewtime BIGINT, key VARCHAR, userid VARCHAR, adid VARCHAR) WITH (KAFKA_TOPIC='impressions', VALUE_FORMAT='DELIMITED');

Create the impressions2 persistent streaming query:

CREATE STREAM impressions2 AS SELECT * FROM impressions;