Quick Start for Apache Kafka using Confluent Platform Community Components (Docker)

Use this quick start to get up and running with Confluent Platform and Confluent Community components in a development environment using Docker containers.

In this quick start, you create Apache Kafka® topics, use Kafka Connect to generate mock data to those topics, and create ksqlDB streaming queries on those topics.

This quick start leverages the Confluent Platform CLI, the Apache Kafka® CLI, and the ksqlDB CLI. For a rich UI-based experience, try out the Confluent Platform quick start with commercial components.

Prerequisites:
  • Docker
    • Docker version 1.11 or later is installed and running.
    • Docker Compose is installed. Docker Compose is installed by default with Docker for Mac.
    • Docker memory is allocated minimally at 6 GB. When using Docker Desktop for Mac, the default Docker memory allocation is 2 GB. You can change the default allocation to 6 GB in Docker. Navigate to Preferences > Resources > Advanced.
  • Internet connectivity
  • Operating System currently supported by Confluent Platform
  • Networking and Kafka on Docker
    • Configure your hosts and ports to allow both internal and external components to the Docker network to communicate.
  • (Optional) curl.
    • In the steps below, you will download a Docker Compose file. You can download this file any way you like, but the instructions below provide the explicit curl command you can use to download the file.

Step 1: Download and Start Confluent Platform Using Docker

  1. Download or copy the contents of the Confluent Community all-in-one Docker Compose file, for example:

    curl --silent --output docker-compose.yml \
      https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.1.15-post/cp-all-in-one-community/docker-compose.yml
    
  2. Start Confluent Platform specifying the -d option to run in detached mode:

    docker-compose up -d
    

    The above command starts Confluent Platform with separate containers for all Confluent Platform components. Your output should resemble the following:

    Creating network "cp-all-in-one-community_default" with the default driver
    Creating zookeeper ... done
    Creating broker    ... done
    Creating schema-registry ... done
    Creating rest-proxy      ... done
    Creating connect         ... done
    Creating ksql-datagen    ... done
    Creating ksqldb-server   ... done
    Creating ksqldb-cli      ... done
    
  3. Verify that the services are up and running:

    docker-compose ps
    

    You should see the following:

         Name                    Command               State                Ports
    ------------------------------------------------------------------------------------------
    broker            /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp,
                                                               0.0.0.0:9092->9092/tcp
    connect           /etc/confluent/docker/run        Up      0.0.0.0:8083->8083/tcp,
                                                               9092/tcp
    ksqldb-cli        ksql http://localhost:8088       Up
    ksql-datagen      bash -c echo Waiting for K ...   Up
    ksqldb-server     /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp
    rest-proxy        /etc/confluent/docker/run        Up      0.0.0.0:8082->8082/tcp
    schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp
    zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp,
                                                               2888/tcp, 3888/tcp
    

    If the state is not Up, rerun the docker-compose up -d command.

Step 2: Create Kafka Topics

In this step, you create Kafka topics using the Kafka CLI.

  1. Create a topic named users:

    docker-compose exec broker kafka-topics \
      --create \
      --bootstrap-server localhost:9092 \
      --replication-factor 1 \
      --partitions 1 \
      --topic users
    
  2. Create a topic named pageviews:

    docker-compose exec broker kafka-topics \
      --create \
      --bootstrap-server localhost:9092 \
      --replication-factor 1 \
      --partitions 1 \
      --topic pageviews
    

Step 3: Install a Kafka Connector and Generate Sample Data

In this step, you use Kafka Connect to run a demo source connector called kafka-connect-datagen that creates sample data for the Kafka topics pageviews and users.

  1. Run the first instance of the Kafka Connect Datagen connector to produce Kafka data to the pageviews topic in AVRO format.

    curl -L -O -H 'Accept: application/vnd.github.v3.raw' \
      https://api.github.com/repos/confluentinc/kafka-connect-datagen/contents/config/connector_pageviews_cos.config
    
    curl -X POST -H 'Content-Type: application/json' \
      --data @connector_pageviews_cos.config \
      http://localhost:8083/connectors
    
  2. Run the second instance of the Kafka Connect Datagen connector to produce Kafka data to the users topic in AVRO format.

    curl -L -O -H 'Accept: application/vnd.github.v3.raw' \
      https://api.github.com/repos/confluentinc/kafka-connect-datagen/contents/config/connector_users_cos.config
    
    curl -X POST -H 'Content-Type: application/json' \
      --data @connector_users_cos.config \
      http://localhost:8083/connectors
    

