Quick Start using Community Components (Local)

Download Confluent Platform and use this quick start to get up and running using Confluent Platform with Confluent Community components in a development environment. It demonstrates the basic and most powerful capabilities, including creating topics, adding and modifying data, and stream processing by using ksqlDB. In this quick start you will create Kafka topics and streaming queries on these topics by using ksqlDB.

This quick start leverages the Confluent Platform CLI, the Apache Kafka® CLI, and the ksqlDB CLI. For a rich UI-based experience, try out the Confluent Platform quick start with commercial components.

See also

You can also run an automated version of this quick start designed for Confluent Platform local installs.

Important

Java 8 and Java 11 are supported in this version of Confluent Platform (Java 9 and 10 are not supported). For more information, see Java supported versions. You need to separately install the correct version of Java before you start the Confluent Platform installation process.

Prerequisites:
  • Internet connectivity.
  • Ensure you are on an Operating System currently supported by Confluent Platform.

Step 1: Download and Start Confluent Platform

  1. Go to the downloads page and choose Download Confluent Community.

  2. Provide your name and email and select Download, and then choose the desired format .tar.gz or .zip.

  3. Decompress the file. You should have these directories:

    Folder Description
    /bin/ Driver scripts for starting and stopping services
    /etc/ Configuration files
    /lib/ Systemd services
    /logs/ Log files
    /share/ Jars and licenses
    /src/ Source files that require a platform-dependent build
  4. Install the Confluent Hub client. This is used in the next step to install the free and open source kafka-source-datagen connector.

    • Install Confluent Hub on MacOS.
    • Install Confluent Hub on Linux.
  5. Install the Confluent CLI, confluent, using the following script. The <path-to-cli> parameter must be in your PATH (e.g. /usr/local/bin).

    On Microsoft Windows, an appropriate Linux environment may need to be installed in order to have the curl and sh commands available, such as the Windows Subsystem for Linux.

    curl -L --http1.1 https://cnfl.io/cli | sh -s -- -b /<path-to-cli>
    

    For more information, see Installing and Configuring the CLI.

  6. Set the path to your Confluent Platform installation in your shell profile. For example:

    export CONFLUENT_HOME=<path-to-confluent>
    export PATH="${CONFLUENT_HOME}/bin:$PATH"
    
  7. Install the Kafka Connect Datagen source connector using the Confluent Hub client. This connector generates mock data for demonstration purposes and is not suitable for production. Confluent Hub is an online library of pre-packaged and ready-to-install extensions or add-ons for Confluent Platform and Kafka.

    confluent-hub install \
     --no-prompt confluentinc/kafka-connect-datagen:latest
    

    Your output should resemble:

    Running in a "--no-prompt" mode
    ...
    Completed
    
  8. Start Confluent Platform using the Confluent CLI confluent local start command. This command will start all of the Confluent Platform components, including Kafka, ZooKeeper, Schema Registry, HTTP REST Proxy for Kafka, Kafka Connect, and ksqlDB. You can add the Confluent bin directory to your PATH by running: export PATH=<path-to-confluent>/bin:$PATH

    Important

    The confluent local commands are intended for a single-node development environment and are not suitable for a production environment. The data that are produced are transient and are intended to be temporary. For production-ready workflows, see Install and Upgrade Confluent Platform.

    confluent local start
    

    Your output should resemble:

    Starting zookeeper
    zookeeper is [UP]
    Starting kafka
    kafka is [UP]
    Starting schema-registry
    schema-registry is [UP]
    Starting kafka-rest
    kafka-rest is [UP]
    Starting connect
    connect is [UP]
    Starting ksql-server
    ksql-server is [UP]
    

Step 2: Create Kafka Topics

In this step Kafka topics are created in Confluent Platform by using the Kafka CLI.

  1. Run this command to create a topic named users.

    ./bin/kafka-topics --create --bootstrap-server localhost:9092 \
    --replication-factor 1 --partitions 1 --topic users
    

    Your output should resemble:

    Created topic "users".
    
  2. Run this command to create a topic named pageviews.

    ./bin/kafka-topics --create --bootstrap-server localhost:9092   \
    --replication-factor 1 --partitions 1 --topic pageviews
    

    Your output should resemble:

    Created topic "pageviews".
    