Tip

The Kafka Connect Datagen connector was installed automatically when you started Docker Compose in Step 1: Download and Start Confluent Platform Using Docker. If you encounter issues with the Datagen Connector, refer to the Issue: Cannot locate the Datagen Connector in the Troubleshooting section.

Step 4: Create and Write to a Stream and Table using ksqlDB

In this step, you create streams, tables, and queries using ksqlDB SQL. For more information about ksqlDB SQL syntax, see ksqlDB Syntax Reference.

Create Streams and Tables

  1. Start the ksqlDB CLI in your terminal with this command.

    docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088
    

    Important

    By default ksqlDB attempts to store its logs in a directory called logs that is relative to the location of the ksql executable. For example, if ksql is installed at /usr/local/bin/ksql, then it would attempt to store its logs in /usr/local/logs. If you are running ksql from the default Confluent Platform location, $CONFLUENT_HOME/bin, you must override this default behavior by using the LOG_DIR variable.

  2. Create a stream PAGEVIEWS from the Kafka topic pageviews, specifying the value_format of AVRO:

    CREATE STREAM PAGEVIEWS (VIEWTIME bigint, USERID varchar, PAGEID varchar) WITH
      (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');
    
  3. Create a table USERS with several columns from the Kafka topic users, with the value_format of AVRO:

    CREATE TABLE USERS (USERID VARCHAR PRIMARY KEY, REGISTERTIME BIGINT, GENDER VARCHAR, REGIONID VARCHAR) WITH
        (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
    

Write Queries

In this step, you run ksqlDB SQL queries.

  1. Set the auto.offset.reset` query property to ``earliest.

    This instructs ksqlDB queries to read all available topic data from the beginning. This configuration is used for each subsequent query. For more information, see the ksqlDB Configuration Parameter Reference.

    SET 'auto.offset.reset'='earliest';
    
  2. Create a non-persistent query that returns data from a stream with the results limited to a maximum of three rows:

    SELECT PAGEID FROM PAGEVIEWS EMIT CHANGES LIMIT 3;
    

    Your output should resemble:

    Page_45
    Page_38
    Page_11
    LIMIT reached
    Query terminated
    
  3. Create a persistent query (as a stream) that filters the PAGEVIEWS stream for female users. The results from this query are written to the Kafka PAGEVIEWS_FEMALE topic:

    CREATE STREAM PAGEVIEWS_FEMALE \
      AS SELECT USERS.USERID AS USERID, PAGEID, REGIONID \
      FROM PAGEVIEWS LEFT JOIN USERS ON PAGEVIEWS.USERID = USERS.USERID \
      WHERE GENDER = 'FEMALE'
      EMIT CHANGES;
    
  4. Create a persistent query where REGIONID ends with 8 or 9. Results from this query are written to the Kafka topic named pageviews_enriched_r8_r9 as explicitly specified in the query:

    CREATE STREAM PAGEVIEWS_FEMALE_LIKE_89 \
      WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='AVRO') \
      AS SELECT * FROM PAGEVIEWS_FEMALE \
        WHERE REGIONID LIKE '%_8' OR REGIONID LIKE '%_9'
        EMIT CHANGES;
    
  5. Create a persistent query that counts the PAGEVIEWS for each REGION and GENDER combination in a tumbling window of 30 seconds when the count is greater than 1. Because the procedure is grouping and counting, the result is now a table, rather than a stream. Results from this query are written to a Kafka topic called PAGEVIEWS_REGIONS:

    CREATE TABLE PAGEVIEWS_REGIONS \
      AS SELECT GENDER, REGIONID , COUNT(*) AS NUMBERS \
      FROM PAGEVIEWS LEFT JOIN USERS ON PAGEVIEWS.USERID = USERS.USERID \
      WINDOW TUMBLING (size 30 second) \
      GROUP BY GENDER, REGIONID \
      HAVING COUNT(*) > 1
      EMIT CHANGES;
    

Examine Streams, Tables, and Queries

  • List the streams:

    SHOW STREAMS;
    
  • List the tables:

    SHOW TABLES;
    
  • View the details of a stream or a table:

    DESCRIBE EXTENDED <stream-or-table-name>;
    

    For example, to view the details of the users table:

    DESCRIBE EXTENDED USERS;
    
  • List the running queries:

    SHOW QUERIES;
    
  • Review the query execution plan:

    Get a Query ID from the output of SHOW QUERIES and run EXPLAIN to view the query execution plan for the Query ID:

    EXPLAIN <Query ID>;
    

Step 5: Monitor Streaming Data

Now you can monitor the running queries created as streams or tables.

  • The following query returns the page view information of female users:

    SELECT * FROM PAGEVIEWS_FEMALE EMIT CHANGES;
    
  • The following query returns the page view information of female users in the regions whose regionid ends with 8 or 9:

    SELECT * FROM PAGEVIEWS_FEMALE_LIKE_89 EMIT CHANGES;
    
  • The following query returns the page view counts for each region and gender combination in a tumbling window of 30 seconds.

    SELECT * FROM PAGEVIEWS_REGIONS EMIT CHANGES;
    

Step 6: Stop Confluent Containers and Clean Up

When you are done working with Docker, you can stop and remove Docker containers and images.

  1. Run the following command to stop the Docker containers for Confluent:

    docker-compose stop
    
  2. After stopping the Docker containers, run the following commands to prune the Docker system. Running these commands deletes containers, networks, volumes, and images, freeing up disk space:

    docker system prune -a --volumes --filter "label=io.confluent.docker"
    

For more information, refer to the official Docker documentation.

Next Steps

Learn more about the components shown in this quick start:

Troubleshooting

If you encountered any issues while going through the quickstart workflow, review the following resolutions before trying the steps again.

Issue: Cannot locate the Datagen Connector

Resolution: Run the build command just for connect if the connect container was not built successfully.

docker-compose build --no-cache connect

Your output should resemble:

Building connect
...
Completed
Removing intermediate container cdb0af3550c8
---> 36d00047d29b
Successfully built 36d00047d29b
Successfully tagged confluentinc/kafka-connect-datagen:latest

If the connect container was already built successfully, you will see an output similar to this:

connect uses an image, skipping

Resolution: Check the Connect log for Datagen.

docker-compose logs connect | grep -i Datagen

Your output should resemble:

connect  | [2019-04-17 20:03:26,137] INFO Loading plugin from: /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect  | [2019-04-17 20:03:26,206] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect  | [2019-04-17 20:03:26,206] INFO Added plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect  | [2019-04-17 20:03:28,102] INFO Added aliases 'DatagenConnector' and 'Datagen' to plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)

Resolution: Check the Connect log for a warning and reminder to run the docker-compose up -d command properly.

docker-compose logs connect | grep -i Datagen

Resolution: Verify the .jar file for kafka-connect-datagen has been added and is present in the lib subfolder.

docker-compose exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/lib/

Your output should resemble:

...
kafka-connect-datagen-0.1.0.jar
...

Resolution: Verify the plugin exists in the connector path.

docker-compose exec connect bash -c 'echo $CONNECT_PLUGIN_PATH'

Your output should resemble:

/usr/share/java,/usr/share/confluent-hub-components

Confirm its contents are present:

docker-compose exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen

Your output should resemble:

assets   doc  etc  lib  manifest.json

Issue: Stream-Stream joins error

An error states Stream-Stream joins must have a WITHIN clause specified. This error can occur if you created both pageviews and users as streams by mistake.

Resolution: Ensure that you created a stream for pageviews, and a table for users in Step 4: Create and Write to a Stream and Table using ksqlDB.

Issue: Unable to successfully complete ksqlDB query steps

Java errors or other severe errors were encountered.

Resolution: Ensure you are on an Operating System currently supported by Confluent Platform.

Resolution: Ensure that the Docker memory was increased to 8 MB. Go to Docker > Preferences > Advanced. If Docker memory is insufficient, other unpredictable issues could occur.

ksqlDB errors were encountered.

Resolution: Review the help in the ksqlDB CLI for successful command tips and links to more documentation.

ksql> help

Issue: Demo times out, some or all components do not start

You must allocate a minimum of 6 GB of Docker memory resource. The default memory allocation on Docker Desktop for Mac is 2 GB and must be changed. Confluent Platform demos and examples running on Docker may fail to work properly if Docker memory allocation does not meet this minimum requirement.

../_images/quickstart-docker-memory-rqmts.png

Memory settings on Docker preferences for resources