Step 3: Install a Kafka Connector and Generate Sample Data

In this step, you use Kafka Connect to run a demo source connector called kafka-connect-datagen that creates sample data for the Kafka topics pageviews and users.

  1. Run one instance of the Kafka Connect Datagen connector to produce Kafka data to the pageviews topic in AVRO format.

    curl -L -H -O 'Accept: application/vnd.github.v3.raw' https://api.github.com/repos/confluentinc/kafka-connect-datagen/contents/config/connector_pageviews_cos.config
    curl -X POST -H "Content-Type: application/json" --data @connector_pageviews_cos.config http://localhost:8083/connectors
    
  2. Run another instance of the Kafka Connect Datagen connector to produce Kafka data to the users topic in AVRO format.

    curl -L -H -O 'Accept: application/vnd.github.v3.raw' https://api.github.com/repos/confluentinc/kafka-connect-datagen/contents/config/connector_users_cos.config
    curl -X POST -H "Content-Type: application/json" --data @connector_users_cos.config http://localhost:8083/connectors
    

Tip

The Kafka Connect Datagen connector was installed manually in Step 1: Download and Start Confluent Platform. If you encounter issues locating the Datagen Connector, refer to the Issue: Cannot locate the Datagen Connector in the Troubleshooting section.

Step 4: Create and Write to a Stream and Table using ksqlDB

In this step, SQL queries are run on the pageviews and users topics that were created in the previous step. The following ksqlDB commands are run from the ksqlDB CLI. Enter these commands in your terminal and press Enter.

Important

  • Confluent Platform must be installed and running.
  • To try out the ksqlDB web interface, see the Confluent Platform quick start with commercial components.
  • All ksqlDB commands must end with a closing semicolon (;).

Create Streams and Tables

  1. Start the ksqlDB CLI in your terminal with this command.

    LOG_DIR=./ksql_logs ./bin/ksql
    

    Important

    By default ksqlDB attempts to store its logs in a directory called logs that is relative to the location of the ksql executable. For example, if ksql is installed at /usr/local/bin/ksql, then it would attempt to store its logs in /usr/local/logs. If you are running ksql from the default Confluent Platform location, $CONFLUENT_HOME/bin, you must override this default behavior by using the LOG_DIR variable.

  2. Create a stream pageviews from the Kafka topic pageviews, specifying the value_format of AVRO.

    CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR) \
    WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');
    

    Tip: Enter the SHOW STREAMS; command to view your streams. For example:

     Stream Name      | Kafka Topic      | Format
    -------------------------------------------------
     PAGEVIEWS        | pageviews        | AVRO
    -------------------------------------------------
    
  3. Create a table users with several columns from the Kafka topic users, with the value_format of AVRO.

    CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR,  \
    userid VARCHAR) \
    WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO', KEY = 'userid');
    

    Tip: Enter the SHOW TABLES; query to view your tables.

     Table Name        | Kafka Topic       | Format    | Windowed
    --------------------------------------------------------------
     USERS             | users             | AVRO      | false
    --------------------------------------------------------------
    

Write Queries

These examples write queries using SQL. The following ksqlDB commands are run from the ksqlDB CLI. Enter these commands in your terminal and press Enter.

  1. Add the custom query property earliest for the auto.offset.reset parameter. This instructs ksqlDB queries to read all available topic data from the beginning. This configuration is used for each subsequent query. For more information, see the ksqlDB Configuration Parameter Reference.

    SET 'auto.offset.reset'='earliest';
    

    Your output should resemble:

    Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
    
  2. Create a non-persistent query that returns data from a stream with the results limited to a maximum of three rows.

    SELECT pageid FROM pageviews EMIT CHANGES LIMIT 3;
    

    Your output should resemble:

    Page_45
    Page_38
    Page_11
    LIMIT reached for the partition.
    Query terminated
    
  3. Create a persistent query that filters for female users. The results from this query are written to the Kafka PAGEVIEWS_FEMALE topic. This query enriches the pageviews STREAM by doing a LEFT JOIN with the users TABLE on the user ID, where a condition (gender = 'FEMALE') is met.

    CREATE STREAM pageviews_female AS SELECT users.userid AS userid, pageid, \
    regionid, gender FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid \
    WHERE gender = 'FEMALE';
    

    Your output should resemble:

     Message
    ------------------------------------------------------------------------------------------------------
     Stream PAGEVIEWS_FEMALE created and running. Created by query with query ID: CSAS_PAGEVIEWS_FEMALE_3
    ------------------------------------------------------------------------------------------------------
    
  4. Create a persistent query where a condition (regionid) is met, using LIKE. Results from this query are written to a Kafka topic named pageviews_enriched_r8_r9.

    CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', \
    value_format='AVRO') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
    

    Your output should resemble:

     Message
    ----------------------------------------------------------------------------------------------------------------------
     Stream PAGEVIEWS_FEMALE_LIKE_89 created and running. Created by query with query ID: CSAS_PAGEVIEWS_FEMALE_LIKE_89_4
    ----------------------------------------------------------------------------------------------------------------------
    
  5. Create a persistent query that counts the pageviews for each region and gender combination in a tumbling window of 30 seconds when the count is greater than 1. Because the procedure is grouping and counting, the result is now a table, rather than a stream. Results from this query are written to a Kafka topic called PAGEVIEWS_REGIONS.

    CREATE TABLE pageviews_regions AS SELECT gender, regionid , \
    COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) \
    GROUP BY gender, regionid HAVING COUNT(*) > 1;
    

    Your output should resemble:

     Message
    -------------------------------------------------------------------------------------------------------
     Table PAGEVIEWS_REGIONS created and running. Created by query with query ID: CTAS_PAGEVIEWS_REGIONS_5
    -------------------------------------------------------------------------------------------------------
    

Step 5: Monitor Streaming Data

Now that your streams are running you can monitor them.

  • View the details for your stream or table with the DESCRIBE EXTENDED command. For example, run this command to view the pageviews_female_like_89 stream:

    DESCRIBE EXTENDED pageviews_female_like_89;
    

    Your output should look like this:

    Name                 : PAGEVIEWS_FEMALE_LIKE_89
    Type                 : STREAM
    Key field            :
    Key format           : STRING
    Timestamp field      : Not set - using <ROWTIME>
    Value format         : AVRO
    Kafka topic          : pageviews_enriched_r8_r9 (partitions: 1, replication: 1)
    
     Field    | Type
    --------------------------------------
     ROWTIME  | BIGINT           (system)
     ROWKEY   | VARCHAR(STRING)  (system)
     USERID   | VARCHAR(STRING)
     PAGEID   | VARCHAR(STRING)
     REGIONID | VARCHAR(STRING)
     GENDER   | VARCHAR(STRING)
    --------------------------------------
    
    ...
    
  • List the running queries with the SHOW QUERIES command.

    SHOW QUERIES;
    

    You should see a query named CTAS_PAGEVIEWS_REGIONS.

  • Discover the query execution plan with the EXPLAIN command. For example, run this command to view the query execution plan for CTAS_PAGEVIEWS_REGIONS:

    EXPLAIN CTAS_PAGEVIEWS_REGIONS_5;
    

    Your output should resemble:

    ID                   : CTAS_PAGEVIEWS_REGIONS_5
    SQL                  : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', PARTITIONS=1, REPLICAS=1) AS SELECT
      PAGEVIEWS_FEMALE.GENDER "GENDER",
      PAGEVIEWS_FEMALE.REGIONID "REGIONID",
      COUNT(*) "NUMUSERS"
    FROM PAGEVIEWS_FEMALE PAGEVIEWS_FEMALE
    WINDOW TUMBLING ( SIZE 30 SECONDS )
    GROUP BY PAGEVIEWS_FEMALE.GENDER, PAGEVIEWS_FEMALE.REGIONID
    HAVING (COUNT(*) > 1)
    EMIT CHANGES;
    Status               : RUNNING
    
     Field    | Type
    --------------------------------------
     ROWTIME  | BIGINT           (system)
     ROWKEY   | VARCHAR(STRING)  (system)
     GENDER   | VARCHAR(STRING)
     REGIONID | VARCHAR(STRING)
     NUMUSERS | BIGINT
    --------------------------------------
    ...
    

For more information about SQL syntax, see ksqlDB Syntax Reference.

Step 6: Stop Confluent Platform

When you are done working with the local install, you can stop Confluent Platform.

  1. Stop Confluent Platform using the Confluent CLI confluent local stop command.

    ./bin/confluent local stop
    
  2. Destroy the data in the Confluent Platform instance with the confluent local destroy command.

    ./bin/confluent local destroy
    

You can start the local install of Confluent Platform again with the confluent local start command.

Troubleshooting

If you encountered any issues, review the following resolutions before trying the steps again.

Issue: Cannot locate the Datagen Connector

Resolution: Verify the DataGen Connector is installed and running.

Ensure that the kafka-connect-datagen is installed and running as described in Step 1: Download and Start Confluent Platform.

<path-to-confluent>/bin/confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest

Your output should resemble:

Running in a "--no-prompt" mode
...
Completed

Resolution: Check the connect logs for Datagen using the Confluent CLI confluent local log command.

<path-to-confluent>/bin/confluent local log connect | grep -i Datagen

Your output should resemble:

[2019-04-18 14:21:08,840] INFO Loading plugin from: /Users/user.name/Confluent/confluent-version/share/confluent-hub-components/confluentinc-kafka-connect-datagen (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:215)
[2019-04-18 14:21:08,894] INFO Registered loader: PluginClassLoader{pluginLocation=file:/Users/user.name/Confluent/confluent-version/share/confluent-hub-components/confluentinc-kafka-connect-datagen/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:238)
[2019-04-18 14:21:08,894] INFO Added plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:167)
[2019-04-18 14:21:09,882] INFO Added aliases 'DatagenConnector' and 'Datagen' to plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:386)

Resolution: Verify the .jar file for kafka-connect-datagen has been added and is present in the lib subfolder.

ls <path-to-confluent>/share/confluent-hub-components/confluentinc-kafka-connect-datagen/lib/

Your output should resemble:

...
kafka-connect-datagen-0.1.0.jar
...

Resolution: Verify the plugin exists in the connector path.

When you installed the kafka-connect-datagen file from Confluent hub, the installation directory is added to the plugin path of several properties files:

 Adding installation directory to plugin path in the following files:
/Users/user.name/Confluent/confluent-version/etc/kafka/connect-distributed.properties
/Users/user.name/Confluent/confluent-version/etc/kafka/connect-standalone.properties
/Users/user.name/Confluent/confluent-version/etc/schema-registry/connect-avro-distributed.properties
/Users/user.name/Confluent/confluent-version/etc/schema-registry/connect-avro-standalone.properties
...

You can use any of them to check the connector path. This example uses the connect-avro-distributed.properties file.

grep plugin.path <path-to-confluent>/etc/schema-registry/connect-avro-distributed.properties

Your output should resemble:

plugin.path=share/java,/Users/user.name/Confluent/confluent-version/share/confluent-hub-components

Confirm its contents are present:

ls <path-to-confluent>/share/confluent-hub-components/confluentinc-kafka-connect-datagen

Your output should resemble:

assets   doc  lib  manifest.json

Issue: Stream-Stream joins error

An error states Stream-Stream joins must have a WITHIN clause specified. This error can occur if you created streams for both pageviews and users by mistake.

Resolution: Ensure that you created a stream for pageviews, and a table for users in Step 4: Create and Write to a Stream and Table using ksqlDB.

Issue: Unable to successfully complete ksqlDB query steps

Java errors or other severe errors were encountered.

Resolution: Ensure you are on an Operating System currently supported by Confluent Platform.

ksqlDB errors were encountered.

Resolution: Review the help in the ksqlDB CLI for successful command tips and links to more documentation.

ksql> help

Next Steps

Learn more about the components shown in this quick start: